import java.util.HashSet;
import java.util.ArrayList;
import java.util.Collections;
+import java.nio.ByteBuffer;
/**
* IoTTable data structure. Provides client interface.
final public class Table {
-
/* Constants */
- static final int FREE_SLOTS = 10; // Number of slots that should be kept free
+ static final int FREE_SLOTS = 2; // Number of slots that should be kept free // 10
static final int SKIP_THRESHOLD = 10;
static final double RESIZE_MULTIPLE = 1.2;
static final double RESIZE_THRESHOLD = 0.75;
static final int REJECTED_THRESHOLD = 5;
-
/* Helper Objects */
private SlotBuffer buffer = null;
private CloudComm cloud = null;
private long oldestLiveSlotSequenceNumver = 0; // Smallest sequence number of the slot with a live entry
private long localMachineId = 0; // Machine ID of this client device
private long sequenceNumber = 0; // Largest sequence number a client has received
- private int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server
- private int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server
+ private long localSequenceNumber = 0;
+
+ // private int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server
+ // private int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server
private long localTransactionSequenceNumber = 0; // Local sequence number counter for transactions
private long lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on
private long oldestTransactionSequenceNumberSpeculatedOn = -1; // the oldest transaction that was speculated on
- private long localCommitSequenceNumber = 0;
+ private long localArbitrationSequenceNumber = 0;
+ private boolean hadPartialSendToServer = false;
+ private boolean attemptedToSendToServer = false;
+ private long expectedsize;
+ private boolean didFindTableStatus = false;
+ private long currMaxSize = 0;
+
+ private Slot lastSlotAttemptedToSend = null;
+ private boolean lastIsNewKey = false;
+ private int lastNewSize = 0;
+ private Map<Transaction, List<Integer>> lastTransactionPartsSent = null;
+ private List<Entry> lastPendingSendArbitrationEntriesToDelete = null;
+ private NewKey lastNewKey = null;
+
/* Data Structures */
private Map<IoTString, KeyValue> committedKeyValueTable = null; // Table of committed key value pairs
private Map<IoTString, KeyValue> speculatedKeyValueTable = null; // Table of speculated key value pairs, if there is a speculative value
private Map<IoTString, KeyValue> pendingTransactionSpeculatedKeyValueTable = null; // Table of speculated key value pairs, if there is a speculative value from the pending transactions
-
private Map<IoTString, NewKey> liveNewKeyTable = null; // Table of live new keys
private HashMap<Long, Pair<Long, Liveness>> lastMessageTable = null; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
private HashMap<Long, HashSet<RejectedMessage>> rejectedMessageWatchListTable = null; // Table of machine Ids and the set of rejected messages they have not seen yet
private Map<IoTString, Commit> liveCommitsByKeyTable = null;
private Map<Long, Long> lastCommitSeenSequenceNumberByArbitratorTable = null;
private Vector<Long> rejectedSlotList = null; // List of rejected slots that have yet to be sent to the server
-
private List<Transaction> pendingTransactionQueue = null;
- private List<Entry> pendingSendArbitrationEntries = null;
+ private List<ArbitrationRound> pendingSendArbitrationRounds = null;
private List<Entry> pendingSendArbitrationEntriesToDelete = null;
private Map<Transaction, List<Integer>> transactionPartsSent = null;
private Map<Long, TransactionStatus> outstandingTransactionStatus = null;
+ private Map<Long, Abort> liveAbortsGeneratedByLocal = null;
+ private Set<Pair<Long, Long>> offlineTransactionsCommittedAndAtServer = null;
+ private Map<Long, Pair<String, Integer>> localCommunicationTable = null;
+ private Map<Long, Long> lastTransactionSeenFromMachineFromServer = null;
+ private Map<Long, Long> lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = null;
-
-
-
- public Table(String baseurl, String password, long _localMachineId) {
+ public Table(String baseurl, String password, long _localMachineId, int listeningPort) {
localMachineId = _localMachineId;
- cloud = new CloudComm(this, baseurl, password);
+ cloud = new CloudComm(this, baseurl, password, listeningPort);
init();
}
lastCommitSeenSequenceNumberByArbitratorTable = new HashMap<Long, Long>();
rejectedSlotList = new Vector<Long>();
pendingTransactionQueue = new ArrayList<Transaction>();
- pendingSendArbitrationEntries = new ArrayList<Entry>();
pendingSendArbitrationEntriesToDelete = new ArrayList<Entry>();
transactionPartsSent = new HashMap<Transaction, List<Integer>>();
outstandingTransactionStatus = new HashMap<Long, TransactionStatus>();
+ liveAbortsGeneratedByLocal = new HashMap<Long, Abort>();
+ offlineTransactionsCommittedAndAtServer = new HashSet<Pair<Long, Long>>();
+ localCommunicationTable = new HashMap<Long, Pair<String, Integer>>();
+ lastTransactionSeenFromMachineFromServer = new HashMap<Long, Long>();
+ pendingSendArbitrationRounds = new ArrayList<ArbitrationRound>();
+ lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new HashMap<Long, Long>();
+
// Other init stuff
numberOfSlots = buffer.capacity();
setResizeThreshold();
}
+ // TODO: delete method
+ public synchronized void printSlots() {
+ long o = buffer.getOldestSeqNum();
+ long n = buffer.getNewestSeqNum();
+
+ int[] types = new int[10];
+
+ int num = 0;
+
+ int livec = 0;
+ int deadc = 0;
+
+ int casdasd = 0;
+
+ int liveslo = 0;
+
+ for (long i = o; i < (n + 1); i++) {
+ Slot s = buffer.getSlot(i);
+
+
+ if (s.isLive()) {
+ liveslo++;
+ }
+
+ Vector<Entry> entries = s.getEntries();
+
+ for (Entry e : entries) {
+ if (e.isLive()) {
+ int type = e.getType();
+
+
+ if (type == 6) {
+ RejectedMessage rej = (RejectedMessage)e;
+ casdasd++;
+
+ System.out.println(rej.getMachineID());
+ }
+
+
+ types[type] = types[type] + 1;
+ num++;
+ livec++;
+ } else {
+ deadc++;
+ }
+ }
+ }
+
+ for (int i = 0; i < 10; i++) {
+ System.out.println(i + " " + types[i]);
+ }
+ System.out.println("Live count: " + livec);
+ System.out.println("Live Slot count: " + liveslo);
+
+ System.out.println("Dead count: " + deadc);
+ System.out.println("Old: " + o);
+ System.out.println("New: " + n);
+ System.out.println("Size: " + buffer.size());
+ // System.out.println("Commits: " + liveCommitsTable.size());
+ System.out.println("pendingTrans: " + pendingTransactionQueue.size());
+ System.out.println("Trans Status Out: " + outstandingTransactionStatus.size());
+
+ for (Long k : lastArbitratedTransactionNumberByArbitratorTable.keySet()) {
+ System.out.println(k + ": " + lastArbitratedTransactionNumberByArbitratorTable.get(k));
+ }
+
+
+ for (Long a : liveCommitsTable.keySet()) {
+ for (Long b : liveCommitsTable.get(a).keySet()) {
+ for (KeyValue kv : liveCommitsTable.get(a).get(b).getKeyValueUpdateSet()) {
+ System.out.print(kv + " ");
+ }
+ System.out.print("|| ");
+ }
+ System.out.println();
+ }
+
+ }
+
/**
* Initialize the table by inserting a table status as the first entry into the table status
* also initialize the crypto stuff.
*/
public synchronized void initTable() throws ServerException {
- cloud.setSalt(); //Set the salt
+ cloud.initSecurity();
// Create the first insertion into the block chain which is the table status
- Slot s = new Slot(this, 1, localMachineId);
+ Slot s = new Slot(this, 1, localMachineId, localSequenceNumber);
+ localSequenceNumber++;
TableStatus status = new TableStatus(s, numberOfSlots);
s.addEntry(status);
Slot[] array = cloud.putSlot(s, numberOfSlots);
array = new Slot[] {s};
// update local block chain
validateAndUpdate(array, true);
+ } else if (array.length == 1) {
+ // in case we did push the slot BUT we failed to init it
+ validateAndUpdate(array, true);
} else {
throw new Error("Error on initialization");
}
// Just pull the latest slots from the server
Slot[] newslots = cloud.getSlots(sequenceNumber + 1);
validateAndUpdate(newslots, true);
+ sendToServer(null);
+ updateLiveTransactionsAndStatus();
+
}
// public String toString() {
// return retString;
// }
+ public synchronized void addLocalCommunication(long arbitrator, String hostName, int portNumber) {
+ localCommunicationTable.put(arbitrator, new Pair<String, Integer>(hostName, portNumber));
+ }
+
public synchronized Long getArbitrator(IoTString key) {
return arbitratorTable.get(key);
}
+ public synchronized void close() {
+ cloud.close();
+ }
+
public synchronized IoTString getCommitted(IoTString key) {
KeyValue kv = committedKeyValueTable.get(key);
}
}
- public synchronized void update() {
+ public synchronized boolean update() {
try {
Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
validateAndUpdate(newSlots, false);
sendToServer(null);
+
+
+ updateLiveTransactionsAndStatus();
+
+ return true;
} catch (Exception e) {
- e.printStackTrace();
+ // e.printStackTrace();
+
+ for (Long m : localCommunicationTable.keySet()) {
+ updateFromLocal(m);
+ }
}
+
+ return false;
}
- public synchronized boolean createNewKey(IoTString keyName, long machineId) {
+ public synchronized boolean createNewKey(IoTString keyName, long machineId) throws ServerException {
while (true) {
if (arbitratorTable.get(keyName) != null) {
// There is already an arbitrator
}
NewKey newKey = new NewKey(null, keyName, machineId);
+
if (sendToServer(newKey)) {
// If successfully inserted
return true;
}
}
- public void startTransaction() {
+ public synchronized void startTransaction() {
// Create a new transaction, invalidates any old pending transactions.
pendingTransactionBuilder = new PendingTransaction(localMachineId);
}
pendingTransactionBuilder = new PendingTransaction(localMachineId);
- sendToServer(null);
+ try {
+ sendToServer(null);
+ } catch (ServerException e) {
+
+ Set<Long> arbitratorTriedAndFailed = new HashSet<Long>();
+ for (Iterator<Transaction> iter = pendingTransactionQueue.iterator(); iter.hasNext(); ) {
+ Transaction transaction = iter.next();
+
+ if (arbitratorTriedAndFailed.contains(transaction.getArbitrator())) {
+ // Already contacted this client so ignore all attempts to contact this client
+ // to preserve ordering for arbitrator
+ continue;
+ }
+
+ Pair<Boolean, Boolean> sendReturn = sendTransactionToLocal(transaction);
+
+ if (sendReturn.getFirst()) {
+ // Failed to contact over local
+ arbitratorTriedAndFailed.add(transaction.getArbitrator());
+ } else {
+ // Successful contact or should not contact
+
+ if (sendReturn.getSecond()) {
+ // did arbitrate
+ iter.remove();
+ }
+ }
+ }
+ }
+
+ updateLiveStateFromLocal();
return transactionStatus;
}
bufferResizeThreshold = resizeLower - 1 + random.nextInt(numberOfSlots - resizeLower);
}
- private boolean sendToServer(NewKey newKey) {
+ public long getLocalSequenceNumber() {
+ return localSequenceNumber;
+ }
+
+
+ boolean lastInsertedNewKey = false;
+
+ private boolean sendToServer(NewKey newKey) throws ServerException {
+
+ boolean fromRetry = false;
+
+ try {
+ if (hadPartialSendToServer) {
+ Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
+ if (newSlots.length == 0) {
+ fromRetry = true;
+ ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
+
+ if (sendSlotsReturn.getFirst()) {
+ if (newKey != null) {
+ if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
+ newKey = null;
+ }
+ }
+
+ for (Transaction transaction : lastTransactionPartsSent.keySet()) {
+ transaction.resetServerFailure();
+
+ // Update which transactions parts still need to be sent
+ transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
+
+ // Add the transaction status to the outstanding list
+ outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
+
+ // Update the transaction status
+ transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
+
+ // Check if all the transaction parts were successfully sent and if so then remove it from pending
+ if (transaction.didSendAllParts()) {
+ transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
+ pendingTransactionQueue.remove(transaction);
+ }
+ }
+ } else {
+
+ newSlots = sendSlotsReturn.getThird();
+
+ boolean isInserted = false;
+ for (Slot s : newSlots) {
+ if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
+ isInserted = true;
+ break;
+ }
+ }
+
+ for (Slot s : newSlots) {
+ if (isInserted) {
+ break;
+ }
+
+ // Process each entry in the slot
+ for (Entry entry : s.getEntries()) {
+
+ if (entry.getType() == Entry.TypeLastMessage) {
+ LastMessage lastMessage = (LastMessage)entry;
+ if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) {
+ isInserted = true;
+ break;
+ }
+ }
+ }
+ }
+
+ if (isInserted) {
+ if (newKey != null) {
+ if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
+ newKey = null;
+ }
+ }
+
+ for (Transaction transaction : lastTransactionPartsSent.keySet()) {
+ transaction.resetServerFailure();
+
+ // Update which transactions parts still need to be sent
+ transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
+
+ // Add the transaction status to the outstanding list
+ outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
+
+ // Update the transaction status
+ transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
+
+ // Check if all the transaction parts were successfully sent and if so then remove it from pending
+ if (transaction.didSendAllParts()) {
+ transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
+ pendingTransactionQueue.remove(transaction);
+ } else {
+ transaction.resetServerFailure();
+ // Set the transaction sequence number back to nothing
+ if (!transaction.didSendAPartToServer()) {
+ transaction.setSequenceNumber(-1);
+ }
+ }
+ }
+ }
+ }
+
+ for (Transaction transaction : lastTransactionPartsSent.keySet()) {
+ transaction.resetServerFailure();
+ // Set the transaction sequence number back to nothing
+ if (!transaction.didSendAPartToServer()) {
+ transaction.setSequenceNumber(-1);
+ }
+ }
+
+ if (sendSlotsReturn.getThird().length != 0) {
+ // insert into the local block chain
+ validateAndUpdate(sendSlotsReturn.getThird(), true);
+ }
+ // continue;
+ } else {
+ boolean isInserted = false;
+ for (Slot s : newSlots) {
+ if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
+ isInserted = true;
+ break;
+ }
+ }
+
+ for (Slot s : newSlots) {
+ if (isInserted) {
+ break;
+ }
+
+ // Process each entry in the slot
+ for (Entry entry : s.getEntries()) {
+
+ if (entry.getType() == Entry.TypeLastMessage) {
+ LastMessage lastMessage = (LastMessage)entry;
+ if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) {
+ isInserted = true;
+ break;
+ }
+ }
+ }
+ }
+
+ if (isInserted) {
+ if (newKey != null) {
+ if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
+ newKey = null;
+ }
+ }
+
+ for (Transaction transaction : lastTransactionPartsSent.keySet()) {
+ transaction.resetServerFailure();
+
+ // Update which transactions parts still need to be sent
+ transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
+
+ // Add the transaction status to the outstanding list
+ outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
+
+ // Update the transaction status
+ transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
+
+ // Check if all the transaction parts were successfully sent and if so then remove it from pending
+ if (transaction.didSendAllParts()) {
+ transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
+ pendingTransactionQueue.remove(transaction);
+ } else {
+ transaction.resetServerFailure();
+ // Set the transaction sequence number back to nothing
+ if (!transaction.didSendAPartToServer()) {
+ transaction.setSequenceNumber(-1);
+ }
+ }
+ }
+ } else {
+ for (Transaction transaction : lastTransactionPartsSent.keySet()) {
+ transaction.resetServerFailure();
+ // Set the transaction sequence number back to nothing
+ if (!transaction.didSendAPartToServer()) {
+ transaction.setSequenceNumber(-1);
+ }
+ }
+ }
+
+ // insert into the local block chain
+ validateAndUpdate(newSlots, true);
+ }
+ }
+ } catch (ServerException e) {
+ throw e;
+ }
+
+
try {
// While we have stuff that needs inserting into the block chain
- while ((pendingTransactionQueue.size() > 0) || (pendingSendArbitrationEntries.size() > 0) || (newKey != null)) {
+ while ((pendingTransactionQueue.size() > 0) || (pendingSendArbitrationRounds.size() > 0) || (newKey != null)) {
- // try {
- // Thread.sleep(300);
- // } catch (Exception e) {
+ fromRetry = false;
- // }
+ if (hadPartialSendToServer) {
+ throw new Error("Should Be error free");
+ }
+
+
+
+ // If there is a new key with same name then end
+ if ((newKey != null) && (arbitratorTable.get(newKey.getKey()) != null)) {
+ return false;
+ }
// Create the slot
- Slot slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer.getSlot(sequenceNumber).getHMAC());
+ Slot slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer.getSlot(sequenceNumber).getHMAC(), localSequenceNumber);
+ localSequenceNumber++;
// Try to fill the slot with data
ThreeTuple<Boolean, Integer, Boolean> fillSlotsReturn = fillSlot(slot, false, newKey);
transaction.resetNextPartToSend();
// Set the transaction sequence number back to nothing
- if (!transaction.didSendAPartToServer()) {
+ if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
transaction.setSequenceNumber(-1);
}
}
fillSlot(slot, true, newKey);
}
- // Try to send to the server
- Pair<Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(slot, newSize);
+ lastSlotAttemptedToSend = slot;
+ lastIsNewKey = (newKey != null);
+ lastInsertedNewKey = insertedNewKey;
+ lastNewSize = newSize;
+ lastNewKey = newKey;
+ lastTransactionPartsSent = new HashMap<Transaction, List<Integer>>(transactionPartsSent);
+ lastPendingSendArbitrationEntriesToDelete = new ArrayList<Entry>(pendingSendArbitrationEntriesToDelete);
+
+
+ ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != null);
if (sendSlotsReturn.getFirst()) {
+
// Did insert into the block chain
- // New Key was successfully inserted into the block chain so dont want to insert it again
- newKey = null;
+ if (insertedNewKey) {
+ // This slot was what was inserted not a previous slot
+
+ // New Key was successfully inserted into the block chain so dont want to insert it again
+ newKey = null;
+ }
// Remove the aborts and commit parts that were sent from the pending to send queue
- pendingSendArbitrationEntries.removeAll(pendingSendArbitrationEntriesToDelete);
+ for (Iterator<ArbitrationRound> iter = pendingSendArbitrationRounds.iterator(); iter.hasNext(); ) {
+ ArbitrationRound round = iter.next();
+ round.removeParts(pendingSendArbitrationEntriesToDelete);
+
+ if (round.isDoneSending()) {
+ // Sent all the parts
+ iter.remove();
+ }
+ }
for (Transaction transaction : transactionPartsSent.keySet()) {
+ transaction.resetServerFailure();
// Update which transactions parts still need to be sent
transaction.removeSentParts(transactionPartsSent.get(transaction));
}
}
} else {
+
+ // if (!sendSlotsReturn.getSecond()) {
+ // for (Transaction transaction : lastTransactionPartsSent.keySet()) {
+ // transaction.resetServerFailure();
+ // }
+ // } else {
+ // for (Transaction transaction : lastTransactionPartsSent.keySet()) {
+ // transaction.resetServerFailure();
+
+ // // Update which transactions parts still need to be sent
+ // transaction.removeSentParts(transactionPartsSent.get(transaction));
+
+ // // Add the transaction status to the outstanding list
+ // outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
+
+ // // Update the transaction status
+ // transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
+
+ // // Check if all the transaction parts were successfully sent and if so then remove it from pending
+ // if (transaction.didSendAllParts()) {
+ // transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
+ // pendingTransactionQueue.remove(transaction);
+
+ // for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
+ // System.out.println("Sent: " + kv + " from: " + localMachineId + " Slot:" + lastSlotAttemptedToSend.getSequenceNumber() + " Claimed:" + transaction.getSequenceNumber());
+ // }
+ // }
+ // }
+ // }
+
// Reset which transaction to send
for (Transaction transaction : transactionPartsSent.keySet()) {
transaction.resetNextPartToSend();
+ // transaction.resetNextPartToSend();
// Set the transaction sequence number back to nothing
- if (!transaction.didSendAPartToServer()) {
+ if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
transaction.setSequenceNumber(-1);
}
}
pendingSendArbitrationEntriesToDelete.clear();
transactionPartsSent.clear();
- if (sendSlotsReturn.getSecond().length != 0) {
+ if (sendSlotsReturn.getThird().length != 0) {
// insert into the local block chain
- validateAndUpdate(sendSlotsReturn.getSecond(), true);
+ validateAndUpdate(sendSlotsReturn.getThird(), true);
}
}
+
} catch (ServerException e) {
if (e.getType() != ServerException.TypeInputTimeout) {
- e.printStackTrace();
+ // e.printStackTrace();
// Nothing was able to be sent to the server so just clear these data structures
for (Transaction transaction : transactionPartsSent.keySet()) {
+ transaction.resetNextPartToSend();
+
// Set the transaction sequence number back to nothing
- if (!transaction.didSendAPartToServer()) {
+ if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
transaction.setSequenceNumber(-1);
}
}
-
- pendingSendArbitrationEntriesToDelete.clear();
- transactionPartsSent.clear();
} else {
// There was a partial send to the server
+ hadPartialSendToServer = true;
+
+
+ // if (!fromRetry) {
+ // lastTransactionPartsSent = new HashMap<Transaction, List<Integer>>(transactionPartsSent);
+ // lastPendingSendArbitrationEntriesToDelete = new ArrayList<Entry>(pendingSendArbitrationEntriesToDelete);
+ // }
+
+ // Nothing was able to be sent to the server so just clear these data structures
+ for (Transaction transaction : transactionPartsSent.keySet()) {
+ transaction.resetNextPartToSend();
+ transaction.setServerFailure();
+ }
}
+
+ pendingSendArbitrationEntriesToDelete.clear();
+ transactionPartsSent.clear();
+
+ throw e;
}
return newKey == null;
}
- private Pair<Boolean, Slot[]> sendSlotsToServer(Slot slot, int newSize) throws ServerException {
+ private synchronized boolean updateFromLocal(long machineId) {
+ Pair<String, Integer> localCommunicationInformation = localCommunicationTable.get(machineId);
+ if (localCommunicationInformation == null) {
+ // Cant talk to that device locally so do nothing
+ return false;
+ }
+
+ // Get the size of the send data
+ int sendDataSize = Integer.BYTES + Long.BYTES;
+
+ Long lastArbitrationDataLocalSequenceNumber = (long) - 1;
+ if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId) != null) {
+ lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId);
+ }
+
+ byte[] sendData = new byte[sendDataSize];
+ ByteBuffer bbEncode = ByteBuffer.wrap(sendData);
+
+ // Encode the data
+ bbEncode.putLong(lastArbitrationDataLocalSequenceNumber);
+ bbEncode.putInt(0);
+
+ // Send by local
+ byte[] returnData = cloud.sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
+ localSequenceNumber++;
+
+ if (returnData == null) {
+ // Could not contact server
+ return false;
+ }
+
+ // Decode the data
+ ByteBuffer bbDecode = ByteBuffer.wrap(returnData);
+ int numberOfEntries = bbDecode.getInt();
+
+ for (int i = 0; i < numberOfEntries; i++) {
+ byte type = bbDecode.get();
+ if (type == Entry.TypeAbort) {
+ Abort abort = (Abort)Abort.decode(null, bbDecode);
+ processEntry(abort);
+ } else if (type == Entry.TypeCommitPart) {
+ CommitPart commitPart = (CommitPart)CommitPart.decode(null, bbDecode);
+ processEntry(commitPart);
+ }
+ }
+
+ updateLiveStateFromLocal();
+
+ return true;
+ }
+
+ private Pair<Boolean, Boolean> sendTransactionToLocal(Transaction transaction) {
+
+ // Get the devices local communications
+ Pair<String, Integer> localCommunicationInformation = localCommunicationTable.get(transaction.getArbitrator());
+
+ if (localCommunicationInformation == null) {
+ // Cant talk to that device locally so do nothing
+ return new Pair<Boolean, Boolean>(true, false);
+ }
+
+ // Get the size of the send data
+ int sendDataSize = Integer.BYTES + Long.BYTES;
+ for (TransactionPart part : transaction.getParts().values()) {
+ sendDataSize += part.getSize();
+ }
+
+ Long lastArbitrationDataLocalSequenceNumber = (long) - 1;
+ if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator()) != null) {
+ lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator());
+ }
+
+ // Make the send data size
+ byte[] sendData = new byte[sendDataSize];
+ ByteBuffer bbEncode = ByteBuffer.wrap(sendData);
+
+ // Encode the data
+ bbEncode.putLong(lastArbitrationDataLocalSequenceNumber);
+ bbEncode.putInt(transaction.getParts().size());
+ for (TransactionPart part : transaction.getParts().values()) {
+ part.encode(bbEncode);
+ }
+
+
+ // Send by local
+ byte[] returnData = cloud.sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
+ localSequenceNumber++;
+
+ if (returnData == null) {
+ // Could not contact server
+ return new Pair<Boolean, Boolean>(true, false);
+ }
+
+ // Decode the data
+ ByteBuffer bbDecode = ByteBuffer.wrap(returnData);
+ boolean didCommit = bbDecode.get() == 1;
+ boolean couldArbitrate = bbDecode.get() == 1;
+ int numberOfEntries = bbDecode.getInt();
+ boolean foundAbort = false;
+
+ for (int i = 0; i < numberOfEntries; i++) {
+ byte type = bbDecode.get();
+ if (type == Entry.TypeAbort) {
+ Abort abort = (Abort)Abort.decode(null, bbDecode);
+
+ if ((abort.getTransactionMachineId() == localMachineId) && (abort.getTransactionClientLocalSequenceNumber() == transaction.getClientLocalSequenceNumber())) {
+ foundAbort = true;
+ }
+
+ processEntry(abort);
+ } else if (type == Entry.TypeCommitPart) {
+ CommitPart commitPart = (CommitPart)CommitPart.decode(null, bbDecode);
+ processEntry(commitPart);
+ }
+ }
+
+ updateLiveStateFromLocal();
+
+ if (couldArbitrate) {
+ TransactionStatus status = transaction.getTransactionStatus();
+ if (didCommit) {
+ status.setStatus(TransactionStatus.StatusCommitted);
+ } else {
+ status.setStatus(TransactionStatus.StatusAborted);
+ }
+ } else {
+ TransactionStatus status = transaction.getTransactionStatus();
+ if (foundAbort) {
+ status.setStatus(TransactionStatus.StatusAborted);
+ } else {
+ status.setStatus(TransactionStatus.StatusCommitted);
+ }
+ }
+
+ return new Pair<Boolean, Boolean>(false, true);
+ }
+
+ public synchronized byte[] acceptDataFromLocal(byte[] data) {
+
+ // Decode the data
+ ByteBuffer bbDecode = ByteBuffer.wrap(data);
+ long lastArbitratedSequenceNumberSeen = bbDecode.getLong();
+ int numberOfParts = bbDecode.getInt();
+
+ // If we did commit a transaction or not
+ boolean didCommit = false;
+ boolean couldArbitrate = false;
+
+ if (numberOfParts != 0) {
+
+ // decode the transaction
+ Transaction transaction = new Transaction();
+ for (int i = 0; i < numberOfParts; i++) {
+ bbDecode.get();
+ TransactionPart newPart = (TransactionPart)TransactionPart.decode(null, bbDecode);
+ transaction.addPartDecode(newPart);
+ }
+
+ // Arbitrate on transaction and pull relevant return data
+ Pair<Boolean, Boolean> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
+ couldArbitrate = localArbitrateReturn.getFirst();
+ didCommit = localArbitrateReturn.getSecond();
+
+ updateLiveStateFromLocal();
+
+ // Transaction was sent to the server so keep track of it to prevent double commit
+ if (transaction.getSequenceNumber() != -1) {
+ offlineTransactionsCommittedAndAtServer.add(transaction.getId());
+ }
+ }
+
+ // The data to send back
+ int returnDataSize = 0;
+ List<Entry> unseenArbitrations = new ArrayList<Entry>();
+
+ // Get the aborts to send back
+ List<Long> abortLocalSequenceNumbers = new ArrayList<Long >(liveAbortsGeneratedByLocal.keySet());
+ Collections.sort(abortLocalSequenceNumbers);
+ for (Long localSequenceNumber : abortLocalSequenceNumbers) {
+ if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
+ continue;
+ }
+
+ Abort abort = liveAbortsGeneratedByLocal.get(localSequenceNumber);
+ unseenArbitrations.add(abort);
+ returnDataSize += abort.getSize();
+ }
+
+ // Get the commits to send back
+ Map<Long, Commit> commitForClientTable = liveCommitsTable.get(localMachineId);
+ if (commitForClientTable != null) {
+ List<Long> commitLocalSequenceNumbers = new ArrayList<Long>(commitForClientTable.keySet());
+ Collections.sort(commitLocalSequenceNumbers);
+
+ for (Long localSequenceNumber : commitLocalSequenceNumbers) {
+ Commit commit = commitForClientTable.get(localSequenceNumber);
+
+ if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
+ continue;
+ }
+
+ unseenArbitrations.addAll(commit.getParts().values());
+
+ for (CommitPart commitPart : commit.getParts().values()) {
+ returnDataSize += commitPart.getSize();
+ }
+ }
+ }
+
+ // Number of arbitration entries to decode
+ returnDataSize += 2 * Integer.BYTES;
+
+ // Boolean of did commit or not
+ if (numberOfParts != 0) {
+ returnDataSize += Byte.BYTES;
+ }
+
+ // Data to send Back
+ byte[] returnData = new byte[returnDataSize];
+ ByteBuffer bbEncode = ByteBuffer.wrap(returnData);
+
+ if (numberOfParts != 0) {
+ if (didCommit) {
+ bbEncode.put((byte)1);
+ } else {
+ bbEncode.put((byte)0);
+ }
+ if (couldArbitrate) {
+ bbEncode.put((byte)1);
+ } else {
+ bbEncode.put((byte)0);
+ }
+ }
+
+ bbEncode.putInt(unseenArbitrations.size());
+ for (Entry entry : unseenArbitrations) {
+ entry.encode(bbEncode);
+ }
+
+
+ localSequenceNumber++;
+ return returnData;
+ }
+
+ private ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsToServer(Slot slot, int newSize, boolean isNewKey) throws ServerException {
- boolean inserted = true;
+ boolean attemptedToSendToServerTmp = attemptedToSendToServer;
+ attemptedToSendToServer = true;
+
+ boolean inserted = false;
+ boolean lastTryInserted = false;
Slot[] array = cloud.putSlot(slot, newSize);
if (array == null) {
array = new Slot[] {slot};
rejectedSlotList.clear();
+ inserted = true;
} else {
if (array.length == 0) {
throw new Error("Server Error: Did not send any slots");
}
- rejectedSlotList.add(slot.getSequenceNumber());
- inserted = false;
+
+ // if (attemptedToSendToServerTmp) {
+ if (hadPartialSendToServer) {
+
+ boolean isInserted = false;
+ for (Slot s : array) {
+ if ((s.getSequenceNumber() == slot.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
+ isInserted = true;
+ break;
+ }
+ }
+
+ for (Slot s : array) {
+ if (isInserted) {
+ break;
+ }
+
+ // Process each entry in the slot
+ for (Entry entry : s.getEntries()) {
+
+ if (entry.getType() == Entry.TypeLastMessage) {
+ LastMessage lastMessage = (LastMessage)entry;
+
+ if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == slot.getSequenceNumber())) {
+ isInserted = true;
+ break;
+ }
+ }
+ }
+ }
+
+ if (!isInserted) {
+ rejectedSlotList.add(slot.getSequenceNumber());
+ lastTryInserted = false;
+ } else {
+ lastTryInserted = true;
+ }
+ } else {
+ rejectedSlotList.add(slot.getSequenceNumber());
+ lastTryInserted = false;
+ }
}
- return new Pair<Boolean, Slot[]>(inserted, array);
+ return new ThreeTuple<Boolean, Boolean, Slot[]>(inserted, lastTryInserted, array);
}
/**
* Returns false if a resize was needed
*/
private ThreeTuple<Boolean, Integer, Boolean> fillSlot(Slot slot, boolean resize, NewKey newKeyEntry) {
+
+
int newSize = 0;
if (liveSlotCount > bufferResizeThreshold) {
resize = true; //Resize is forced
+
}
if (resize) {
if (newKeyEntry != null) {
newKeyEntry.setSlot(slot);
if (slot.hasSpace(newKeyEntry)) {
+
slot.addEntry(newKeyEntry);
inserted = true;
}
transactionPartsSent.clear();
pendingSendArbitrationEntriesToDelete.clear();
- // Insert pending arbitration data
- for (Entry arbitrationData : pendingSendArbitrationEntries) {
+ for (ArbitrationRound round : pendingSendArbitrationRounds) {
+ boolean isFull = false;
+ round.generateParts();
+ List<Entry> parts = round.getParts();
+
+ // Insert pending arbitration data
+ for (Entry arbitrationData : parts) {
+
+ // If it is an abort then we need to set some information
+ if (arbitrationData instanceof Abort) {
+ ((Abort)arbitrationData).setSequenceNumber(slot.getSequenceNumber());
+ }
+
+ if (!slot.hasSpace(arbitrationData)) {
+ // No space so cant do anything else with these data entries
+ isFull = true;
+ break;
+ }
- // If it is an abort then we need to set some information
- if (arbitrationData instanceof Abort) {
- ((Abort)arbitrationData).setSequenceNumber(slot.getSequenceNumber());
+ // Add to this current slot and add it to entries to delete
+ slot.addEntry(arbitrationData);
+ pendingSendArbitrationEntriesToDelete.add(arbitrationData);
}
- if (!slot.hasSpace(arbitrationData)) {
- // No space so cant do anything else with these data entries
+ if (isFull) {
break;
}
-
- // Add to this current slot and add it to entries to delete
- slot.addEntry(arbitrationData);
- pendingSendArbitrationEntriesToDelete.add(arbitrationData);
}
- // Insert as many transactions as possible while keeping order
- for (Transaction transaction : pendingTransactionQueue) {
+ if (pendingTransactionQueue.size() > 0) {
+
+ Transaction transaction = pendingTransactionQueue.get(0);
// Set the transaction sequence number if it has yet to be inserted into the block chain
- if (!transaction.didSendAPartToServer()) {
+ // if ((!transaction.didSendAPartToServer() && !transaction.getServerFailure()) || (transaction.getSequenceNumber() == -1)) {
+ // transaction.setSequenceNumber(slot.getSequenceNumber());
+ // }
+
+ if ((!transaction.didSendAPartToServer()) || (transaction.getSequenceNumber() == -1)) {
transaction.setSequenceNumber(slot.getSequenceNumber());
}
- boolean ranOutOfSpace = false;
while (true) {
TransactionPart part = transaction.getNextPartToSend();
if (slot.hasSpace(part)) {
slot.addEntry(part);
-
List<Integer> partsSent = transactionPartsSent.get(transaction);
if (partsSent == null) {
partsSent = new ArrayList<Integer>();
transactionPartsSent.put(transaction, partsSent);
}
-
partsSent.add(part.getPartNumber());
transactionPartsSent.put(transaction, partsSent);
-
} else {
- ranOutOfSpace = true;
break;
}
}
-
- if (ranOutOfSpace) {
- break;
- }
}
// Fill the remainder of the slot with rescue data
long old_seqn = rejectedSlotList.firstElement();
if (rejectedSlotList.size() > REJECTED_THRESHOLD) {
long new_seqn = rejectedSlotList.lastElement();
- RejectedMessage rm = new RejectedMessage(s, localMachineId, old_seqn, new_seqn, false);
+ RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
s.addEntry(rm);
} else {
long prev_seqn = -1;
}
/* Generate rejected message entry for missing messages */
if (prev_seqn != -1) {
- RejectedMessage rm = new RejectedMessage(s, localMachineId, old_seqn, prev_seqn, false);
+ RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
s.addEntry(rm);
}
/* Generate rejected message entries for present messages */
long curr_seqn = rejectedSlotList.get(i);
Slot s_msg = buffer.getSlot(curr_seqn);
long machineid = s_msg.getMachineID();
- RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
+ RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
s.addEntry(rm);
}
}
return;
}
- // Reset the table status declared sizes
- smallestTableStatusSeen = -1;
- largestTableStatusSeen = -1;
-
-
// Make sure all slots are newer than the last largest slot this client has seen
long firstSeqNum = newSlots[0].getSequenceNumber();
if (firstSeqNum <= sequenceNumber) {
// Process each slots data
for (Slot slot : newSlots) {
processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
+
+ updateExpectedSize();
}
// If there is a gap, check to see if the server sent us everything.
sequenceNumber = newSlots[newSlots.length - 1].getSequenceNumber();
updateLiveStateFromServer();
+
+ // No Need to remember after we pulled from the server
+ offlineTransactionsCommittedAndAtServer.clear();
+
+ // This is invalidated now
+ hadPartialSendToServer = false;
}
private void updateLiveStateFromServer() {
updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
}
+ private void initExpectedSize(long firstSequenceNumber, long numberOfSlots) {
+ // if (didFindTableStatus) {
+ // return;
+ // }
+ long prevslots = firstSequenceNumber;
+
+
+ if (didFindTableStatus) {
+ // expectedsize = (prevslots < ((long) numberOfSlots)) ? (int) prevslots : expectedsize;
+ // System.out.println("Here2: " + expectedsize + " " + numberOfSlots + " " + prevslots);
+
+ } else {
+ expectedsize = (prevslots < ((long) numberOfSlots)) ? (int) prevslots : numberOfSlots;
+ // System.out.println("Here: " + expectedsize);
+ }
+
+ // System.out.println(numberOfSlots);
+
+ didFindTableStatus = true;
+ currMaxSize = numberOfSlots;
+ }
+
+ private void updateExpectedSize() {
+ expectedsize++;
+
+ if (expectedsize > currMaxSize) {
+ expectedsize = currMaxSize;
+ }
+ }
+
+
/**
* Check the size of the block chain to make sure there are enough slots sent back by the server.
* This is only called when we have a gap between the slots that we have locally and the slots
* status message
*/
private void checkNumSlots(int numberOfSlots) {
-
- // We only have 1 size so we must have this many slots
- if (largestTableStatusSeen == smallestTableStatusSeen) {
- if (numberOfSlots != smallestTableStatusSeen) {
- throw new Error("Server Error: Server did not send all slots. Expected: " + smallestTableStatusSeen + " Received:" + numberOfSlots);
- }
- } else {
- // We have more than 1
- if (numberOfSlots < smallestTableStatusSeen) {
- throw new Error("Server Error: Server did not send all slots. Expected at least: " + smallestTableStatusSeen + " Received:" + numberOfSlots);
- }
+ if (numberOfSlots != expectedsize) {
+ throw new Error("Server Error: Server did not send all slots. Expected: " + expectedsize + " Received:" + numberOfSlots);
}
}
+ private void updateCurrMaxSize(int newmaxsize) {
+ currMaxSize = newmaxsize;
+ }
+
+
/**
* Update the size of of the local buffer if it is needed.
*/
private void commitNewMaxSize() {
-
- int currMaxSize = 0;
-
- if (largestTableStatusSeen == -1) {
- // No table status seen so the current max size does not change
- currMaxSize = numberOfSlots;
- } else {
- currMaxSize = largestTableStatusSeen;
- }
+ didFindTableStatus = false;
// Resize the local slot buffer
if (numberOfSlots != currMaxSize) {
- buffer.resize(currMaxSize);
+ buffer.resize((int)currMaxSize);
}
// Change the number of local slots to the new size
- numberOfSlots = currMaxSize;
+ numberOfSlots = (int)currMaxSize;
+
// Recalculate the resize threshold since the size of the local buffer has changed
setResizeThreshold();
newTransactionParts.clear();
}
- public void arbitrateFromServer() {
+
+ private long lastSeqNumArbOn = 0;
+
+ private void arbitrateFromServer() {
if (liveTransactionBySequenceNumberTable.size() == 0) {
// Nothing to arbitrate on so move on
// The last transaction arbitrated on
long lastTransactionCommitted = -1;
+ Set<Abort> generatedAborts = new HashSet<Abort>();
for (Long transactionSequenceNumber : transactionSequenceNumbers) {
Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber);
+
+
// Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
if (transaction.getArbitrator() != localMachineId) {
continue;
}
+ if (transactionSequenceNumber < lastSeqNumArbOn) {
+ continue;
+ }
+
+ if (offlineTransactionsCommittedAndAtServer.contains(transaction.getId())) {
+ // We have seen this already locally so dont commit again
+ continue;
+ }
+
if (!transaction.isComplete()) {
// Will arbitrate in incorrect order if we continue so just break
}
+ // update the largest transaction seen by arbitrator from server
+ if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) == null) {
+ lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber());
+ } else {
+ Long lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId());
+ if (transaction.getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
+ lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber());
+ }
+ }
+
if (transaction.evaluateGuard(committedKeyValueTable, speculativeTableTmp, null)) {
// Guard evaluated as true
}
// Update what the last transaction committed was for use in batch commit
- lastTransactionCommitted = transaction.getSequenceNumber();
-
+ lastTransactionCommitted = transactionSequenceNumber;
} else {
// Guard evaluated was false so create abort
transaction.getClientLocalSequenceNumber(),
transaction.getSequenceNumber(),
transaction.getMachineId(),
- transaction.getArbitrator());
+ transaction.getArbitrator(),
+ localArbitrationSequenceNumber);
+ localArbitrationSequenceNumber++;
- // Add the abort to the queue of aborts to send out
- pendingSendArbitrationEntries.add(newAbort);
+ generatedAborts.add(newAbort);
// Insert the abort so we can process
processEntry(newAbort);
}
+
+ lastSeqNumArbOn = transactionSequenceNumber;
+
+ // liveTransactionBySequenceNumberTable.remove(transactionSequenceNumber);
}
+ Commit newCommit = null;
+
// If there is something to commit
if (speculativeTableTmp.size() != 0) {
// Create the commit and increment the commit sequence number
- Commit newCommit = new Commit(localCommitSequenceNumber, localMachineId, lastTransactionCommitted);
- localCommitSequenceNumber++;
+ newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
+ localArbitrationSequenceNumber++;
// Add all the new keys to the commit
for (KeyValue kv : speculativeTableTmp.values()) {
newCommit.createCommitParts();
// Append all the commit parts to the end of the pending queue waiting for sending to the server
- pendingSendArbitrationEntries.addAll(newCommit.getParts().values());
// Insert the commit so we can process it
for (CommitPart commitPart : newCommit.getParts().values()) {
processEntry(commitPart);
}
}
+
+ if ((newCommit != null) || (generatedAborts.size() > 0)) {
+ ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
+ pendingSendArbitrationRounds.add(arbitrationRound);
+
+ if (compactArbitrationData()) {
+ ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
+ if (newArbitrationRound.getCommit() != null) {
+ for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
+ processEntry(commitPart);
+ }
+ }
+ }
+ }
}
- public void arbitrateOnLocalTransaction(Transaction transaction) {
+ private Pair<Boolean, Boolean> arbitrateOnLocalTransaction(Transaction transaction) {
// Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
if (transaction.getArbitrator() != localMachineId) {
- return;
+ return new Pair<Boolean, Boolean>(false, false);
}
if (!transaction.isComplete()) {
// Will arbitrate in incorrect order if we continue so just break
// Most likely this
- return;
+ return new Pair<Boolean, Boolean>(false, false);
+ }
+
+ if (transaction.getMachineId() != localMachineId) {
+ // dont do this check for local transactions
+ if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) != null) {
+ if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) > transaction.getClientLocalSequenceNumber()) {
+ // We've have already seen this from the server
+ return new Pair<Boolean, Boolean>(false, false);
+ }
+ }
}
if (transaction.evaluateGuard(committedKeyValueTable, null, null)) {
// Guard evaluated as true
// Create the commit and increment the commit sequence number
- Commit newCommit = new Commit(localCommitSequenceNumber, localMachineId, -1);
- localCommitSequenceNumber++;
+ Commit newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
+ localArbitrationSequenceNumber++;
// Update the local changes so we can make the commit
for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
newCommit.createCommitParts();
// Append all the commit parts to the end of the pending queue waiting for sending to the server
- pendingSendArbitrationEntries.addAll(newCommit.getParts().values());
+ ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, new HashSet<Abort>());
+ pendingSendArbitrationRounds.add(arbitrationRound);
- // Insert the commit so we can process it
- for (CommitPart commitPart : newCommit.getParts().values()) {
- processEntry(commitPart);
+ if (compactArbitrationData()) {
+ ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
+ for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
+ processEntry(commitPart);
+ }
+ } else {
+ // Insert the commit so we can process it
+ for (CommitPart commitPart : newCommit.getParts().values()) {
+ processEntry(commitPart);
+ }
}
- TransactionStatus status = transaction.getTransactionStatus();
- status.setStatus(TransactionStatus.StatusCommitted);
+ if (transaction.getMachineId() == localMachineId) {
+ TransactionStatus status = transaction.getTransactionStatus();
+ if (status != null) {
+ status.setStatus(TransactionStatus.StatusCommitted);
+ }
+ }
+ updateLiveStateFromLocal();
+ return new Pair<Boolean, Boolean>(true, true);
} else {
- // Guard evaluated was false so create abort
- TransactionStatus status = transaction.getTransactionStatus();
- status.setStatus(TransactionStatus.StatusAborted);
+
+ if (transaction.getMachineId() == localMachineId) {
+ // For locally created messages update the status
+
+ // Guard evaluated was false so create abort
+ TransactionStatus status = transaction.getTransactionStatus();
+ if (status != null) {
+ status.setStatus(TransactionStatus.StatusAborted);
+ }
+ } else {
+ Set addAbortSet = new HashSet<Abort>();
+
+
+ // Create the abort
+ Abort newAbort = new Abort(null,
+ transaction.getClientLocalSequenceNumber(),
+ -1,
+ transaction.getMachineId(),
+ transaction.getArbitrator(),
+ localArbitrationSequenceNumber);
+ localArbitrationSequenceNumber++;
+
+ addAbortSet.add(newAbort);
+
+
+ // Append all the commit parts to the end of the pending queue waiting for sending to the server
+ ArbitrationRound arbitrationRound = new ArbitrationRound(null, addAbortSet);
+ pendingSendArbitrationRounds.add(arbitrationRound);
+
+ if (compactArbitrationData()) {
+ ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
+ for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
+ processEntry(commitPart);
+ }
+ }
+ }
+
+ updateLiveStateFromLocal();
+ return new Pair<Boolean, Boolean>(true, false);
}
}
+ /**
+ * Compacts the arbitration data my merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates
+ */
+ private boolean compactArbitrationData() {
+
+ if (pendingSendArbitrationRounds.size() < 2) {
+ // Nothing to compact so do nothing
+ return false;
+ }
+
+ ArbitrationRound lastRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
+ if (lastRound.didSendPart()) {
+ return false;
+ }
+
+ boolean hadCommit = (lastRound.getCommit() == null);
+ boolean gotNewCommit = false;
+
+ int numberToDelete = 1;
+ while (numberToDelete < pendingSendArbitrationRounds.size()) {
+ ArbitrationRound round = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - numberToDelete - 1);
+
+ if (round.isFull() || round.didSendPart()) {
+ // Stop since there is a part that cannot be compacted and we need to compact in order
+ break;
+ }
+
+ if (round.getCommit() == null) {
+
+ // Try compacting aborts only
+ int newSize = round.getCurrentSize() + lastRound.getAbortsCount();
+ if (newSize > ArbitrationRound.MAX_PARTS) {
+ // Cant compact since it would be too large
+ break;
+ }
+ lastRound.addAborts(round.getAborts());
+ } else {
+
+ // Create a new larger commit
+ Commit newCommit = Commit.merge(lastRound.getCommit(), round.getCommit(), localArbitrationSequenceNumber);
+ localArbitrationSequenceNumber++;
+
+ // Create the commit parts so that we can count them
+ newCommit.createCommitParts();
+
+ // Calculate the new size of the parts
+ int newSize = newCommit.getNumberOfParts();
+ newSize += lastRound.getAbortsCount();
+ newSize += round.getAbortsCount();
+
+ if (newSize > ArbitrationRound.MAX_PARTS) {
+ // Cant compact since it would be too large
+ break;
+ }
+
+ // Set the new compacted part
+ lastRound.setCommit(newCommit);
+ lastRound.addAborts(round.getAborts());
+ gotNewCommit = true;
+ }
+
+ numberToDelete++;
+ }
+
+ if (numberToDelete != 1) {
+ // If there is a compaction
+
+ // Delete the previous pieces that are now in the new compacted piece
+ if (numberToDelete == pendingSendArbitrationRounds.size()) {
+ pendingSendArbitrationRounds.clear();
+ } else {
+ for (int i = 0; i < numberToDelete; i++) {
+ pendingSendArbitrationRounds.removeIndex(pendingSendArbitrationRounds.size() - 1);
+ }
+ }
+
+ // Add the new compacted into the pending to send list
+ pendingSendArbitrationRounds.add(lastRound);
+
+ // Should reinsert into the commit processor
+ if (hadCommit && gotNewCommit) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+ // private boolean compactArbitrationData() {
+ // return false;
+ // }
+
/**
* Update all the commits and the committed tables, sets dead the dead transactions
*/
List<Long> commitSequenceNumbers = new ArrayList<Long>(commitForClientTable.keySet());
Collections.sort(commitSequenceNumbers);
+ // Get the last commit seen from this arbitrator
+ long lastCommitSeenSequenceNumber = -1;
+ if (lastCommitSeenSequenceNumberByArbitratorTable.get(arbitratorId) != null) {
+ lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable.get(arbitratorId);
+ }
+
// Go through each new commit one by one
for (int i = 0; i < commitSequenceNumbers.size(); i++) {
Long commitSequenceNumber = commitSequenceNumbers.get(i);
}
}
- // Get the last commit seen from this arbitrator
- long lastCommitSeenSequenceNumber = -1;
- if (lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId()) != null) {
- lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId());
- }
-
-
+ // Update the last transaction that was updated if we can
+ if (commit.getTransactionSequenceNumber() != -1) {
+ Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId());
+ // Update the last transaction sequence number that the arbitrator arbitrated on
+ if ((lastTransactionNumber == null) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) {
+ lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber());
+ }
+ }
+ // Update the last arbitration data that we have seen so far
+ if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId()) != null) {
+ long lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId());
+ if (commit.getSequenceNumber() > lastArbitrationSequenceNumber) {
+ // Is larger
+ lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(commit.getMachineId(), commit.getSequenceNumber());
+ }
+ } else {
+ // Never seen any data from this arbitrator so record the first one
+ lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(commit.getMachineId(), commit.getSequenceNumber());
+ }
// We have already seen this commit before so need to do the full processing on this commit
if (commit.getSequenceNumber() <= lastCommitSeenSequenceNumber) {
continue;
}
-
// If we got here then this is a brand new commit and needs full processing
// Get what commits should be edited, these are the commits that have live values for their keys
}
// Update the last seen sequence number from this arbitrator
- lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber());
-
- // Update the last transaction that was updated if we can
- if (commit.getTransactionSequenceNumber() != -1) {
- Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId());
-
- // Update the last transaction sequence number that the arbitrator arbitrated on
- if ((lastTransactionNumber == null) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) {
- lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber());
+ if (lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId()) != null) {
+ if (commit.getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId())) {
+ lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber());
}
+ } else {
+ lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber());
}
// We processed a new commit that we havent seen before
}
}
-
/**
* Process this slot, entry by entry. Also update the latest message sent by slot
*/
break;
case Entry.TypeTableStatus:
- processEntry((TableStatus)entry);
+ processEntry((TableStatus)entry, slot.getSequenceNumber());
break;
default:
* keeps track of the largest and smallest table status seen in this current round
* of updating the local copy of the block chain
*/
- private void processEntry(TableStatus entry) {
+ private void processEntry(TableStatus entry, long seq) {
int newNumSlots = entry.getMaxSlots();
+ updateCurrMaxSize(newNumSlots);
+
+ initExpectedSize(seq, newNumSlots);
if (liveTableStatus != null) {
// We have a larger table status so the old table status is no longer alive
// Make this new table status the latest alive table status
liveTableStatus = entry;
-
- if ((smallestTableStatusSeen == -1) || (newNumSlots < smallestTableStatusSeen)) {
- smallestTableStatusSeen = newNumSlots;
- }
-
- if ((largestTableStatusSeen == -1) || (newNumSlots > largestTableStatusSeen)) {
- largestTableStatusSeen = newNumSlots;
- }
}
/**
long newSeqNum = entry.getNewSeqNum();
boolean isequal = entry.getEqual();
long machineId = entry.getMachineID();
+ long seq = entry.getSequenceNumber();
// Check if we have messages that were supposed to be rejected in our local block chain
Pair<Long, Liveness> lastMessageValue = lastMessageEntry.getValue();
long entrySequenceNumber = lastMessageValue.getFirst();
- if (entrySequenceNumber < newSeqNum) {
+ if (entrySequenceNumber < seq) {
// Add this rejected message to the set of messages that this machine ID did not see yet
addWatchList(lastMessageEntryMachineId, entry);
*/
private void processEntry(Abort entry) {
+
+ if (entry.getTransactionSequenceNumber() != -1) {
+ // update the transaction status if it was sent to the server
+ TransactionStatus status = outstandingTransactionStatus.remove(entry.getTransactionSequenceNumber());
+ if (status != null) {
+ status.setStatus(TransactionStatus.StatusAborted);
+ }
+ }
+
// Abort has not been seen by the client it is for yet so we need to keep track of it
Abort previouslySeenAbort = liveAbortTable.put(entry.getAbortId(), entry);
if (previouslySeenAbort != null) {
previouslySeenAbort.setDead(); // Delete old version of the abort since we got a rescued newer version
}
+ if (entry.getTransactionArbitrator() == localMachineId) {
+ liveAbortsGeneratedByLocal.put(entry.getArbitratorLocalSequenceNumber(), entry);
+ }
+
if ((entry.getSequenceNumber() != -1) && (lastMessageTable.get(entry.getTransactionMachineId()).getFirst() >= entry.getSequenceNumber())) {
// The machine already saw this so it is dead
entry.setDead();
- liveAbortTable.remove(entry);
+ liveAbortTable.remove(entry.getAbortId());
+
+ if (entry.getTransactionArbitrator() == localMachineId) {
+ liveAbortsGeneratedByLocal.remove(entry.getArbitratorLocalSequenceNumber());
+ }
+
return;
}
- // update the transaction status
- TransactionStatus status = outstandingTransactionStatus.remove(entry.getTransactionSequenceNumber());
- if (status != null) {
- status.setStatus(TransactionStatus.StatusAborted);
+
+
+ // Update the last arbitration data that we have seen so far
+ if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator()) != null) {
+
+ long lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator());
+ if (entry.getSequenceNumber() > lastArbitrationSequenceNumber) {
+ // Is larger
+ lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber());
+ }
+ } else {
+ // Never seen any data from this arbitrator so record the first one
+ lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber());
}
* Process new commit entries and save them for future use. Delete duplicates
*/
private void processEntry(CommitPart entry) {
+
+
+ // Update the last transaction that was updated if we can
+ if (entry.getTransactionSequenceNumber() != -1) {
+ Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getMachineId());
+
+ // Update the last transaction sequence number that the arbitrator arbitrated on
+ if ((lastTransactionNumber == null) || (lastTransactionNumber < entry.getTransactionSequenceNumber())) {
+ lastArbitratedTransactionNumberByArbitratorTable.put(entry.getMachineId(), entry.getTransactionSequenceNumber());
+ }
+ }
+
+
+
+
Map<Pair<Long, Integer>, CommitPart> commitPart = newCommitParts.get(entry.getMachineId());
if (commitPart == null) {
- // Dont have a table for this machine Id yet so make one
+ // Don't have a table for this machine Id yet so make one
commitPart = new HashMap<Pair<Long, Integer>, CommitPart>();
newCommitParts.put(entry.getMachineId(), commitPart);
}
RejectedMessage rm = rmit.next();
// If this machine Id has seen this rejected message...
- if (rm.getNewSeqNum() <= seqNum) {
+ if (rm.getSequenceNumber() <= seqNum) {
// Remove it from our watchlist
rmit.remove();
if ((abort.getTransactionMachineId() == machineId) && (abort.getSequenceNumber() <= seqNum)) {
abort.setDead();
i.remove();
+
+ if (abort.getTransactionArbitrator() == localMachineId) {
+ liveAbortsGeneratedByLocal.remove(abort.getArbitratorLocalSequenceNumber());
+ }
}
}
// Make sure the server is not playing any games
if (machineId == localMachineId) {
- // We were not making any updates and we had a machine mismatch
- if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
- throw new Error("Server Error: Mismatch on local machine sequence number, needed: " + seqNum + " got: " + lastMessageSeqNum);
+ if (hadPartialSendToServer) {
+ // We were not making any updates and we had a machine mismatch
+ if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
+ throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: " + lastMessageSeqNum + " got: " + seqNum);
+ }
+
+ } else {
+ // We were not making any updates and we had a machine mismatch
+ if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
+ throw new Error("Server Error: Mismatch on local machine sequence number, needed: " + lastMessageSeqNum + " got: " + seqNum);
+ }
}
} else {
if (lastMessageSeqNum > seqNum) {
throw new Error("Server Error: Invalid HMAC Chain" + currSlot + " " + prevSlot);
}
}
-}
\ No newline at end of file
+}