*/
final public class Table {
-
+
/* Constants */
- static final int FREE_SLOTS = 10; // Number of slots that should be kept free
+ static final int FREE_SLOTS = 2; // Number of slots that should be kept free // 10
static final int SKIP_THRESHOLD = 10;
static final double RESIZE_MULTIPLE = 1.2;
static final double RESIZE_THRESHOLD = 0.75;
private int numberOfSlots = 0; // Number of slots stored in buffer
private int bufferResizeThreshold = 0; // Threshold on the number of live slots before a resize is needed
private long liveSlotCount = 0; // Number of currently live slots
- private long oldestLiveSlotSequenceNumver = 0; // Smallest sequence number of the slot with a live entry
+ private long oldestLiveSlotSequenceNumver = 1; // Smallest sequence number of the slot with a live entry
private long localMachineId = 0; // Machine ID of this client device
private long sequenceNumber = 0; // Largest sequence number a client has received
+ private long localSequenceNumber = 0;
+
// private int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server
// private int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server
- private long localTransactionSequenceNumber = 0; // Local sequence number counter for transactions
- private long lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on
- private long oldestTransactionSequenceNumberSpeculatedOn = -1; // the oldest transaction that was speculated on
- private long localArbitrationSequenceNumber = 0;
+ private long localTransactionSequenceNumber = 1; // Local sequence number counter for transactions
+ private long lastTransactionSequenceNumberSpeculatedOn = 0; // the last transaction that was speculated on
+ private long oldestTransactionSequenceNumberSpeculatedOn = 0; // the oldest transaction that was speculated on
+ private long localArbitrationSequenceNumber = 1;
private boolean hadPartialSendToServer = false;
private boolean attemptedToSendToServer = false;
private long expectedsize;
private boolean didFindTableStatus = false;
private long currMaxSize = 0;
+ private Slot lastSlotAttemptedToSend = null;
+ private boolean lastIsNewKey = false;
+ private int lastNewSize = 0;
+ private Map<Transaction, List<Integer>> lastTransactionPartsSent = null;
+ private List<Entry> lastPendingSendArbitrationEntriesToDelete = null;
+ private NewKey lastNewKey = null;
+
+
/* Data Structures */
private Map<IoTString, KeyValue> committedKeyValueTable = null; // Table of committed key value pairs
private Map<IoTString, KeyValue> speculatedKeyValueTable = null; // Table of speculated key value pairs, if there is a speculative value
int livec = 0;
int deadc = 0;
+
+ int casdasd = 0;
+
+ int liveslo = 0;
+
for (long i = o; i < (n + 1); i++) {
Slot s = buffer.getSlot(i);
+
+ if (s.isLive()) {
+ liveslo++;
+ }
+
Vector<Entry> entries = s.getEntries();
for (Entry e : entries) {
if (e.isLive()) {
int type = e.getType();
+
+
+ if (type == 6) {
+ RejectedMessage rej = (RejectedMessage)e;
+ casdasd++;
+
+ System.out.println(rej.getMachineID());
+ }
+
+
types[type] = types[type] + 1;
num++;
livec++;
System.out.println(i + " " + types[i]);
}
System.out.println("Live count: " + livec);
+ System.out.println("Live Slot count: " + liveslo);
+
System.out.println("Dead count: " + deadc);
System.out.println("Old: " + o);
System.out.println("New: " + n);
System.out.println("Size: " + buffer.size());
- System.out.println("Commits: " + liveCommitsTable.size());
+ // System.out.println("Commits: " + liveCommitsTable.size());
+ System.out.println("pendingTrans: " + pendingTransactionQueue.size());
+ System.out.println("Trans Status Out: " + outstandingTransactionStatus.size());
+
+ for (Long k : lastArbitratedTransactionNumberByArbitratorTable.keySet()) {
+ System.out.println(k + ": " + lastArbitratedTransactionNumberByArbitratorTable.get(k));
+ }
+
for (Long a : liveCommitsTable.keySet()) {
for (Long b : liveCommitsTable.get(a).keySet()) {
cloud.initSecurity();
// Create the first insertion into the block chain which is the table status
- Slot s = new Slot(this, 1, localMachineId);
+ Slot s = new Slot(this, 1, localMachineId, localSequenceNumber);
+ localSequenceNumber++;
TableStatus status = new TableStatus(s, numberOfSlots);
s.addEntry(status);
Slot[] array = cloud.putSlot(s, numberOfSlots);
// Just pull the latest slots from the server
Slot[] newslots = cloud.getSlots(sequenceNumber + 1);
validateAndUpdate(newslots, true);
+ sendToServer(null);
+ updateLiveTransactionsAndStatus();
+
}
-// public String toString() {
-// String retString = " Committed Table: \n";
-// retString += "---------------------------\n";
-// retString += commitedTable.toString();
+ // public String toString() {
+ // String retString = " Committed Table: \n";
+ // retString += "---------------------------\n";
+ // retString += commitedTable.toString();
-// retString += "\n\n";
+ // retString += "\n\n";
-// retString += " Speculative Table: \n";
-// retString += "---------------------------\n";
-// retString += speculativeTable.toString();
+ // retString += " Speculative Table: \n";
+ // retString += "---------------------------\n";
+ // retString += speculativeTable.toString();
-// return retString;
-// }
+ // return retString;
+ // }
public synchronized void addLocalCommunication(long arbitrator, String hostName, int portNumber) {
localCommunicationTable.put(arbitrator, new Pair<String, Integer>(hostName, portNumber));
public synchronized void close() {
cloud.close();
}
+
+ // Return all keys in the table
+ public synchronized Set<IoTString> getKeys() {
+ return committedKeyValueTable.keySet();
+ }
public synchronized IoTString getCommitted(IoTString key) {
KeyValue kv = committedKeyValueTable.get(key);
validateAndUpdate(newSlots, false);
sendToServer(null);
+
+ updateLiveTransactionsAndStatus();
+
return true;
} catch (Exception e) {
// e.printStackTrace();
+
+ for (Long m : localCommunicationTable.keySet()) {
+ updateFromLocal(m);
+ }
}
return false;
}
NewKey newKey = new NewKey(null, keyName, machineId);
+
if (sendToServer(newKey)) {
// If successfully inserted
return true;
bufferResizeThreshold = resizeLower - 1 + random.nextInt(numberOfSlots - resizeLower);
}
+ public long getLocalSequenceNumber() {
+ return localSequenceNumber;
+ }
+
+
+ boolean lastInsertedNewKey = false;
+
private boolean sendToServer(NewKey newKey) throws ServerException {
+ boolean fromRetry = false;
+
+ try {
+ if (hadPartialSendToServer) {
+ Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
+ if (newSlots.length == 0) {
+ fromRetry = true;
+ ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
+
+ if (sendSlotsReturn.getFirst()) {
+ if (newKey != null) {
+ if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
+ newKey = null;
+ }
+ }
+
+ for (Transaction transaction : lastTransactionPartsSent.keySet()) {
+ transaction.resetServerFailure();
+
+ // Update which transactions parts still need to be sent
+ transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
+
+ // Add the transaction status to the outstanding list
+ outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
+
+ // Update the transaction status
+ transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
+
+ // Check if all the transaction parts were successfully sent and if so then remove it from pending
+ if (transaction.didSendAllParts()) {
+ transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
+ pendingTransactionQueue.remove(transaction);
+ }
+ }
+ } else {
+
+ newSlots = sendSlotsReturn.getThird();
+
+ boolean isInserted = false;
+ for (Slot s : newSlots) {
+ if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
+ isInserted = true;
+ break;
+ }
+ }
+
+ for (Slot s : newSlots) {
+ if (isInserted) {
+ break;
+ }
+
+ // Process each entry in the slot
+ for (Entry entry : s.getEntries()) {
+
+ if (entry.getType() == Entry.TypeLastMessage) {
+ LastMessage lastMessage = (LastMessage)entry;
+ if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) {
+ isInserted = true;
+ break;
+ }
+ }
+ }
+ }
+
+ if (isInserted) {
+ if (newKey != null) {
+ if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
+ newKey = null;
+ }
+ }
+
+ for (Transaction transaction : lastTransactionPartsSent.keySet()) {
+ transaction.resetServerFailure();
+
+ // Update which transactions parts still need to be sent
+ transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
+
+ // Add the transaction status to the outstanding list
+ outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
+
+ // Update the transaction status
+ transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
+
+ // Check if all the transaction parts were successfully sent and if so then remove it from pending
+ if (transaction.didSendAllParts()) {
+ transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
+ pendingTransactionQueue.remove(transaction);
+ } else {
+ transaction.resetServerFailure();
+ // Set the transaction sequence number back to nothing
+ if (!transaction.didSendAPartToServer()) {
+ transaction.setSequenceNumber(-1);
+ }
+ }
+ }
+ }
+ }
+
+ for (Transaction transaction : lastTransactionPartsSent.keySet()) {
+ transaction.resetServerFailure();
+ // Set the transaction sequence number back to nothing
+ if (!transaction.didSendAPartToServer()) {
+ transaction.setSequenceNumber(-1);
+ }
+ }
+
+ if (sendSlotsReturn.getThird().length != 0) {
+ // insert into the local block chain
+ validateAndUpdate(sendSlotsReturn.getThird(), true);
+ }
+ // continue;
+ } else {
+ boolean isInserted = false;
+ for (Slot s : newSlots) {
+ if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
+ isInserted = true;
+ break;
+ }
+ }
+
+ for (Slot s : newSlots) {
+ if (isInserted) {
+ break;
+ }
+
+ // Process each entry in the slot
+ for (Entry entry : s.getEntries()) {
+
+ if (entry.getType() == Entry.TypeLastMessage) {
+ LastMessage lastMessage = (LastMessage)entry;
+ if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) {
+ isInserted = true;
+ break;
+ }
+ }
+ }
+ }
+
+ if (isInserted) {
+ if (newKey != null) {
+ if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
+ newKey = null;
+ }
+ }
+
+ for (Transaction transaction : lastTransactionPartsSent.keySet()) {
+ transaction.resetServerFailure();
+
+ // Update which transactions parts still need to be sent
+ transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
+
+ // Add the transaction status to the outstanding list
+ outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
+
+ // Update the transaction status
+ transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
+
+ // Check if all the transaction parts were successfully sent and if so then remove it from pending
+ if (transaction.didSendAllParts()) {
+ transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
+ pendingTransactionQueue.remove(transaction);
+ } else {
+ transaction.resetServerFailure();
+ // Set the transaction sequence number back to nothing
+ if (!transaction.didSendAPartToServer()) {
+ transaction.setSequenceNumber(-1);
+ }
+ }
+ }
+ } else {
+ for (Transaction transaction : lastTransactionPartsSent.keySet()) {
+ transaction.resetServerFailure();
+ // Set the transaction sequence number back to nothing
+ if (!transaction.didSendAPartToServer()) {
+ transaction.setSequenceNumber(-1);
+ }
+ }
+ }
+
+ // insert into the local block chain
+ validateAndUpdate(newSlots, true);
+ }
+ }
+ } catch (ServerException e) {
+ throw e;
+ }
+
+
+
try {
// While we have stuff that needs inserting into the block chain
while ((pendingTransactionQueue.size() > 0) || (pendingSendArbitrationRounds.size() > 0) || (newKey != null)) {
+ fromRetry = false;
+
+ if (hadPartialSendToServer) {
+ throw new Error("Should Be error free");
+ }
+
+
+
+ // If there is a new key with same name then end
+ if ((newKey != null) && (arbitratorTable.get(newKey.getKey()) != null)) {
+ return false;
+ }
+
// Create the slot
- Slot slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer.getSlot(sequenceNumber).getHMAC());
+ Slot slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer.getSlot(sequenceNumber).getHMAC(), localSequenceNumber);
+ localSequenceNumber++;
// Try to fill the slot with data
ThreeTuple<Boolean, Integer, Boolean> fillSlotsReturn = fillSlot(slot, false, newKey);
transaction.resetNextPartToSend();
// Set the transaction sequence number back to nothing
- if (!transaction.didSendAPartToServer()) {
+ if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
transaction.setSequenceNumber(-1);
}
}
fillSlot(slot, true, newKey);
}
- // Try to send to the server
+ lastSlotAttemptedToSend = slot;
+ lastIsNewKey = (newKey != null);
+ lastInsertedNewKey = insertedNewKey;
+ lastNewSize = newSize;
+ lastNewKey = newKey;
+ lastTransactionPartsSent = new HashMap<Transaction, List<Integer>>(transactionPartsSent);
+ lastPendingSendArbitrationEntriesToDelete = new ArrayList<Entry>(pendingSendArbitrationEntriesToDelete);
+
+
ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != null);
- if (/*sendSlotsReturn.getSecond() || */sendSlotsReturn.getFirst()) {
+ if (sendSlotsReturn.getFirst()) {
+
// Did insert into the block chain
- if (sendSlotsReturn.getFirst()) {
+ if (insertedNewKey) {
// This slot was what was inserted not a previous slot
// New Key was successfully inserted into the block chain so dont want to insert it again
}
}
} else {
+
+ // if (!sendSlotsReturn.getSecond()) {
+ // for (Transaction transaction : lastTransactionPartsSent.keySet()) {
+ // transaction.resetServerFailure();
+ // }
+ // } else {
+ // for (Transaction transaction : lastTransactionPartsSent.keySet()) {
+ // transaction.resetServerFailure();
+
+ // // Update which transactions parts still need to be sent
+ // transaction.removeSentParts(transactionPartsSent.get(transaction));
+
+ // // Add the transaction status to the outstanding list
+ // outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
+
+ // // Update the transaction status
+ // transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
+
+ // // Check if all the transaction parts were successfully sent and if so then remove it from pending
+ // if (transaction.didSendAllParts()) {
+ // transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
+ // pendingTransactionQueue.remove(transaction);
+
+ // for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
+ // System.out.println("Sent: " + kv + " from: " + localMachineId + " Slot:" + lastSlotAttemptedToSend.getSequenceNumber() + " Claimed:" + transaction.getSequenceNumber());
+ // }
+ // }
+ // }
+ // }
+
// Reset which transaction to send
for (Transaction transaction : transactionPartsSent.keySet()) {
transaction.resetNextPartToSend();
- transaction.resetNextPartToSend();
+ // transaction.resetNextPartToSend();
// Set the transaction sequence number back to nothing
- if (!transaction.didSendAPartToServer()) {
+ if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
transaction.setSequenceNumber(-1);
}
}
validateAndUpdate(sendSlotsReturn.getThird(), true);
}
}
- } catch (ServerException e) {
- // System.out.println("Server Failure: " + e.getType());
+ } catch (ServerException e) {
if (e.getType() != ServerException.TypeInputTimeout) {
// e.printStackTrace();
transaction.resetNextPartToSend();
// Set the transaction sequence number back to nothing
- if (!transaction.didSendAPartToServer()) {
+ if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
transaction.setSequenceNumber(-1);
}
}
// There was a partial send to the server
hadPartialSendToServer = true;
+
+ // if (!fromRetry) {
+ // lastTransactionPartsSent = new HashMap<Transaction, List<Integer>>(transactionPartsSent);
+ // lastPendingSendArbitrationEntriesToDelete = new ArrayList<Entry>(pendingSendArbitrationEntriesToDelete);
+ // }
+
// Nothing was able to be sent to the server so just clear these data structures
for (Transaction transaction : transactionPartsSent.keySet()) {
transaction.resetNextPartToSend();
return newKey == null;
}
- public synchronized boolean updateFromLocal(long machineId) {
+ private synchronized boolean updateFromLocal(long machineId) {
Pair<String, Integer> localCommunicationInformation = localCommunicationTable.get(machineId);
if (localCommunicationInformation == null) {
// Cant talk to that device locally so do nothing
bbEncode.putInt(0);
// Send by local
- byte[] returnData = cloud.sendLocalData(sendData, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
+ byte[] returnData = cloud.sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
+ localSequenceNumber++;
if (returnData == null) {
// Could not contact server
if (localCommunicationInformation == null) {
// Cant talk to that device locally so do nothing
- return new Pair<Boolean, Boolean>(false, false);
+ return new Pair<Boolean, Boolean>(true, false);
}
// Get the size of the send data
// Send by local
- byte[] returnData = cloud.sendLocalData(sendData, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
+ byte[] returnData = cloud.sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
+ localSequenceNumber++;
if (returnData == null) {
// Could not contact server
entry.encode(bbEncode);
}
+
+ localSequenceNumber++;
return returnData;
}
* Returns false if a resize was needed
*/
private ThreeTuple<Boolean, Integer, Boolean> fillSlot(Slot slot, boolean resize, NewKey newKeyEntry) {
+
+
int newSize = 0;
if (liveSlotCount > bufferResizeThreshold) {
resize = true; //Resize is forced
+
}
if (resize) {
if (newKeyEntry != null) {
newKeyEntry.setSlot(slot);
if (slot.hasSpace(newKeyEntry)) {
+
slot.addEntry(newKeyEntry);
inserted = true;
}
Transaction transaction = pendingTransactionQueue.get(0);
// Set the transaction sequence number if it has yet to be inserted into the block chain
- if ((!transaction.didSendAPartToServer() && !transaction.getServerFailure()) || (transaction.getSequenceNumber() == -1)) {
+ // if ((!transaction.didSendAPartToServer() && !transaction.getServerFailure()) || (transaction.getSequenceNumber() == -1)) {
+ // transaction.setSequenceNumber(slot.getSequenceNumber());
+ // }
+
+ if ((!transaction.didSendAPartToServer()) || (transaction.getSequenceNumber() == -1)) {
transaction.setSequenceNumber(slot.getSequenceNumber());
}
+
while (true) {
TransactionPart part = transaction.getNextPartToSend();
// Process each slots data
for (Slot slot : newSlots) {
processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
+
updateExpectedSize();
}
updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
}
-
-
private void initExpectedSize(long firstSequenceNumber, long numberOfSlots) {
+ // if (didFindTableStatus) {
+ // return;
+ // }
+ long prevslots = firstSequenceNumber;
+
+
if (didFindTableStatus) {
- return;
+ // expectedsize = (prevslots < ((long) numberOfSlots)) ? (int) prevslots : expectedsize;
+ // System.out.println("Here2: " + expectedsize + " " + numberOfSlots + " " + prevslots);
+
+ } else {
+ expectedsize = (prevslots < ((long) numberOfSlots)) ? (int) prevslots : numberOfSlots;
+ // System.out.println("Here: " + expectedsize);
}
- long prevslots = firstSequenceNumber;
- expectedsize = (prevslots < ((long) numberOfSlots)) ? (int) prevslots : numberOfSlots;
+
+ // System.out.println(numberOfSlots);
+
+ didFindTableStatus = true;
currMaxSize = numberOfSlots;
}
private void updateExpectedSize() {
expectedsize++;
- if (expectedsize > currMaxSize)
- {
+
+ if (expectedsize > currMaxSize) {
expectedsize = currMaxSize;
}
}
}
private void updateCurrMaxSize(int newmaxsize) {
- currMaxSize=newmaxsize;
+ currMaxSize = newmaxsize;
}
* Update the size of of the local buffer if it is needed.
*/
private void commitNewMaxSize() {
- didFindTableStatus = false;
+ didFindTableStatus = false;
// Resize the local slot buffer
if (numberOfSlots != currMaxSize) {
// Change the number of local slots to the new size
numberOfSlots = (int)currMaxSize;
+
// Recalculate the resize threshold since the size of the local buffer has changed
setResizeThreshold();
}
newTransactionParts.clear();
}
+
+ private long lastSeqNumArbOn = 0;
+
private void arbitrateFromServer() {
if (liveTransactionBySequenceNumberTable.size() == 0) {
for (Long transactionSequenceNumber : transactionSequenceNumbers) {
Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber);
+
+
// Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
if (transaction.getArbitrator() != localMachineId) {
continue;
}
+ if (transactionSequenceNumber < lastSeqNumArbOn) {
+ continue;
+ }
+
if (offlineTransactionsCommittedAndAtServer.contains(transaction.getId())) {
// We have seen this already locally so dont commit again
continue;
break;
}
+
// update the largest transaction seen by arbitrator from server
if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) == null) {
lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber());
}
// Update what the last transaction committed was for use in batch commit
- lastTransactionCommitted = transaction.getSequenceNumber();
+ lastTransactionCommitted = transactionSequenceNumber;
} else {
// Guard evaluated was false so create abort
// Insert the abort so we can process
processEntry(newAbort);
}
+
+ lastSeqNumArbOn = transactionSequenceNumber;
+
+ // liveTransactionBySequenceNumberTable.remove(transactionSequenceNumber);
}
Commit newCommit = null;
}
}
+ // Update the last transaction that was updated if we can
+ if (commit.getTransactionSequenceNumber() != -1) {
+ Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId());
+
+ // Update the last transaction sequence number that the arbitrator arbitrated on
+ if ((lastTransactionNumber == null) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) {
+ lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber());
+ }
+ }
+
// Update the last arbitration data that we have seen so far
if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId()) != null) {
lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber());
}
- // Update the last transaction that was updated if we can
- if (commit.getTransactionSequenceNumber() != -1) {
- Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId());
-
- // Update the last transaction sequence number that the arbitrator arbitrated on
- if ((lastTransactionNumber == null) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) {
- lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber());
- }
- }
-
// We processed a new commit that we havent seen before
didProcessANewCommit = true;
* Process new commit entries and save them for future use. Delete duplicates
*/
private void processEntry(CommitPart entry) {
+
+
+ // Update the last transaction that was updated if we can
+ if (entry.getTransactionSequenceNumber() != -1) {
+ Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getMachineId());
+
+ // Update the last transaction sequence number that the arbitrator arbitrated on
+ if ((lastTransactionNumber == null) || (lastTransactionNumber < entry.getTransactionSequenceNumber())) {
+ lastArbitratedTransactionNumberByArbitratorTable.put(entry.getMachineId(), entry.getTransactionSequenceNumber());
+ }
+ }
+
+
+
+
Map<Pair<Long, Integer>, CommitPart> commitPart = newCommitParts.get(entry.getMachineId());
if (commitPart == null) {
throw new Error("Server Error: Invalid HMAC Chain" + currSlot + " " + prevSlot);
}
}
-}
\ No newline at end of file
+}