3 import java.util.Iterator;
4 import java.util.Random;
5 import java.util.Arrays;
9 import java.util.Vector;
10 import java.util.HashMap;
11 import java.util.HashSet;
12 import java.util.ArrayList;
13 import java.util.Collections;
16 * IoTTable data structure. Provides client interface.
17 * @author Brian Demsky
21 final public class Table {
25 static final int FREE_SLOTS = 10; // Number of slots that should be kept free
// doOptionalRescue gives up once its skip counter exceeds this value.
26 static final int SKIP_THRESHOLD = 10;
// Growth factor applied to numberOfSlots when fillSlot computes the size of a resized table.
27 static final double RESIZE_MULTIPLE = 1.2;
// Fraction of numberOfSlots used as the lower bound of the randomized resize threshold (see setResizeThreshold).
28 static final double RESIZE_THRESHOLD = 0.75;
// When more than this many rejected slots are queued, doRejectedMessages builds one RejectedMessage
// spanning the first to the last queued sequence number instead of walking the list.
29 static final int REJECTED_THRESHOLD = 5;
// Local copy of the block chain; slots received from the server are inserted here by validateAndUpdate.
33 private SlotBuffer buffer = null;
// Communication layer used to fetch slots from, and push slots to, the server.
34 private CloudComm cloud = null;
// Source of randomness for the resize threshold (see setResizeThreshold).
35 private Random random = null;
36 private TableStatus liveTableStatus = null;
37 private PendingTransaction pendingTransactionBuilder = null; // Pending Transaction used in building a Pending Transaction
38 private Transaction lastPendingTransactionSpeculatedOn = null; // Last transaction that was speculated on from the pending transaction
39 private Transaction firstPendingTransaction = null; // first transaction in the pending transaction list
42 private int numberOfSlots = 0; // Number of slots stored in buffer
43 private int bufferResizeThreshold = 0; // Threshold on the number of live slots before a resize is needed
44 private long liveSlotCount = 0; // Number of currently live slots
// NOTE(review): the field name is misspelled ("Numver"); it is referenced under this name throughout the class.
45 private long oldestLiveSlotSequenceNumver = 0; // Smallest sequence number of the slot with a live entry
46 private long localMachineId = 0; // Machine ID of this client device
47 private long sequenceNumber = 0; // Largest sequence number a client has received
48 private int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server
49 private int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server
50 private long localTransactionSequenceNumber = 0; // Local sequence number counter for transactions
51 private long lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on
52 private long oldestTransactionSequenceNumberSpeculatedOn = -1; // the oldest transaction that was speculated on
// Sequence number given to the next Commit created by this machine; incremented after each Commit is created.
53 private long localCommitSequenceNumber = 0;
56 private Map<IoTString, KeyValue> committedKeyValueTable = null; // Table of committed key value pairs
57 private Map<IoTString, KeyValue> speculatedKeyValueTable = null; // Table of speculated key value pairs, if there is a speculative value
58 private Map<IoTString, KeyValue> pendingTransactionSpeculatedKeyValueTable = null; // Table of speculated key value pairs, if there is a speculative value from the pending transactions
60 private Map<IoTString, NewKey> liveNewKeyTable = null; // Table of live new keys
61 private HashMap<Long, Pair<Long, Liveness>> lastMessageTable = null; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
62 private HashMap<Long, HashSet<RejectedMessage>> rejectedMessageWatchListTable = null; // Table of machine Ids and the set of rejected messages they have not seen yet
63 private Map<IoTString, Long> arbitratorTable = null; // Table of keys and their arbitrators
64 private Map<Pair<Long, Long>, Abort> liveAbortTable = null; // Table live abort messages
65 private Map<Long, Map<Pair<Long, Integer>, TransactionPart>> newTransactionParts = null; // transaction parts that are seen in this latest round of slots from the server
66 private Map<Long, Map<Pair<Long, Integer>, CommitPart>> newCommitParts = null; // commit parts that are seen in this latest round of slots from the server
67 private Map<Long, Long> lastArbitratedTransactionNumberByArbitratorTable = null; // Last transaction sequence number that an arbitrator arbitrated on
68 private Map<Long, Transaction> liveTransactionBySequenceNumberTable = null; // live transaction grouped by the sequence number
69 private Map<Pair<Long, Long>, Transaction> liveTransactionByTransactionIdTable = null; // live transaction grouped by the transaction ID
// NOTE(review): the next three tables are only allocated in the visible part of the file; presumably they
// track live commits per arbitrator and per key - confirm against the code that populates them.
70 private Map<Long, Map<Long, Commit>> liveCommitsTable = null;
71 private Map<IoTString, Commit> liveCommitsByKeyTable = null;
72 private Map<Long, Long> lastCommitSeenSequenceNumberByArbitratorTable = null;
73 private Vector<Long> rejectedSlotList = null; // List of rejected slots that have yet to be sent to the server
// Transactions waiting to be sent to the server; commitTransaction queues a transaction here when
// another machine is its arbitrator.
75 private List<Transaction> pendingTransactionQueue = null;
// Aborts and commit parts produced by local arbitration that still have to be sent to the server.
76 private List<Entry> pendingSendArbitrationEntries = null;
// Arbitration entries placed into the slot currently being sent; removed from the pending list after a successful send.
77 private List<Entry> pendingSendArbitrationEntriesToDelete = null;
// For each transaction, the part numbers placed into the slot currently being sent.
78 private Map<Transaction, List<Integer>> transactionPartsSent = null;
// Status objects of transactions that have had parts sent, keyed by transaction sequence number.
79 private Map<Long, TransactionStatus> outstandingTransactionStatus = null;
/**
 * Creates a table that builds its own cloud communication layer.
 *
 * @param baseurl          URL handed to the {@code CloudComm} constructor
 * @param password         password handed to the {@code CloudComm} constructor
 * @param _localMachineId  machine ID of this client device
 */
public Table(String baseurl, String password, long _localMachineId) {
    // The machine ID is stored first because this instance is handed to CloudComm below.
    this.localMachineId = _localMachineId;
    this.cloud = new CloudComm(this, baseurl, password);

    init();
}
/**
 * Creates a table on top of an existing cloud communication layer.
 *
 * @param _cloud           communication layer to use for all server traffic
 * @param _localMachineId  machine ID of this client device
 */
public Table(CloudComm _cloud, long _localMachineId) {
    this.localMachineId = _localMachineId;
    this.cloud = _cloud;

    init();
}
100 * Init all the stuff needed for for table usage
102 private void init() {
104 // Init helper objects
105 random = new Random();
106 buffer = new SlotBuffer();
109 oldestLiveSlotSequenceNumver = 1;
112 committedKeyValueTable = new HashMap<IoTString, KeyValue>();
113 speculatedKeyValueTable = new HashMap<IoTString, KeyValue>();
114 pendingTransactionSpeculatedKeyValueTable = new HashMap<IoTString, KeyValue>();
115 liveNewKeyTable = new HashMap<IoTString, NewKey>();
116 lastMessageTable = new HashMap<Long, Pair<Long, Liveness>>();
117 rejectedMessageWatchListTable = new HashMap<Long, HashSet<RejectedMessage>>();
118 arbitratorTable = new HashMap<IoTString, Long>();
119 liveAbortTable = new HashMap<Pair<Long, Long>, Abort>();
120 newTransactionParts = new HashMap<Long, Map<Pair<Long, Integer>, TransactionPart>>();
121 newCommitParts = new HashMap<Long, Map<Pair<Long, Integer>, CommitPart>>();
122 lastArbitratedTransactionNumberByArbitratorTable = new HashMap<Long, Long>();
123 liveTransactionBySequenceNumberTable = new HashMap<Long, Transaction>();
124 liveTransactionByTransactionIdTable = new HashMap<Pair<Long, Long>, Transaction>();
125 liveCommitsTable = new HashMap<Long, Map<Long, Commit>>();
126 liveCommitsByKeyTable = new HashMap<IoTString, Commit>();
127 lastCommitSeenSequenceNumberByArbitratorTable = new HashMap<Long, Long>();
128 rejectedSlotList = new Vector<Long>();
129 pendingTransactionQueue = new ArrayList<Transaction>();
130 pendingSendArbitrationEntries = new ArrayList<Entry>();
131 pendingSendArbitrationEntriesToDelete = new ArrayList<Entry>();
132 transactionPartsSent = new HashMap<Transaction, List<Integer>>();
133 outstandingTransactionStatus = new HashMap<Long, TransactionStatus>();
136 numberOfSlots = buffer.capacity();
137 setResizeThreshold();
141 * Initialize the table by inserting a table status as the first entry into the block chain;
142 * also initializes the crypto state (the salt).
// Sets the salt through the cloud layer, then builds slot number 1 for this machine and pushes it to
// the server together with the current number of slots.
// Declares ServerException; throws Error("Error on initialization") on one of the outcomes of the push.
144 public synchronized void initTable() throws ServerException {
145 cloud.setSalt(); //Set the salt
147 // Create the first insertion into the block chain which is the table status
148 Slot s = new Slot(this, 1, localMachineId);
149 TableStatus status = new TableStatus(s, numberOfSlots);
// NOTE(review): the statement that attaches 'status' to 's' is not visible in this view
// (fillSlot does slot.addEntry(status) in the equivalent place) - confirm it exists in the full file.
151 Slot[] array = cloud.putSlot(s, numberOfSlots);
// NOTE(review): the condition that selects between the success path below and the Error is not visible
// here; presumably it tests the array returned by putSlot - confirm against the full file.
154 array = new Slot[] {s};
155 // update local block chain
156 validateAndUpdate(array, true);
158 throw new Error("Error on initialization");
163 * Rebuild the table from scratch by pulling the latest block chain from the server.
165 public synchronized void rebuild() throws ServerException {
166 // Just pull the latest slots from the server
167 Slot[] newslots = cloud.getSlots(sequenceNumber + 1);
168 validateAndUpdate(newslots, true);
171 // public String toString() {
172 // String retString = " Committed Table: \n";
173 // retString += "---------------------------\n";
174 // retString += commitedTable.toString();
176 // retString += "\n\n";
178 // retString += " Speculative Table: \n";
179 // retString += "---------------------------\n";
180 // retString += speculativeTable.toString();
185 public synchronized Long getArbitrator(IoTString key) {
186 return arbitratorTable.get(key);
189 public synchronized IoTString getCommitted(IoTString key) {
190 KeyValue kv = committedKeyValueTable.get(key);
193 return kv.getValue();
199 public synchronized IoTString getSpeculative(IoTString key) {
200 KeyValue kv = pendingTransactionSpeculatedKeyValueTable.get(key);
203 kv = speculatedKeyValueTable.get(key);
207 kv = committedKeyValueTable.get(key);
211 return kv.getValue();
217 public synchronized IoTString getCommittedAtomic(IoTString key) {
218 KeyValue kv = committedKeyValueTable.get(key);
220 if (arbitratorTable.get(key) == null) {
221 throw new Error("Key not Found.");
224 // Make sure new key value pair matches the current arbitrator
225 if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
226 // TODO: Maybe not throw en error
227 throw new Error("Not all Key Values Match Arbitrator.");
231 pendingTransactionBuilder.addKVGuard(new KeyValue(key, kv.getValue()));
232 return kv.getValue();
234 pendingTransactionBuilder.addKVGuard(new KeyValue(key, null));
239 public synchronized IoTString getSpeculativeAtomic(IoTString key) {
240 if (arbitratorTable.get(key) == null) {
241 throw new Error("Key not Found.");
244 // Make sure new key value pair matches the current arbitrator
245 if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
246 // TODO: Maybe not throw en error
247 throw new Error("Not all Key Values Match Arbitrator.");
250 KeyValue kv = pendingTransactionSpeculatedKeyValueTable.get(key);
253 kv = speculatedKeyValueTable.get(key);
257 kv = committedKeyValueTable.get(key);
261 pendingTransactionBuilder.addKVGuard(new KeyValue(key, kv.getValue()));
262 return kv.getValue();
264 pendingTransactionBuilder.addKVGuard(new KeyValue(key, null));
// Pulls every slot newer than sequenceNumber from the server and applies it to the local block chain
// with acceptUpdatesToLocal = false.
// NOTE(review): the line opening the try block and any statements after validateAndUpdate are not
// visible in this view - confirm against the full file.
269 public synchronized void update() {
271 Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
272 validateAndUpdate(newSlots, false);
// NOTE(review): this handler catches Exception broadly and its body is not visible here; if it is empty,
// server and validation failures are silently dropped and the caller cannot tell that the update failed.
274 } catch (Exception e) {
// Tries to register keyName with machineId as its arbitrator by sending a NewKey entry to the server.
// NOTE(review): the return statements and any enclosing loop are not visible in this view; presumably
// the method returns false when the key already has an arbitrator and true after a successful
// insert - confirm against the full file.
279 public synchronized boolean createNewKey(IoTString keyName, long machineId) {
281 if (arbitratorTable.get(keyName) != null) {
282 // There is already an arbitrator
// The NewKey is created without a slot (null); fillSlot assigns the slot when it places the entry.
286 NewKey newKey = new NewKey(null, keyName, machineId);
// sendToServer returns true once the new key is no longer waiting to be inserted.
287 if (sendToServer(newKey)) {
288 // If successfully inserted
294 public void startTransaction() {
295 // Create a new transaction, invalidates any old pending transactions.
296 pendingTransactionBuilder = new PendingTransaction(localMachineId);
299 public synchronized void addKV(IoTString key, IoTString value) {
301 // Make sure it is a valid key
302 if (arbitratorTable.get(key) == null) {
303 throw new Error("Key not Found.");
306 // Make sure new key value pair matches the current arbitrator
307 if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
308 // TODO: Maybe not throw en error
309 throw new Error("Not all Key Values Match Arbitrator.");
312 // Add the key value to this transaction
313 KeyValue kv = new KeyValue(key, value);
314 pendingTransactionBuilder.addKV(kv);
// Turns the pending transaction builder into a Transaction and either queues it for the server
// (another machine arbitrates) or arbitrates it locally right away. Returns the TransactionStatus
// object attached to the new transaction, or a StatusNoEffect status when there are no updates.
// NOTE(review): pendingTransactionBuilder is only assigned by startTransaction and by this method, so
// calling this before startTransaction would dereference null - confirm callers always start first.
317 public synchronized TransactionStatus commitTransaction() {
319 if (pendingTransactionBuilder.getKVUpdates().size() == 0) {
320 // transaction with no updates will have no effect on the system
321 return new TransactionStatus(TransactionStatus.StatusNoEffect, -1);
324 // Set the local transaction sequence number and increment
325 pendingTransactionBuilder.setClientLocalSequenceNumber(localTransactionSequenceNumber);
326 localTransactionSequenceNumber++;
328 // Create the transaction status
329 TransactionStatus transactionStatus = new TransactionStatus(TransactionStatus.StatusPending, pendingTransactionBuilder.getArbitrator());
331 // Create the new transaction
332 Transaction newTransaction = pendingTransactionBuilder.createTransaction();
333 newTransaction.setTransactionStatus(transactionStatus);
// Remote arbitrator: queue for sending. Local arbitrator: decide now and refresh the live state.
335 if (pendingTransactionBuilder.getArbitrator() != localMachineId) {
336 // Add it to the queue and invalidate the builder for safety
337 pendingTransactionQueue.add(newTransaction);
339 arbitrateOnLocalTransaction(newTransaction);
340 updateLiveStateFromLocal();
// A fresh builder replaces the one that was just consumed.
343 pendingTransactionBuilder = new PendingTransaction(localMachineId);
// NOTE(review): the lines between the builder reset and the return are not visible in this view;
// presumably the queued data is pushed to the server there - confirm against the full file.
347 return transactionStatus;
351 * Get the machine ID for this client
353 public long getMachineId() {
354 return localMachineId;
358 * Decrement the number of live slots that we currently have
360 public void decrementLiveCount() {
365 * Recalculate the new resize threshold
367 private void setResizeThreshold() {
368 int resizeLower = (int) (RESIZE_THRESHOLD * numberOfSlots);
369 bufferResizeThreshold = resizeLower - 1 + random.nextInt(numberOfSlots - resizeLower);
// Keeps building and sending slots until the pending transaction queue, the pending arbitration
// entries and the optional new key have all been sent. Returns true when newKey is null on exit,
// i.e. there was no key to insert or it has been cleared after a successful insert.
// NOTE(review): many short lines (try, if/else headers, closing braces, short assignments) are not
// visible in this view; the notes below mark the places where the control flow is inferred.
372 private boolean sendToServer(NewKey newKey) {
375 // While we have stuff that needs inserting into the block chain
376 while ((pendingTransactionQueue.size() > 0) || (pendingSendArbitrationEntries.size() > 0) || (newKey != null)) {
379 // Thread.sleep(300);
380 // } catch (Exception e) {
// The new slot follows the newest known slot and is chained to it through that slot's HMAC.
385 Slot slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer.getSlot(sequenceNumber).getHMAC());
387 // Try to fill the slot with data
388 ThreeTuple<Boolean, Integer, Boolean> fillSlotsReturn = fillSlot(slot, false, newKey);
389 boolean needsResize = fillSlotsReturn.getFirst();
// NOTE(review): fillSlot returns (true, null, null) when it reports that a resize is needed. If getSecond()
// returns that null Integer, unboxing it into an int here throws a NullPointerException before needsResize
// is examined - confirm ThreeTuple.getSecond() and guard this read.
390 int newSize = fillSlotsReturn.getSecond();
391 Boolean insertedNewKey = fillSlotsReturn.getThird();
// Presumably executed only when needsResize is true (the condition line is not visible): undo the
// bookkeeping of the first fill attempt before filling again with resize forced.
394 // Reset which transaction to send
395 for (Transaction transaction : transactionPartsSent.keySet()) {
396 transaction.resetNextPartToSend();
398 // Set the transaction sequence number back to nothing
399 if (!transaction.didSendAPartToServer()) {
400 transaction.setSequenceNumber(-1);
404 // Clear the sent data since we are trying again
405 pendingSendArbitrationEntriesToDelete.clear();
406 transactionPartsSent.clear();
408 // We needed a resize so try again
// NOTE(review): the result of this second fillSlot call is discarded, so newSize and insertedNewKey still
// hold the values from the first attempt, and the same Slot object is filled a second time - confirm
// that this is intended.
409 fillSlot(slot, true, newKey);
412 // Try to send to the server
413 Pair<Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(slot, newSize);
415 if (sendSlotsReturn.getFirst()) {
416 // Did insert into the block chain
418 // New Key was successfully inserted into the block chain so dont want to insert it again
421 // Remove the aborts and commit parts that were sent from the pending to send queue
422 pendingSendArbitrationEntries.removeAll(pendingSendArbitrationEntriesToDelete);
424 for (Transaction transaction : transactionPartsSent.keySet()) {
426 // Update which transactions parts still need to be sent
427 transaction.removeSentParts(transactionPartsSent.get(transaction));
429 // Add the transaction status to the outstanding list
430 outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
432 // Update the transaction status
433 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
435 // Check if all the transaction parts were successfully sent and if so then remove it from pending
436 if (transaction.didSendAllParts()) {
437 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
438 pendingTransactionQueue.remove(transaction);
// Presumably the branch for a slot that was not inserted (the else line is not visible).
442 // Reset which transaction to send
443 for (Transaction transaction : transactionPartsSent.keySet()) {
444 transaction.resetNextPartToSend();
446 // Set the transaction sequence number back to nothing
447 if (!transaction.didSendAPartToServer()) {
448 transaction.setSequenceNumber(-1);
453 // Clear the sent data in preparation for next send
454 pendingSendArbitrationEntriesToDelete.clear();
455 transactionPartsSent.clear();
// Whatever slots came back (our own slot or the server's newer ones) are applied to the local chain.
457 if (sendSlotsReturn.getSecond().length != 0) {
458 // insert into the local block chain
459 validateAndUpdate(sendSlotsReturn.getSecond(), true);
462 } catch (ServerException e) {
// Any failure other than an input timeout means nothing reached the server, so the send bookkeeping is reset.
464 if (e.getType() != ServerException.TypeInputTimeout) {
467 // Nothing was able to be sent to the server so just clear these data structures
468 for (Transaction transaction : transactionPartsSent.keySet()) {
469 // Set the transaction sequence number back to nothing
470 if (!transaction.didSendAPartToServer()) {
471 transaction.setSequenceNumber(-1);
475 pendingSendArbitrationEntriesToDelete.clear();
476 transactionPartsSent.clear();
478 // There was a partial send to the server
482 return newKey == null;
// Pushes one slot to the server together with the requested table size.
// Returns a pair: whether the slot was inserted, and the slots to apply to the local block chain.
// Declares ServerException; throws Error when the server's answer contains no slots.
485 private Pair<Boolean, Slot[]> sendSlotsToServer(Slot slot, int newSize) throws ServerException {
487 boolean inserted = true;
489 Slot[] array = cloud.putSlot(slot, newSize);
// NOTE(review): the condition separating the two outcomes below is not visible in this view; presumably
// putSlot signals acceptance through its return value - confirm against the full file.
// Outcome 1: the slot is used as the result and the list of rejected slots is cleared.
491 array = new Slot[] {slot};
492 rejectedSlotList.clear();
// Outcome 2: the server answered with slots of its own; an empty answer is treated as a server error.
494 if (array.length == 0) {
495 throw new Error("Server Error: Did not send any slots");
// The sequence number of the slot that was not accepted is remembered for doRejectedMessages.
// NOTE(review): 'inserted' must be set to false on this path for the first pair element to be
// meaningful; that assignment is not visible here - confirm.
497 rejectedSlotList.add(slot.getSequenceNumber());
501 return new Pair<Boolean, Slot[]>(inserted, array);
505 * Returns a tuple whose first element is true if a resize was needed but not performed
// Fills 'slot' in priority order: table status (when resizing), rejected-message entries, mandatory
// rescue of old live entries, the optional new key, pending arbitration entries, pending transaction
// parts and finally optional rescue data.
// Returns (needsResize, newSize, insertedNewKey). When a resize is needed but 'resize' is false the
// method returns early with (true, null, null).
// NOTE(review): several short lines (declarations such as newSize, loop headers, break/else lines,
// closing braces) are not visible in this view.
507 private ThreeTuple<Boolean, Integer, Boolean> fillSlot(Slot slot, boolean resize, NewKey newKeyEntry) {
509 if (liveSlotCount > bufferResizeThreshold) {
510 resize = true; //Resize is forced
// Presumably executed only when 'resize' is true (the condition line is not visible): the grown size is
// announced through a TableStatus entry placed in the slot.
514 newSize = (int) (numberOfSlots * RESIZE_MULTIPLE);
515 TableStatus status = new TableStatus(slot, newSize);
516 slot.addEntry(status);
519 // Fill with rejected slots first before doing anything else
520 doRejectedMessages(slot);
522 // Do mandatory rescue of entries
523 ThreeTuple<Boolean, Boolean, Long> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
525 // Extract working variables
526 boolean needsResize = mandatoryRescueReturn.getFirst();
527 boolean seenLiveSlot = mandatoryRescueReturn.getSecond();
528 long currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
530 if (needsResize && !resize) {
531 // We need to resize but we are not resizing so return false
// NOTE(review): the null second element is unboxed into an int by sendToServer, which would throw a
// NullPointerException - confirm and consider returning a non-null size here.
532 return new ThreeTuple<Boolean, Integer, Boolean>(true, null, null);
535 boolean inserted = false;
536 if (newKeyEntry != null) {
537 newKeyEntry.setSlot(slot);
538 if (slot.hasSpace(newKeyEntry)) {
539 slot.addEntry(newKeyEntry);
544 // Clear the transactions, aborts and commits that were sent previously
545 transactionPartsSent.clear();
546 pendingSendArbitrationEntriesToDelete.clear();
548 // Insert pending arbitration data
549 for (Entry arbitrationData : pendingSendArbitrationEntries) {
551 // If it is an abort then we need to set some information
552 if (arbitrationData instanceof Abort) {
553 ((Abort)arbitrationData).setSequenceNumber(slot.getSequenceNumber());
556 if (!slot.hasSpace(arbitrationData)) {
557 // No space so cant do anything else with these data entries
561 // Add to this current slot and add it to entries to delete
562 slot.addEntry(arbitrationData);
563 pendingSendArbitrationEntriesToDelete.add(arbitrationData);
566 // Insert as many transactions as possible while keeping order
567 for (Transaction transaction : pendingTransactionQueue) {
569 // Set the transaction sequence number if it has yet to be inserted into the block chain
570 if (!transaction.didSendAPartToServer()) {
571 transaction.setSequenceNumber(slot.getSequenceNumber());
574 boolean ranOutOfSpace = false;
577 TransactionPart part = transaction.getNextPartToSend();
580 // Ran out of parts to send for this transaction so move on
584 if (slot.hasSpace(part)) {
// Record the part number under its transaction so sendToServer can mark it as sent after a successful push.
587 List<Integer> partsSent = transactionPartsSent.get(transaction);
588 if (partsSent == null) {
589 partsSent = new ArrayList<Integer>();
590 transactionPartsSent.put(transaction, partsSent);
593 partsSent.add(part.getPartNumber());
// NOTE(review): this put stores the same list object that is already mapped to the transaction, so it looks redundant.
594 transactionPartsSent.put(transaction, partsSent);
597 ranOutOfSpace = true;
607 // Fill the remainder of the slot with rescue data
608 doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
610 return new ThreeTuple<Boolean, Integer, Boolean>(false, newSize, inserted);
// Builds RejectedMessage entries for slot 's' from the sequence numbers queued in rejectedSlotList.
// NOTE(review): the statements that add each RejectedMessage to the slot, the declarations of 'i' and
// 'prev_seqn', and the test that separates missing from present slots are not visible in this view.
613 private void doRejectedMessages(Slot s) {
614 if (! rejectedSlotList.isEmpty()) {
615 /* TODO: We should avoid generating a rejected message entry if
616 * there is already a sufficient entry in the queue (e.g.,
617 * equalsto value of true and same sequence number). */
619 long old_seqn = rejectedSlotList.firstElement();
// Long list: one message on behalf of this machine spanning the first to the last rejected sequence number.
620 if (rejectedSlotList.size() > REJECTED_THRESHOLD) {
621 long new_seqn = rejectedSlotList.lastElement();
622 RejectedMessage rm = new RejectedMessage(s, localMachineId, old_seqn, new_seqn, false);
627 /* Go through list of missing messages */
628 for (; i < rejectedSlotList.size(); i++) {
629 long curr_seqn = rejectedSlotList.get(i);
630 Slot s_msg = buffer.getSlot(curr_seqn);
// prev_seqn tracks the last sequence number handled by this first pass.
633 prev_seqn = curr_seqn;
635 /* Generate rejected message entry for missing messages */
636 if (prev_seqn != -1) {
637 RejectedMessage rm = new RejectedMessage(s, localMachineId, old_seqn, prev_seqn, false);
640 /* Generate rejected message entries for present messages */
// The second pass continues from the index where the first pass stopped and emits one message per
// slot, attributed to the machine that owns that slot.
641 for (; i < rejectedSlotList.size(); i++) {
642 long curr_seqn = rejectedSlotList.get(i);
643 Slot s_msg = buffer.getSlot(curr_seqn);
644 long machineid = s_msg.getMachineID();
645 RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
652 private ThreeTuple<Boolean, Boolean, Long> doMandatoryResuce(Slot slot, boolean resize) {
653 long newestSequenceNumber = buffer.getNewestSeqNum();
654 long oldestSequenceNumber = buffer.getOldestSeqNum();
655 if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
656 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
659 long currentSequenceNumber = oldestLiveSlotSequenceNumver;
660 boolean seenLiveSlot = false;
661 long firstIfFull = newestSequenceNumber + 1 - numberOfSlots; // smallest seq number in the buffer if it is full
662 long threshold = firstIfFull + FREE_SLOTS; // we want the buffer to be clear of live entries up to this point
666 for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
667 Slot previousSlot = buffer.getSlot(currentSequenceNumber);
668 // Push slot number forward
669 if (! seenLiveSlot) {
670 oldestLiveSlotSequenceNumver = currentSequenceNumber;
673 if (!previousSlot.isLive()) {
677 // We have seen a live slot
680 // Get all the live entries for a slot
681 Vector<Entry> liveEntries = previousSlot.getLiveEntries(resize);
683 // Iterate over all the live entries and try to rescue them
684 for (Entry liveEntry : liveEntries) {
685 if (slot.hasSpace(liveEntry)) {
687 // Enough space to rescue the entry
688 slot.addEntry(liveEntry);
689 } else if (currentSequenceNumber == firstIfFull) {
690 //if there's no space but the entry is about to fall off the queue
691 System.out.println("B"); //?
692 return new ThreeTuple<Boolean, Boolean, Long>(true, seenLiveSlot, currentSequenceNumber);
699 return new ThreeTuple<Boolean, Boolean, Long>(false, seenLiveSlot, currentSequenceNumber);
// Uses the space left in slot 's' to rescue further live entries, scanning from 'seqn' up to the
// newest slot in the buffer.
// NOTE(review): the declaration and increment of 'skipcount', the statements executed for a non-live
// slot and when the skip limit is exceeded, and the condition guarding the oldestLiveSlotSequenceNumver
// update are not visible in this view.
702 private void doOptionalRescue(Slot s, boolean seenliveslot, long seqn, boolean resize) {
703 /* now go through live entries from least to greatest sequence number until
704 * either all live slots added, or the slot doesn't have enough room
705 * for SKIP_THRESHOLD consecutive entries*/
707 long newestseqnum = buffer.getNewestSeqNum();
709 for (; seqn <= newestseqnum; seqn++) {
710 Slot prevslot = buffer.getSlot(seqn);
711 //Push slot number forward
713 oldestLiveSlotSequenceNumver = seqn;
715 if (!prevslot.isLive())
718 Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
719 for (Entry liveentry : liveentries) {
720 if (s.hasSpace(liveentry))
721 s.addEntry(liveentry);
// NOTE(review): the comment above speaks of consecutive skips, but no reset of skipcount is visible
// here - confirm whether the counter is cumulative.
724 if (skipcount > SKIP_THRESHOLD)
732 * Checks for malicious activity and updates the local copy of the block chain.
// Validates a batch of slots received from the server, processes their entries, inserts them into the
// local buffer and then refreshes the live state.
// Throws Error when the server sends slots that are not newer than the last one seen, or when a gap in
// the sequence leaves machines without a record.
// NOTE(review): short statements (the early return for an empty batch, the call that applies the new
// maximum size, the live-slot counting) are not visible in this view.
734 private void validateAndUpdate(Slot[] newSlots, boolean acceptUpdatesToLocal) {
736 // The cloud communication layer has checked slot HMACs already before decoding
737 if (newSlots.length == 0) {
741 // Reset the table status declared sizes
742 smallestTableStatusSeen = -1;
743 largestTableStatusSeen = -1;
746 // Make sure all slots are newer than the last largest slot this client has seen
747 long firstSeqNum = newSlots[0].getSequenceNumber();
748 if (firstSeqNum <= sequenceNumber) {
749 throw new Error("Server Error: Sent older slots!");
752 // Create an object that can access both new slots and slots in our local chain
753 // without committing slots to our local chain
754 SlotIndexer indexer = new SlotIndexer(newSlots, buffer);
756 // Check that the HMAC chain is not broken
757 checkHMACChain(indexer, newSlots);
759 // Set to keep track of messages from clients
// Starts as a copy of every machine ID known to lastMessageTable and is handed to processSlot for each slot.
760 HashSet<Long> machineSet = new HashSet<Long>(lastMessageTable.keySet());
762 // Process each slots data
763 for (Slot slot : newSlots) {
764 processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
767 // If there is a gap, check to see if the server sent us everything.
768 if (firstSeqNum != (sequenceNumber + 1)) {
770 // Check the size of the slots that were sent down by the server.
771 // Can only check the size if there was a gap
772 checkNumSlots(newSlots.length);
774 // Since there was a gap every machine must have pushed a slot or must have
775 // a last message message. If not then the server is hiding slots
776 if (!machineSet.isEmpty()) {
777 throw new Error("Missing record for machines: " + machineSet);
781 // Update the size of our local block chain.
784 // Commit new to slots to the local block chain.
785 for (Slot slot : newSlots) {
787 // Insert this slot into our local block chain copy.
788 buffer.putSlot(slot);
790 // Keep track of how many slots are currently live (have live data in them).
794 // Get the sequence number of the latest slot in the system
795 sequenceNumber = newSlots[newSlots.length - 1].getSequenceNumber();
797 updateLiveStateFromServer();
800 private void updateLiveStateFromServer() {
801 // Process the new transaction parts
802 processNewTransactionParts();
804 // Do arbitration on new transactions that were received
805 arbitrateFromServer();
807 // Update all the committed keys
808 boolean didCommitOrSpeculate = updateCommittedTable();
810 // Delete the transactions that are now dead
811 updateLiveTransactionsAndStatus();
814 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
815 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
818 private void updateLiveStateFromLocal() {
819 // Update all the committed keys
820 boolean didCommitOrSpeculate = updateCommittedTable();
822 // Delete the transactions that are now dead
823 updateLiveTransactionsAndStatus();
826 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
827 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
831 * Check the size of the block chain to make sure there are enough slots sent back by the server.
832 * This is only called when we have a gap between the slots that we have locally and the slots
833 * sent by the server therefore in the slots sent by the server there will be at least 1 Table
836 private void checkNumSlots(int numberOfSlots) {
838 // We only have 1 size so we must have this many slots
839 if (largestTableStatusSeen == smallestTableStatusSeen) {
840 if (numberOfSlots != smallestTableStatusSeen) {
841 throw new Error("Server Error: Server did not send all slots. Expected: " + smallestTableStatusSeen + " Received:" + numberOfSlots);
844 // We have more than 1
845 if (numberOfSlots < smallestTableStatusSeen) {
846 throw new Error("Server Error: Server did not send all slots. Expected at least: " + smallestTableStatusSeen + " Received:" + numberOfSlots);
852 * Update the size of of the local buffer if it is needed.
854 private void commitNewMaxSize() {
858 if (largestTableStatusSeen == -1) {
859 // No table status seen so the current max size does not change
860 currMaxSize = numberOfSlots;
862 currMaxSize = largestTableStatusSeen;
865 // Resize the local slot buffer
866 if (numberOfSlots != currMaxSize) {
867 buffer.resize(currMaxSize);
870 // Change the number of local slots to the new size
871 numberOfSlots = currMaxSize;
873 // Recalculate the resize threshold since the size of the local buffer has changed
874 setResizeThreshold();
878 * Process the new transaction parts from this latest round of slots received from the server
// Folds the transaction parts collected in newTransactionParts into Transaction objects in the live
// transaction tables, then clears newTransactionParts.
// NOTE(review): the early return for an empty map and the statements executed for a part that has
// already been arbitrated are not visible in this view.
880 private void processNewTransactionParts() {
882 if (newTransactionParts.size() == 0) {
883 // Nothing new to process
887 // Iterate through all the machine Ids that we received new parts for
888 for (Long machineId : newTransactionParts.keySet()) {
889 Map<Pair<Long, Integer>, TransactionPart> parts = newTransactionParts.get(machineId);
891 // Iterate through all the parts for that machine Id
892 for (Pair<Long, Integer> partId : parts.keySet()) {
893 TransactionPart part = parts.get(partId);
// A part whose sequence number is not newer than the last one its arbitrator decided on is stale.
895 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(part.getArbitratorId());
896 if ((lastTransactionNumber != null) && (lastTransactionNumber >= part.getSequenceNumber())) {
897 // Set dead the transaction part
902 // Get the transaction object for that sequence number
903 Transaction transaction = liveTransactionBySequenceNumberTable.get(part.getSequenceNumber());
905 if (transaction == null) {
906 // This is a new transaction that we dont have so make a new one
907 transaction = new Transaction();
909 // Insert this new transaction into the live tables
910 liveTransactionBySequenceNumberTable.put(part.getSequenceNumber(), transaction);
911 liveTransactionByTransactionIdTable.put(part.getTransactionId(), transaction);
914 // Add that part to the transaction
915 transaction.addPartDecode(part);
919 // Clear all the new transaction parts in preparation for the next time the server sends slots
920 newTransactionParts.clear();
// Arbitrates, oldest first, on the live transactions this machine is the arbitrator for. Transactions
// whose guard holds are folded into one batch Commit; the others produce an Abort. The resulting
// entries are queued for sending and processed locally right away.
// NOTE(review): this method is public but not synchronized although it mutates shared tables;
// the visible caller (updateLiveStateFromServer) is reached from synchronized methods - confirm no
// external caller invokes it without holding the lock.
// NOTE(review): short statements (early return, continue/break, the else header, the call adding each
// key value to the commit) are not visible in this view.
923 public void arbitrateFromServer() {
925 if (liveTransactionBySequenceNumberTable.size() == 0) {
926 // Nothing to arbitrate on so move on
930 // Get the transaction sequence numbers and sort from oldest to newest
931 List<Long> transactionSequenceNumbers = new ArrayList<Long>(liveTransactionBySequenceNumberTable.keySet());
932 Collections.sort(transactionSequenceNumbers);
934 // Collection of key value pairs that are
// Holds the updates of every transaction accepted in this pass; it is also given to evaluateGuard so
// later guards are evaluated with the earlier accepted updates available.
935 Map<IoTString, KeyValue> speculativeTableTmp = new HashMap<IoTString, KeyValue>();
937 // The last transaction arbitrated on
938 long lastTransactionCommitted = -1;
940 for (Long transactionSequenceNumber : transactionSequenceNumbers) {
941 Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber);
943 // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
944 if (transaction.getArbitrator() != localMachineId) {
949 if (!transaction.isComplete()) {
950 // Will arbitrate in incorrect order if we continue so just break
956 if (transaction.evaluateGuard(committedKeyValueTable, speculativeTableTmp, null)) {
957 // Guard evaluated as true
959 // Update the local changes so we can make the commit
960 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
961 speculativeTableTmp.put(kv.getKey(), kv);
964 // Update what the last transaction committed was for use in batch commit
965 lastTransactionCommitted = transaction.getSequenceNumber();
968 // Guard evaluated was false so create abort
// The abort is created without a slot (null).
971 Abort newAbort = new Abort(null,
972 transaction.getClientLocalSequenceNumber(),
973 transaction.getSequenceNumber(),
974 transaction.getMachineId(),
975 transaction.getArbitrator());
977 // Add the abort to the queue of aborts to send out
978 pendingSendArbitrationEntries.add(newAbort);
980 // Insert the abort so we can process
981 processEntry(newAbort);
985 // If there is something to commit
986 if (speculativeTableTmp.size() != 0) {
988 // Create the commit and increment the commit sequence number
989 Commit newCommit = new Commit(localCommitSequenceNumber, localMachineId, lastTransactionCommitted);
990 localCommitSequenceNumber++;
992 // Add all the new keys to the commit
993 for (KeyValue kv : speculativeTableTmp.values()) {
997 // create the commit parts
998 newCommit.createCommitParts();
1000 // Append all the commit parts to the end of the pending queue waiting for sending to the server
1001 pendingSendArbitrationEntries.addAll(newCommit.getParts().values());
1003 // Insert the commit so we can process it
1004 for (CommitPart commitPart : newCommit.getParts().values()) {
1005 processEntry(commitPart);
1010 public void arbitrateOnLocalTransaction(Transaction transaction) {
1012 // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
1013 if (transaction.getArbitrator() != localMachineId) {
1017 if (!transaction.isComplete()) {
1018 // Will arbitrate in incorrect order if we continue so just break
1023 if (transaction.evaluateGuard(committedKeyValueTable, null, null)) {
1024 // Guard evaluated as true
1026 // Create the commit and increment the commit sequence number
1027 Commit newCommit = new Commit(localCommitSequenceNumber, localMachineId, -1);
1028 localCommitSequenceNumber++;
1030 // Update the local changes so we can make the commit
1031 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
1032 newCommit.addKV(kv);
1035 // create the commit parts
1036 newCommit.createCommitParts();
1038 // Append all the commit parts to the end of the pending queue waiting for sending to the server
1039 pendingSendArbitrationEntries.addAll(newCommit.getParts().values());
1041 // Insert the commit so we can process it
1042 for (CommitPart commitPart : newCommit.getParts().values()) {
1043 processEntry(commitPart);
1046 TransactionStatus status = transaction.getTransactionStatus();
1047 status.setStatus(TransactionStatus.StatusCommitted);
1050 // Guard evaluated was false so create abort
1051 TransactionStatus status = transaction.getTransactionStatus();
1052 status.setStatus(TransactionStatus.StatusAborted);
1057 * Update all the commits and the committed tables, sets dead the dead transactions
// Folds the commit parts collected in newCommitParts into liveCommitsTable, then walks every
// arbitrator's commits in ascending commit-sequence-number order and applies the complete,
// not-yet-seen ones to committedKeyValueTable and liveCommitsByKeyTable.
// The value returned is didProcessANewCommit, which is set when a brand new commit was applied.
1059 private boolean updateCommittedTable() {
1061 if (newCommitParts.size() == 0) {
1062 // Nothing new to process
1066 // Iterate through all the machine Ids that we received new parts for
1067 for (Long machineId : newCommitParts.keySet()) {
1068 Map<Pair<Long, Integer>, CommitPart> parts = newCommitParts.get(machineId);
1070 // Iterate through all the parts for that machine Id
1071 for (Pair<Long, Integer> partId : parts.keySet()) {
1072 CommitPart part = parts.get(partId);
1074 // Get the table of commits (keyed by commit sequence number) for the machine that produced this part
1075 Map<Long, Commit> commitForClientTable = liveCommitsTable.get(part.getMachineId());
1077 if (commitForClientTable == null) {
1078 // This is the first commit from this device
1079 commitForClientTable = new HashMap<Long, Commit>();
1080 liveCommitsTable.put(part.getMachineId(), commitForClientTable);
1083 Commit commit = commitForClientTable.get(part.getSequenceNumber());
1085 if (commit == null) {
1086 // This is a new commit that we dont have so make a new one
1087 commit = new Commit();
1089 // Insert this new commit into the live tables
1090 commitForClientTable.put(part.getSequenceNumber(), commit);
1093 // Add that part to the commit
1094 commit.addPartDecode(part);
1098 // Clear all the new commits parts in preparation for the next time the server sends slots
1099 newCommitParts.clear();
1101 // If we process a new commit keep track of it for future use
1102 boolean didProcessANewCommit = false;
1104 // Process the commits one by one
1105 for (Long arbitratorId : liveCommitsTable.keySet()) {
1107 // Get all the commits for a specific arbitrator
1108 Map<Long, Commit> commitForClientTable = liveCommitsTable.get(arbitratorId);
1110 // Sort the commit sequence numbers in ascending order; iterating over this copy (rather than
1110 // the key set itself) is what allows entries to be removed from commitForClientTable below
1111 List<Long> commitSequenceNumbers = new ArrayList<Long>(commitForClientTable.keySet());
1112 Collections.sort(commitSequenceNumbers);
1114 // Go through each new commit one by one
1115 for (int i = 0; i < commitSequenceNumbers.size(); i++) {
1116 Long commitSequenceNumber = commitSequenceNumbers.get(i);
1117 Commit commit = commitForClientTable.get(commitSequenceNumber);
1119 // Special processing if a commit is not complete
1120 if (!commit.isComplete()) {
1121 if (i == (commitSequenceNumbers.size() - 1)) {
1122 // If there is an incomplete commit and this commit is the latest one seen then this commit cannot be processed and there are no other commits
1125 // This is a commit that was already dead but parts of it are still in the block chain (not flushed out yet).
1126 // Delete it and move on
1128 commitForClientTable.remove(commit.getSequenceNumber());
1133 // Get the last commit seen from this arbitrator (-1 when none has been recorded yet)
1134 long lastCommitSeenSequenceNumber = -1;
1135 if (lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId()) != null) {
1136 lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId());
1144 // We have already seen this commit before, so it does not need the full processing further down;
1144 // only the last arbitrated transaction number for its arbitrator is refreshed
1145 if (commit.getSequenceNumber() <= lastCommitSeenSequenceNumber) {
1147 // Update the last transaction that was updated if we can
1148 if (commit.getTransactionSequenceNumber() != -1) {
1149 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId());
1151 // Update the last transaction sequence number that the arbitrator arbitrated on
1152 if ((lastTransactionNumber == null) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) {
1153 lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber());
1161 // If we got here then this is a brand new commit and needs full processing
1163 // Get what commits should be edited, these are the commits that have live values for their keys
1164 Set<Commit> commitsToEdit = new HashSet<Commit>();
1165 for (KeyValue kv : commit.getKeyValueUpdateSet()) {
1166 commitsToEdit.add(liveCommitsByKeyTable.get(kv.getKey()));
1168 commitsToEdit.remove(null); // remove null since it could be in this set
1170 // Update each previous commit that needs to be updated
1171 for (Commit previousCommit : commitsToEdit) {
1173 // Only bother with live commits (TODO: Maybe remove this check)
1174 if (previousCommit.isLive()) {
1176 // Update which keys in the old commits are still live
1177 for (KeyValue kv : commit.getKeyValueUpdateSet()) {
1178 previousCommit.invalidateKey(kv.getKey());
1181 // if the commit is now dead then remove it
1181 // NOTE(review): commitForClientTable is a Map<Long, Commit> keyed by commit sequence number, so
1181 // passing the Commit object itself as the key cannot match an entry and this call removes nothing.
1181 // previousCommit may also belong to a different arbitrator than the table being walked.
1181 // Presumably the intent is to remove previousCommit.getSequenceNumber() from the table stored
1181 // under previousCommit.getMachineId() in liveCommitsTable - TODO confirm and fix.
1182 if (!previousCommit.isLive()) {
1183 commitForClientTable.remove(previousCommit);
1188 // Update the last seen sequence number from this arbitrator
1189 lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber());
1191 // Update the last transaction that was updated if we can
1192 if (commit.getTransactionSequenceNumber() != -1) {
1193 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId());
1195 // Update the last transaction sequence number that the arbitrator arbitrated on
1196 if ((lastTransactionNumber == null) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) {
1197 lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber());
1201 // We processed a new commit that we havent seen before
1202 didProcessANewCommit = true;
1204 // Update the committed table of keys and which commit is using which key
1205 for (KeyValue kv : commit.getKeyValueUpdateSet()) {
1206 committedKeyValueTable.put(kv.getKey(), kv);
1207 liveCommitsByKeyTable.put(kv.getKey(), commit);
1212 return didProcessANewCommit;
1216 * Create the speculative table from transactions that are still live and have come from the cloud
// Builds speculatedKeyValueTable by replaying the live transactions in ascending sequence-number
// order on top of committedKeyValueTable. Speculation restarts from scratch when
// didProcessNewCommits is set or when the oldest live transaction differs from the one
// remembered from the previous run; otherwise it resumes after
// lastTransactionSequenceNumberSpeculatedOn.
1218 private boolean updateSpeculativeTable(boolean didProcessNewCommits) {
1219 if (liveTransactionBySequenceNumberTable.keySet().size() == 0) {
1220 // There is nothing to speculate on
1224 // Create a list of the transaction sequence numbers and sort them from oldest to newest
1225 List<Long> transactionSequenceNumbersSorted = new ArrayList<Long>(liveTransactionBySequenceNumberTable.keySet());
1226 Collections.sort(transactionSequenceNumbersSorted);
1228 // NOTE(review): get(0) yields a boxed Long. This is a numeric comparison only if
1228 // oldestTransactionSequenceNumberSpeculatedOn is declared as a primitive long; if it is a Long
1228 // this compares references - TODO confirm the field's declared type.
1228 boolean hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted.get(0) != oldestTransactionSequenceNumberSpeculatedOn;
1231 if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
1232 // If there is a gap in the transaction sequence numbers then there was a commit or an abort of a transaction
1233 // OR there was a new commit (could be from an offline commit), so redo the speculation from scratch
1235 // Start from scratch
1236 speculatedKeyValueTable.clear();
1237 lastTransactionSequenceNumberSpeculatedOn = -1;
1238 oldestTransactionSequenceNumberSpeculatedOn = -1;
1242 // Remember the front of the transaction list
1243 oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted.get(0);
1245 // Find where to start speculating from: the position right after the last transaction speculated on.
1245 // indexOf yields -1 when that sequence number is not in the list, which makes the start index 0.
1246 int startIndex = transactionSequenceNumbersSorted.indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1;
1248 if (startIndex >= transactionSequenceNumbersSorted.size()) {
1249 // Make sure we are not out of bounds
1250 return false; // did not speculate
1253 // Arbitrators that have an incomplete transaction in the range being replayed
1253 Set<Long> incompleteTransactionArbitrator = new HashSet<Long>();
1254 // NOTE(review): didSkip starts out as true. Presumably the reset near the end of this method is
1254 // guarded by it, in which case that guard can never be false - TODO confirm the intended initial value.
1254 boolean didSkip = true;
1256 for (int i = startIndex; i < transactionSequenceNumbersSorted.size(); i++) {
1257 long transactionSequenceNumber = transactionSequenceNumbersSorted.get(i);
1258 Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber);
1260 if (!transaction.isComplete()) {
1261 // If there is an incomplete transaction then there is nothing we can do
1262 // add this transactions arbitrator to the list of arbitrators we should ignore
1263 incompleteTransactionArbitrator.add(transaction.getArbitrator());
1268 if (incompleteTransactionArbitrator.contains(transaction.getArbitrator())) {
1272 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
1274 if (transaction.evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, null)) {
1275 // Guard evaluated to true so update the speculative table
1276 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
1277 speculatedKeyValueTable.put(kv.getKey(), kv);
1283 // Since there was a skip we need to redo the speculation next time around
1284 lastTransactionSequenceNumberSpeculatedOn = -1;
1285 oldestTransactionSequenceNumberSpeculatedOn = -1;
1288 // We did some speculation
1293 * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer
1295 private void updatePendingTransactionSpeculativeTable(boolean didProcessNewCommitsOrSpeculate) {
1296 if (pendingTransactionQueue.size() == 0) {
1297 // There is nothing to speculate on
1302 if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue.get(0))) {
1303 // need to reset on the pending speculation
1304 lastPendingTransactionSpeculatedOn = null;
1305 firstPendingTransaction = pendingTransactionQueue.get(0);
1306 pendingTransactionSpeculatedKeyValueTable.clear();
1309 // Find where to start arbitration from
1310 int startIndex = pendingTransactionQueue.indexOf(firstPendingTransaction) + 1;
1312 if (startIndex >= pendingTransactionQueue.size()) {
1313 // Make sure we are not out of bounds
1317 for (int i = startIndex; i < pendingTransactionQueue.size(); i++) {
1318 Transaction transaction = pendingTransactionQueue.get(i);
1320 lastPendingTransactionSpeculatedOn = transaction;
1322 if (transaction.evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
1323 // Guard evaluated to true so update the speculative table
1324 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
1325 pendingTransactionSpeculatedKeyValueTable.put(kv.getKey(), kv);
1332 * Set dead and remove from the live transaction tables the transactions that are dead
// Two passes driven by lastArbitratedTransactionNumberByArbitratorTable:
// 1) every live transaction whose sequence number is at or below its arbitrator's last arbitrated
//    transaction number is set dead and dropped from liveTransactionByTransactionIdTable;
// 2) every outstanding transaction status that satisfies the same test is marked StatusCommitted.
1334 private void updateLiveTransactionsAndStatus() {
1336 // Go through each of the live transactions
1337 for (Iterator<Map.Entry<Long, Transaction>> iter = liveTransactionBySequenceNumberTable.entrySet().iterator(); iter.hasNext();) {
1338 Transaction transaction = iter.next().getValue();
1340 // Check if the transaction is dead, i.e. its arbitrator has already arbitrated up to or past it
1341 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(transaction.getArbitrator());
1342 if ((lastTransactionNumber != null) && (lastTransactionNumber >= transaction.getSequenceNumber())) {
1344 // Set dead the transaction
1345 transaction.setDead();
1347 // Remove the transaction from the live table
1349 liveTransactionByTransactionIdTable.remove(transaction.getId());
1353 // Go through each of the outstanding transaction statuses
1354 for (Iterator<Map.Entry<Long, TransactionStatus>> iter = outstandingTransactionStatus.entrySet().iterator(); iter.hasNext();) {
1355 TransactionStatus status = iter.next().getValue();
1357 // Check if the transaction behind this status has already been arbitrated
1358 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(status.getTransactionArbitrator());
1359 if ((lastTransactionNumber != null) && (lastTransactionNumber >= status.getTransactionSequenceNumber())) {
1362 status.setStatus(TransactionStatus.StatusCommitted);
1372 * Process this slot, entry by entry. Also update the latest message sent by slot
1374 private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptUpdatesToLocal, HashSet<Long> machineSet) {
1376 // Update the last message seen
1377 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
1379 // Process each entry in the slot
1380 for (Entry entry : slot.getEntries()) {
1381 switch (entry.getType()) {
1383 case Entry.TypeCommitPart:
1384 processEntry((CommitPart)entry);
1387 case Entry.TypeAbort:
1388 processEntry((Abort)entry);
1391 case Entry.TypeTransactionPart:
1392 processEntry((TransactionPart)entry);
1395 case Entry.TypeNewKey:
1396 processEntry((NewKey)entry);
1399 case Entry.TypeLastMessage:
1400 processEntry((LastMessage)entry, machineSet);
1403 case Entry.TypeRejectedMessage:
1404 processEntry((RejectedMessage)entry, indexer);
1407 case Entry.TypeTableStatus:
1408 processEntry((TableStatus)entry);
1412 throw new Error("Unrecognized type: " + entry.getType());
1418 * Update the last message that was sent for a machine Id
1420 private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
1421 // Update what the last message received by a machine was
1422 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
1426 * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message)
1428 private void processEntry(NewKey entry) {
1430 // Update the arbitrator table with the new key information
1431 arbitratorTable.put(entry.getKey(), entry.getMachineID());
1433 // Update what the latest live new key is
1434 NewKey oldNewKey = liveNewKeyTable.put(entry.getKey(), entry);
1435 if (oldNewKey != null) {
1436 // Delete the old new key messages
1437 oldNewKey.setDead();
1442 * Process new table status entries and set dead the old ones as new ones come in.
1443 * keeps track of the largest and smallest table status seen in this current round
1444 * of updating the local copy of the block chain
1446 private void processEntry(TableStatus entry) {
1447 int newNumSlots = entry.getMaxSlots();
1449 if (liveTableStatus != null) {
1450 // We have a larger table status so the old table status is no longer alive
1451 liveTableStatus.setDead();
1454 // Make this new table status the latest alive table status
1455 liveTableStatus = entry;
1457 if ((smallestTableStatusSeen == -1) || (newNumSlots < smallestTableStatusSeen)) {
1458 smallestTableStatusSeen = newNumSlots;
1461 if ((largestTableStatusSeen == -1) || (newNumSlots > largestTableStatusSeen)) {
1462 largestTableStatusSeen = newNumSlots;
1467 * Check old messages to see if there is a block chain violation. Also
1469 private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
1470 long oldSeqNum = entry.getOldSeqNum();
1471 long newSeqNum = entry.getNewSeqNum();
1472 boolean isequal = entry.getEqual();
1473 long machineId = entry.getMachineID();
1476 // Check if we have messages that were supposed to be rejected in our local block chain
1477 for (long seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
1480 Slot slot = indexer.getSlot(seqNum);
1483 // If we have this slot make sure that it was not supposed to be a rejected slot
1485 long slotMachineId = slot.getMachineID();
1486 if (isequal != (slotMachineId == machineId)) {
1487 throw new Error("Server Error: Trying to insert rejected message for slot " + seqNum);
1493 // Create a list of clients to watch until they see this rejected message entry.
1494 HashSet<Long> deviceWatchSet = new HashSet<Long>();
1495 for (Map.Entry<Long, Pair<Long, Liveness>> lastMessageEntry : lastMessageTable.entrySet()) {
1497 // Machine ID for the last message entry
1498 long lastMessageEntryMachineId = lastMessageEntry.getKey();
1500 // We've seen it, don't need to continue to watch. Our next
1501 // message will implicitly acknowledge it.
1502 if (lastMessageEntryMachineId == localMachineId) {
1506 Pair<Long, Liveness> lastMessageValue = lastMessageEntry.getValue();
1507 long entrySequenceNumber = lastMessageValue.getFirst();
1509 if (entrySequenceNumber < newSeqNum) {
1511 // Add this rejected message to the set of messages that this machine ID did not see yet
1512 addWatchList(lastMessageEntryMachineId, entry);
1514 // This client did not see this rejected message yet so add it to the watch set to monitor
1515 deviceWatchSet.add(lastMessageEntryMachineId);
1519 if (deviceWatchSet.isEmpty()) {
1520 // This rejected message has been seen by all the clients so
1523 // We need to watch this rejected message
1524 entry.setWatchSet(deviceWatchSet);
1529 * Check if this abort is live, if not then save it so we can kill it later.
1530 * update the last transaction number that was arbitrated on.
1532 private void processEntry(Abort entry) {
1534 // Abort has not been seen by the client it is for yet so we need to keep track of it
1535 Abort previouslySeenAbort = liveAbortTable.put(entry.getAbortId(), entry);
1536 if (previouslySeenAbort != null) {
1537 previouslySeenAbort.setDead(); // Delete old version of the abort since we got a rescued newer version
1540 if ((entry.getSequenceNumber() != -1) && (lastMessageTable.get(entry.getTransactionMachineId()).getFirst() >= entry.getSequenceNumber())) {
1542 // The machine already saw this so it is dead
1544 liveAbortTable.remove(entry);
1549 // update the transaction status
1550 TransactionStatus status = outstandingTransactionStatus.remove(entry.getTransactionSequenceNumber());
1551 if (status != null) {
1552 status.setStatus(TransactionStatus.StatusAborted);
1556 // Set dead a transaction if we can
1557 Transaction transactionToSetDead = liveTransactionByTransactionIdTable.remove(new Pair<Long, Long>(entry.getTransactionMachineId(), entry.getTransactionClientLocalSequenceNumber()));
1558 if (transactionToSetDead != null) {
1559 liveTransactionBySequenceNumberTable.remove(transactionToSetDead.getSequenceNumber());
1562 // Update the last transaction sequence number that the arbitrator arbitrated on
1563 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getTransactionArbitrator());
1564 if ((lastTransactionNumber == null) || (lastTransactionNumber < entry.getTransactionSequenceNumber())) {
1567 if (entry.getTransactionSequenceNumber() != -1) {
1568 lastArbitratedTransactionNumberByArbitratorTable.put(entry.getTransactionArbitrator(), entry.getTransactionSequenceNumber());
1574 * Set dead the transaction part if that transaction is dead and keep track of all new parts
1576 private void processEntry(TransactionPart entry) {
1577 // Check if we have already seen this transaction and set it dead OR if it is not alive
1578 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getArbitratorId());
1579 if ((lastTransactionNumber != null) && (lastTransactionNumber >= entry.getSequenceNumber())) {
1580 // This transaction is dead, it was already committed or aborted
1585 // This part is still alive
1586 Map<Pair<Long, Integer>, TransactionPart> transactionPart = newTransactionParts.get(entry.getMachineId());
1588 if (transactionPart == null) {
1589 // Dont have a table for this machine Id yet so make one
1590 transactionPart = new HashMap<Pair<Long, Integer>, TransactionPart>();
1591 newTransactionParts.put(entry.getMachineId(), transactionPart);
1594 // Update the part and set dead ones we have already seen (got a rescued version)
1595 TransactionPart previouslySeenPart = transactionPart.put(entry.getPartId(), entry);
1596 if (previouslySeenPart != null) {
1597 previouslySeenPart.setDead();
1602 * Process new commit entries and save them for future use. Delete duplicates
1604 private void processEntry(CommitPart entry) {
1605 Map<Pair<Long, Integer>, CommitPart> commitPart = newCommitParts.get(entry.getMachineId());
1607 if (commitPart == null) {
1608 // Dont have a table for this machine Id yet so make one
1609 commitPart = new HashMap<Pair<Long, Integer>, CommitPart>();
1610 newCommitParts.put(entry.getMachineId(), commitPart);
1613 // Update the part and set dead ones we have already seen (got a rescued version)
1614 CommitPart previouslySeenPart = commitPart.put(entry.getPartId(), entry);
1615 if (previouslySeenPart != null) {
1616 previouslySeenPart.setDead();
1621 * Update the last message seen table. Update and set dead the appropriate RejectedMessages as clients see them.
1622 * Updates the live aborts, removes those that are dead and sets them dead.
1623 * Check that the last message seen is correct and that there is no mismatch of our own last message or that
1624 * other clients have not had a rollback on the last message.
// Records (seqNum, liveness) as the last message of machineId in lastMessageTable.
// Along the way it: removes machineId from machineSet; tells every watched rejected message whose
// new sequence number is <= seqNum that this machine no longer needs to see it; visits the live
// aborts issued for this machine with sequence number <= seqNum; sets dead the message being
// replaced (or, for the local machine, the new message itself); and throws an Error on a local
// sequence-number mismatch or a remote rollback.
1626 private void updateLastMessage(long machineId, long seqNum, Liveness liveness, boolean acceptUpdatesToLocal, HashSet<Long> machineSet) {
1628 // We have seen this machine ID
1629 machineSet.remove(machineId);
1631 // Get the set of rejected messages that this machine Id has not seen yet
1632 HashSet<RejectedMessage> watchset = rejectedMessageWatchListTable.get(machineId);
1634 // If there is a rejected message that this machine Id has not seen yet
1635 if (watchset != null) {
1637 // Go through each rejected message that this machine Id has not seen yet
1638 for (Iterator<RejectedMessage> rmit = watchset.iterator(); rmit.hasNext(); ) {
1640 RejectedMessage rm = rmit.next();
1642 // If this machine Id has seen this rejected message...
1643 if (rm.getNewSeqNum() <= seqNum) {
1645 // Remove it from our watchlist
1648 // Decrement machines that need to see this notification
1649 rm.removeWatcher(machineId);
1654 // Set dead the aborts that were issued for this machine and that it has now seen
1655 for (Iterator<Map.Entry<Pair<Long, Long>, Abort>> i = liveAbortTable.entrySet().iterator(); i.hasNext();) {
1656 Abort abort = i.next().getValue();
1658 if ((abort.getTransactionMachineId() == machineId) && (abort.getSequenceNumber() <= seqNum)) {
1666 if (machineId == localMachineId) {
1667 // Our own messages are immediately dead.
1668 if (liveness instanceof LastMessage) {
1669 ((LastMessage)liveness).setDead();
1670 } else if (liveness instanceof Slot) {
1671 ((Slot)liveness).setDead();
1673 throw new Error("Unrecognized type");
1677 // Store the new last message and get the old last message for this device
1678 Pair<Long, Liveness> lastMessageEntry = lastMessageTable.put(machineId, new Pair<Long, Liveness>(seqNum, liveness));
1679 if (lastMessageEntry == null) {
1680 // If no last message then there is nothing else to process
1684 long lastMessageSeqNum = lastMessageEntry.getFirst();
1685 Liveness lastEntry = lastMessageEntry.getSecond();
1687 // Set dead the replaced message, but only for other machines since we already set ours to dead
1688 if (machineId != localMachineId) {
1689 if (lastEntry instanceof LastMessage) {
1690 ((LastMessage)lastEntry).setDead();
1691 } else if (lastEntry instanceof Slot) {
1692 ((Slot)lastEntry).setDead();
1694 throw new Error("Unrecognized type");
1698 // Make sure the server is not playing any games
1699 if (machineId == localMachineId) {
1701 // We were not making any updates and the previously recorded local sequence number differs from the new one
1702 if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
1703 throw new Error("Server Error: Mismatch on local machine sequence number, needed: " + seqNum + " got: " + lastMessageSeqNum);
1706 // For any machine other than this one the recorded sequence number must never go backwards
1706 if (lastMessageSeqNum > seqNum) {
1707 throw new Error("Server Error: Rollback on remote machine sequence number");
1713 * Add a rejected message entry to the watch set to keep track of which clients have seen that
1714 * rejected message entry and which have not.
1716 private void addWatchList(long machineId, RejectedMessage entry) {
1717 HashSet<RejectedMessage> entries = rejectedMessageWatchListTable.get(machineId);
1718 if (entries == null) {
1719 // There is no set for this machine ID yet so create one
1720 entries = new HashSet<RejectedMessage>();
1721 rejectedMessageWatchListTable.put(machineId, entries);
1727 * Check if the HMAC chain is not violated
// Verifies that each new slot links to its predecessor: for every slot in newSlots, the slot with
// the immediately preceding sequence number is looked up through the indexer and, when it is
// present, its HMAC must equal the new slot's previous-HMAC value.
// Throws an Error when the two byte arrays differ.
1729 private void checkHMACChain(SlotIndexer indexer, Slot[] newSlots) {
1730 for (int i = 0; i < newSlots.length; i++) {
1731 Slot currSlot = newSlots[i];
1731 // Predecessor by sequence number; the check is skipped when the indexer returns null for it
1732 Slot prevSlot = indexer.getSlot(currSlot.getSequenceNumber() - 1);
1733 if (prevSlot != null &&
1734 !Arrays.equals(prevSlot.getHMAC(), currSlot.getPrevHMAC()))
1735 throw new Error("Server Error: Invalid HMAC Chain" + currSlot + " " + prevSlot);