Local communication working
[iotcloud.git] / version2 / src / java / iotcloud / Table.java
1 package iotcloud;
2
3 import java.util.Iterator;
4 import java.util.Random;
5 import java.util.Arrays;
6 import java.util.Map;
7 import java.util.Set;
8 import java.util.List;
9 import java.util.Vector;
10 import java.util.HashMap;
11 import java.util.HashSet;
12 import java.util.ArrayList;
13 import java.util.Collections;
14 import java.nio.ByteBuffer;
15
16 /**
17  * IoTTable data structure.  Provides client interface.
18  * @author Brian Demsky
19  * @version 1.0
20  */
21
22 final public class Table {
23
24
25         /* Constants */
26         static final int FREE_SLOTS = 10; // Number of slots that should be kept free
27         static final int SKIP_THRESHOLD = 10;
28         static final double RESIZE_MULTIPLE = 1.2;
29         static final double RESIZE_THRESHOLD = 0.75;
30         static final int REJECTED_THRESHOLD = 5;
31
32         /* Helper Objects */
33         private SlotBuffer buffer = null;
34         private CloudComm cloud = null;
35         private Random random = null;
36         private TableStatus liveTableStatus = null;
37         private PendingTransaction pendingTransactionBuilder = null; // Pending Transaction used in building a Pending Transaction
38         private Transaction lastPendingTransactionSpeculatedOn = null; // Last transaction that was speculated on from the pending transaction
39         private Transaction firstPendingTransaction = null; // first transaction in the pending transaction list
40
41         /* Variables */
42         private int numberOfSlots = 0;  // Number of slots stored in buffer
43         private int bufferResizeThreshold = 0; // Threshold on the number of live slots before a resize is needed
44         private long liveSlotCount = 0; // Number of currently live slots
45         private long oldestLiveSlotSequenceNumver = 0;  // Smallest sequence number of the slot with a live entry
46         private long localMachineId = 0; // Machine ID of this client device
47         private long sequenceNumber = 0; // Largest sequence number a client has received
48         private int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server
49         private int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server
50         private long localTransactionSequenceNumber = 0; // Local sequence number counter for transactions
51         private long lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on
52         private long oldestTransactionSequenceNumberSpeculatedOn = -1; // the oldest transaction that was speculated on
53         private long localArbitrationSequenceNumber = 0;
54         private boolean hadPartialSendToServer = false;
55         private boolean attemptedToSendToServer = false;
56
57         /* Data Structures  */
58         private Map<IoTString, KeyValue> committedKeyValueTable = null; // Table of committed key value pairs
59         private Map<IoTString, KeyValue> speculatedKeyValueTable = null; // Table of speculated key value pairs, if there is a speculative value
60         private Map<IoTString, KeyValue> pendingTransactionSpeculatedKeyValueTable = null; // Table of speculated key value pairs, if there is a speculative value from the pending transactions
61         private Map<IoTString, NewKey> liveNewKeyTable = null; // Table of live new keys
62         private HashMap<Long, Pair<Long, Liveness>> lastMessageTable = null; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
63         private HashMap<Long, HashSet<RejectedMessage>> rejectedMessageWatchListTable = null; // Table of machine Ids and the set of rejected messages they have not seen yet
64         private Map<IoTString, Long> arbitratorTable = null; // Table of keys and their arbitrators
65         private Map<Pair<Long, Long>, Abort> liveAbortTable = null; // Table live abort messages
66         private Map<Long, Map<Pair<Long, Integer>, TransactionPart>> newTransactionParts = null; // transaction parts that are seen in this latest round of slots from the server
67         private Map<Long, Map<Pair<Long, Integer>, CommitPart>> newCommitParts = null; // commit parts that are seen in this latest round of slots from the server
68         private Map<Long, Long> lastArbitratedTransactionNumberByArbitratorTable = null; // Last transaction sequence number that an arbitrator arbitrated on
69         private Map<Long, Transaction> liveTransactionBySequenceNumberTable = null; // live transaction grouped by the sequence number
70         private Map<Pair<Long, Long>, Transaction> liveTransactionByTransactionIdTable = null; // live transaction grouped by the transaction ID
71         private Map<Long, Map<Long, Commit>> liveCommitsTable = null;
72         private Map<IoTString, Commit> liveCommitsByKeyTable = null;
73         private Map<Long, Long> lastCommitSeenSequenceNumberByArbitratorTable = null;
74         private Vector<Long> rejectedSlotList = null; // List of rejected slots that have yet to be sent to the server
75         private List<Transaction> pendingTransactionQueue = null;
76         private List<ArbitrationRound> pendingSendArbitrationRounds = null;
77         private List<Entry> pendingSendArbitrationEntriesToDelete = null;
78         private Map<Transaction, List<Integer>> transactionPartsSent = null;
79         private Map<Long, TransactionStatus> outstandingTransactionStatus = null;
80         private Map<Long, Abort> liveAbortsGeneratedByLocal = null;
81         private Set<Pair<Long, Long>> offlineTransactionsCommittedAndAtServer = null;
82         private Map<Long, Pair<String, Integer>> localCommunicationTable = null;
83         private Map<Long, Long> lastTransactionSeenFromMachineFromServer = null;
84         private Map<Long, Long> lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = null;
85
86
87
88
89
90         public Table(String baseurl, String password, long _localMachineId, int listeningPort) {
91                 localMachineId = _localMachineId;
92                 cloud = new CloudComm(this, baseurl, password, listeningPort);
93
94                 init();
95         }
96
97         public Table(CloudComm _cloud, long _localMachineId) {
98                 localMachineId = _localMachineId;
99                 cloud = _cloud;
100
101                 init();
102         }
103
104         /**
105          * Init all the stuff needed for for table usage
106          */
107         private void init() {
108
109                 // Init helper objects
110                 random = new Random();
111                 buffer = new SlotBuffer();
112
113                 // Set Variables
114                 oldestLiveSlotSequenceNumver = 1;
115
116                 // init data structs
117                 committedKeyValueTable = new HashMap<IoTString, KeyValue>();
118                 speculatedKeyValueTable = new HashMap<IoTString, KeyValue>();
119                 pendingTransactionSpeculatedKeyValueTable = new HashMap<IoTString, KeyValue>();
120                 liveNewKeyTable = new HashMap<IoTString, NewKey>();
121                 lastMessageTable = new HashMap<Long, Pair<Long, Liveness>>();
122                 rejectedMessageWatchListTable = new HashMap<Long, HashSet<RejectedMessage>>();
123                 arbitratorTable = new HashMap<IoTString, Long>();
124                 liveAbortTable = new HashMap<Pair<Long, Long>, Abort>();
125                 newTransactionParts = new HashMap<Long, Map<Pair<Long, Integer>, TransactionPart>>();
126                 newCommitParts = new HashMap<Long, Map<Pair<Long, Integer>, CommitPart>>();
127                 lastArbitratedTransactionNumberByArbitratorTable = new HashMap<Long, Long>();
128                 liveTransactionBySequenceNumberTable = new HashMap<Long, Transaction>();
129                 liveTransactionByTransactionIdTable = new HashMap<Pair<Long, Long>, Transaction>();
130                 liveCommitsTable = new HashMap<Long, Map<Long, Commit>>();
131                 liveCommitsByKeyTable = new HashMap<IoTString, Commit>();
132                 lastCommitSeenSequenceNumberByArbitratorTable = new HashMap<Long, Long>();
133                 rejectedSlotList = new Vector<Long>();
134                 pendingTransactionQueue = new ArrayList<Transaction>();
135                 pendingSendArbitrationEntriesToDelete = new ArrayList<Entry>();
136                 transactionPartsSent = new HashMap<Transaction, List<Integer>>();
137                 outstandingTransactionStatus = new HashMap<Long, TransactionStatus>();
138                 liveAbortsGeneratedByLocal = new HashMap<Long, Abort>();
139                 offlineTransactionsCommittedAndAtServer = new HashSet<Pair<Long, Long>>();
140                 localCommunicationTable = new HashMap<Long, Pair<String, Integer>>();
141                 lastTransactionSeenFromMachineFromServer = new HashMap<Long, Long>();
142                 pendingSendArbitrationRounds = new ArrayList<ArbitrationRound>();
143                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new HashMap<Long, Long>();
144
145
146                 // Other init stuff
147                 numberOfSlots = buffer.capacity();
148                 setResizeThreshold();
149         }
150
151         // TODO: delete method
152         public synchronized void printSlots() {
153                 long o = buffer.getOldestSeqNum();
154                 long n = buffer.getNewestSeqNum();
155
156                 int[] types = new int[10];
157
158                 int num = 0;
159
160                 int livec = 0;
161                 int deadc = 0;
162                 for (long i = o; i < (n + 1); i++) {
163                         Slot s = buffer.getSlot(i);
164
165                         Vector<Entry> entries = s.getEntries();
166
167                         for (Entry e : entries) {
168                                 if (e.isLive()) {
169                                         int type = e.getType();
170                                         types[type] = types[type] + 1;
171                                         num++;
172                                         livec++;
173                                 } else {
174                                         deadc++;
175                                 }
176                         }
177                 }
178
179                 for (int i = 0; i < 10; i++) {
180                         System.out.println(i + "    " + types[i]);
181                 }
182                 System.out.println("Live count:   " + livec);
183                 System.out.println("Dead count:   " + deadc);
184                 System.out.println("Old:   " + o);
185                 System.out.println("New:   " + n);
186                 System.out.println("Size:   " + buffer.size());
187
188                 // List<IoTString> strList = new ArrayList<IoTString>();
189                 // for (int i = 0; i < 100; i++) {
190                 //      String keyA = "a" + i;
191                 //      String keyB = "b" + i;
192                 //      String keyC = "c" + i;
193                 //      String keyD = "d" + i;
194
195                 //      IoTString iKeyA = new IoTString(keyA);
196                 //      IoTString iKeyB = new IoTString(keyB);
197                 //      IoTString iKeyC = new IoTString(keyC);
198                 //      IoTString iKeyD = new IoTString(keyD);
199
200                 //      strList.add(iKeyA);
201                 //      strList.add(iKeyB);
202                 //      strList.add(iKeyC);
203                 //      strList.add(iKeyD);
204                 // }
205
206
207                 // for (Long l : commitMap.keySet()) {
208                 //      for (Long l2 : commitMap.get(l).keySet()) {
209                 //              for (KeyValue kv : commitMap.get(l).get(l2).getkeyValueUpdateSet()) {
210                 //                      strList.remove(kv.getKey());
211                 //                      System.out.print(kv.getKey() + "    ");
212                 //              }
213                 //      }
214                 // }
215
216                 // System.out.println();
217                 // System.out.println();
218
219                 // for (IoTString s : strList) {
220                 //      System.out.print(s + "    ");
221                 // }
222                 // System.out.println();
223                 // System.out.println(strList.size());
224         }
225
226         /**
227          * Initialize the table by inserting a table status as the first entry into the table status
228          * also initialize the crypto stuff.
229          */
230         public synchronized void initTable() throws ServerException {
231                 cloud.initSecurity();
232
233                 // Create the first insertion into the block chain which is the table status
234                 Slot s = new Slot(this, 1, localMachineId);
235                 TableStatus status = new TableStatus(s, numberOfSlots);
236                 s.addEntry(status);
237                 Slot[] array = cloud.putSlot(s, numberOfSlots);
238
239                 if (array == null) {
240                         array = new Slot[] {s};
241                         // update local block chain
242                         validateAndUpdate(array, true);
243                 } else if (array.length == 1) {
244                         // in case we did push the slot BUT we failed to init it
245                         validateAndUpdate(array, true);
246                 } else {
247                         throw new Error("Error on initialization");
248                 }
249         }
250
251         /**
252          * Rebuild the table from scratch by pulling the latest block chain from the server.
253          */
254         public synchronized void rebuild() throws ServerException {
255                 // Just pull the latest slots from the server
256                 Slot[] newslots = cloud.getSlots(sequenceNumber + 1);
257                 validateAndUpdate(newslots, true);
258         }
259
260 // public String toString() {
261 //      String retString = " Committed Table: \n";
262 //      retString += "---------------------------\n";
263 //      retString += commitedTable.toString();
264
265 //      retString += "\n\n";
266
267 //      retString += " Speculative Table: \n";
268 //      retString += "---------------------------\n";
269 //      retString += speculativeTable.toString();
270
271 //      return retString;
272 // }
273
274         public synchronized void addLocalCommunication(long arbitrator, String hostName, int portNumber) {
275                 localCommunicationTable.put(arbitrator, new Pair<String, Integer>(hostName, portNumber));
276         }
277
278         public synchronized Long getArbitrator(IoTString key) {
279                 return arbitratorTable.get(key);
280         }
281
282         public synchronized void close() {
283                 cloud.close();
284         }
285
286         public synchronized IoTString getCommitted(IoTString key)  {
287                 KeyValue kv = committedKeyValueTable.get(key);
288
289                 if (kv != null) {
290                         return kv.getValue();
291                 } else {
292                         return null;
293                 }
294         }
295
296         public synchronized IoTString getSpeculative(IoTString key) {
297                 KeyValue kv = pendingTransactionSpeculatedKeyValueTable.get(key);
298
299                 if (kv == null) {
300                         kv = speculatedKeyValueTable.get(key);
301                 }
302
303                 if (kv == null) {
304                         kv = committedKeyValueTable.get(key);
305                 }
306
307                 if (kv != null) {
308                         return kv.getValue();
309                 } else {
310                         return null;
311                 }
312         }
313
314         public synchronized IoTString getCommittedAtomic(IoTString key) {
315                 KeyValue kv = committedKeyValueTable.get(key);
316
317                 if (arbitratorTable.get(key) == null) {
318                         throw new Error("Key not Found.");
319                 }
320
321                 // Make sure new key value pair matches the current arbitrator
322                 if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
323                         // TODO: Maybe not throw en error
324                         throw new Error("Not all Key Values Match Arbitrator.");
325                 }
326
327                 if (kv != null) {
328                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, kv.getValue()));
329                         return kv.getValue();
330                 } else {
331                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, null));
332                         return null;
333                 }
334         }
335
336         public synchronized IoTString getSpeculativeAtomic(IoTString key) {
337                 if (arbitratorTable.get(key) == null) {
338                         throw new Error("Key not Found.");
339                 }
340
341                 // Make sure new key value pair matches the current arbitrator
342                 if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
343                         // TODO: Maybe not throw en error
344                         throw new Error("Not all Key Values Match Arbitrator.");
345                 }
346
347                 KeyValue kv = pendingTransactionSpeculatedKeyValueTable.get(key);
348
349                 if (kv == null) {
350                         kv = speculatedKeyValueTable.get(key);
351                 }
352
353                 if (kv == null) {
354                         kv = committedKeyValueTable.get(key);
355                 }
356
357                 if (kv != null) {
358                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, kv.getValue()));
359                         return kv.getValue();
360                 } else {
361                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, null));
362                         return null;
363                 }
364         }
365
366         public synchronized boolean update()  {
367                 try {
368                         Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
369                         validateAndUpdate(newSlots, false);
370                         sendToServer(null);
371
372                         return true;
373                 } catch (Exception e) {
374                         // e.printStackTrace();
375                 }
376
377                 return false;
378         }
379
380         public synchronized boolean createNewKey(IoTString keyName, long machineId) throws ServerException {
381                 while (true) {
382                         if (arbitratorTable.get(keyName) != null) {
383                                 // There is already an arbitrator
384                                 return false;
385                         }
386
387                         NewKey newKey = new NewKey(null, keyName, machineId);
388                         if (sendToServer(newKey)) {
389                                 // If successfully inserted
390                                 return true;
391                         }
392                 }
393         }
394
395         public synchronized void startTransaction() {
396                 // Create a new transaction, invalidates any old pending transactions.
397                 pendingTransactionBuilder = new PendingTransaction(localMachineId);
398         }
399
400         public synchronized void addKV(IoTString key, IoTString value) {
401
402                 // Make sure it is a valid key
403                 if (arbitratorTable.get(key) == null) {
404                         throw new Error("Key not Found.");
405                 }
406
407                 // Make sure new key value pair matches the current arbitrator
408                 if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
409                         // TODO: Maybe not throw en error
410                         throw new Error("Not all Key Values Match Arbitrator.");
411                 }
412
413                 // Add the key value to this transaction
414                 KeyValue kv = new KeyValue(key, value);
415                 pendingTransactionBuilder.addKV(kv);
416         }
417
418         public synchronized TransactionStatus commitTransaction() {
419
420                 if (pendingTransactionBuilder.getKVUpdates().size() == 0) {
421                         // transaction with no updates will have no effect on the system
422                         return new TransactionStatus(TransactionStatus.StatusNoEffect, -1);
423                 }
424
425                 // Set the local transaction sequence number and increment
426                 pendingTransactionBuilder.setClientLocalSequenceNumber(localTransactionSequenceNumber);
427                 localTransactionSequenceNumber++;
428
429                 // Create the transaction status
430                 TransactionStatus transactionStatus = new TransactionStatus(TransactionStatus.StatusPending, pendingTransactionBuilder.getArbitrator());
431
432                 // Create the new transaction
433                 Transaction newTransaction = pendingTransactionBuilder.createTransaction();
434                 newTransaction.setTransactionStatus(transactionStatus);
435
436                 if (pendingTransactionBuilder.getArbitrator() != localMachineId) {
437                         // Add it to the queue and invalidate the builder for safety
438                         pendingTransactionQueue.add(newTransaction);
439                 } else {
440                         arbitrateOnLocalTransaction(newTransaction);
441                         updateLiveStateFromLocal();
442                 }
443
444                 pendingTransactionBuilder = new PendingTransaction(localMachineId);
445
446                 try {
447                         sendToServer(null);
448                 } catch (ServerException e) {
449
450                         Set<Long> arbitratorTriedAndFailed = new HashSet<Long>();
451                         for (Iterator<Transaction> iter = pendingTransactionQueue.iterator(); iter.hasNext(); ) {
452                                 Transaction transaction = iter.next();
453
454                                 if (arbitratorTriedAndFailed.contains(transaction.getArbitrator())) {
455                                         // Already contacted this client so ignore all attempts to contact this client
456                                         // to preserve ordering for arbitrator
457                                         continue;
458                                 }
459
460                                 Pair<Boolean, Boolean> sendReturn = sendTransactionToLocal(transaction);
461
462                                 if (sendReturn.getFirst()) {
463                                         // Failed to contact over local
464                                         arbitratorTriedAndFailed.add(transaction.getArbitrator());
465                                 } else {
466                                         // Successful contact or should not contact
467
468                                         if (sendReturn.getSecond()) {
469                                                 // did arbitrate
470                                                 iter.remove();
471                                         }
472                                 }
473                         }
474                 }
475
476                 updateLiveStateFromLocal();
477
478                 return transactionStatus;
479         }
480
481         /**
482          * Get the machine ID for this client
483          */
484         public long getMachineId() {
485                 return localMachineId;
486         }
487
488         /**
489          * Decrement the number of live slots that we currently have
490          */
491         public void decrementLiveCount() {
492                 liveSlotCount--;
493         }
494
495         /**
496          * Recalculate the new resize threshold
497          */
498         private void setResizeThreshold() {
499                 int resizeLower = (int) (RESIZE_THRESHOLD * numberOfSlots);
500                 bufferResizeThreshold = resizeLower - 1 + random.nextInt(numberOfSlots - resizeLower);
501         }
502
503         private boolean sendToServer(NewKey newKey) throws ServerException {
504
505                 try {
506                         // While we have stuff that needs inserting into the block chain
507                         while ((pendingTransactionQueue.size() > 0) || (pendingSendArbitrationRounds.size() > 0) || (newKey != null)) {
508
509                                 // Create the slot
510                                 Slot slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer.getSlot(sequenceNumber).getHMAC());
511
512                                 // Try to fill the slot with data
513                                 ThreeTuple<Boolean, Integer, Boolean> fillSlotsReturn = fillSlot(slot, false, newKey);
514                                 boolean needsResize = fillSlotsReturn.getFirst();
515                                 int newSize = fillSlotsReturn.getSecond();
516                                 Boolean insertedNewKey = fillSlotsReturn.getThird();
517
518                                 if (needsResize) {
519                                         // Reset which transaction to send
520                                         for (Transaction transaction : transactionPartsSent.keySet()) {
521                                                 transaction.resetNextPartToSend();
522
523                                                 // Set the transaction sequence number back to nothing
524                                                 if (!transaction.didSendAPartToServer()) {
525                                                         transaction.setSequenceNumber(-1);
526                                                 }
527                                         }
528
529                                         // Clear the sent data since we are trying again
530                                         pendingSendArbitrationEntriesToDelete.clear();
531                                         transactionPartsSent.clear();
532
533                                         // We needed a resize so try again
534                                         fillSlot(slot, true, newKey);
535                                 }
536
537                                 // Try to send to the server
538                                 ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != null);
539
540                                 if (/*sendSlotsReturn.getSecond() || */sendSlotsReturn.getFirst()) {
541                                         // Did insert into the block chain
542
543                                         if (sendSlotsReturn.getFirst()) {
544                                                 // This slot was what was inserted not a previous slot
545
546                                                 // New Key was successfully inserted into the block chain so dont want to insert it again
547                                                 newKey = null;
548                                         }
549
550                                         // Remove the aborts and commit parts that were sent from the pending to send queue
551                                         for (Iterator<ArbitrationRound> iter = pendingSendArbitrationRounds.iterator(); iter.hasNext(); ) {
552                                                 ArbitrationRound round = iter.next();
553                                                 round.removeParts(pendingSendArbitrationEntriesToDelete);
554
555                                                 if (round.isDoneSending()) {
556                                                         // Sent all the parts
557                                                         iter.remove();
558                                                 }
559                                         }
560                                         
561                                         for (Transaction transaction : transactionPartsSent.keySet()) {
562
563
564                                                 transaction.resetServerFailure();
565
566
567                                                 // Update which transactions parts still need to be sent
568                                                 transaction.removeSentParts(transactionPartsSent.get(transaction));
569
570                                                 // Add the transaction status to the outstanding list
571                                                 outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
572
573                                                 // Update the transaction status
574                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
575
576                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
577                                                 if (transaction.didSendAllParts()) {
578                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
579                                                         pendingTransactionQueue.remove(transaction);
580                                                 }
581                                         }
582                                 } else {
583                                         // Reset which transaction to send
584                                         for (Transaction transaction : transactionPartsSent.keySet()) {
585                                                 transaction.resetNextPartToSend();
586                                                 transaction.resetNextPartToSend();
587
588                                                 // Set the transaction sequence number back to nothing
589                                                 if (!transaction.didSendAPartToServer()) {
590                                                         transaction.setSequenceNumber(-1);
591                                                 }
592                                         }
593                                 }
594
595                                 // Clear the sent data in preparation for next send
596                                 pendingSendArbitrationEntriesToDelete.clear();
597                                 transactionPartsSent.clear();
598
599                                 if (sendSlotsReturn.getThird().length != 0) {
600                                         // insert into the local block chain
601                                         validateAndUpdate(sendSlotsReturn.getThird(), true);
602                                 }
603                         }
604                 } catch (ServerException e) {
605
606                         System.out.println("Server Failure:   " + e.getType());
607
608
609                         if (e.getType() != ServerException.TypeInputTimeout) {
610                                 // e.printStackTrace();
611
612                                 // Nothing was able to be sent to the server so just clear these data structures
613                                 for (Transaction transaction : transactionPartsSent.keySet()) {
614                                         transaction.resetNextPartToSend();
615
616                                         // Set the transaction sequence number back to nothing
617                                         if (!transaction.didSendAPartToServer()) {
618                                                 transaction.setSequenceNumber(-1);
619                                         }
620                                 }
621                         } else {
622                                 // There was a partial send to the server
623                                 hadPartialSendToServer = true;
624
625                                 // Nothing was able to be sent to the server so just clear these data structures
626                                 for (Transaction transaction : transactionPartsSent.keySet()) {
627                                         transaction.resetNextPartToSend();
628                                         transaction.setServerFailure();
629                                 }
630                         }
631
632                         pendingSendArbitrationEntriesToDelete.clear();
633                         transactionPartsSent.clear();
634
635                         throw e;
636                 }
637
638                 return newKey == null;
639         }
640
641         public synchronized boolean updateFromLocal(long machineId) {
642                 Pair<String, Integer> localCommunicationInformation = localCommunicationTable.get(machineId);
643                 if (localCommunicationInformation == null) {
644                         // Cant talk to that device locally so do nothing
645                         return false;
646                 }
647
648                 // Get the size of the send data
649                 int sendDataSize = Integer.BYTES + Long.BYTES;
650
651                 Long lastArbitrationDataLocalSequenceNumber = (long) - 1;
652                 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId) != null) {
653                         lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId);
654                 }
655
656                 byte[] sendData = new byte[sendDataSize];
657                 ByteBuffer bbEncode = ByteBuffer.wrap(sendData);
658
659                 // Encode the data
660                 bbEncode.putLong(lastArbitrationDataLocalSequenceNumber);
661                 bbEncode.putInt(0);
662
663                 // Send by local
664                 byte[] returnData = cloud.sendLocalData(sendData, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
665
666                 if (returnData == null) {
667                         // Could not contact server
668                         return false;
669                 }
670
671                 // Decode the data
672                 ByteBuffer bbDecode = ByteBuffer.wrap(returnData);
673                 int numberOfEntries = bbDecode.getInt();
674
675                 for (int i = 0; i < numberOfEntries; i++) {
676                         byte type = bbDecode.get();
677                         if (type == Entry.TypeAbort) {
678                                 Abort abort = (Abort)Abort.decode(null, bbDecode);
679                                 processEntry(abort);
680                         } else if (type == Entry.TypeCommitPart) {
681                                 CommitPart commitPart = (CommitPart)CommitPart.decode(null, bbDecode);
682                                 processEntry(commitPart);
683                         }
684                 }
685
686                 updateLiveStateFromLocal();
687
688                 return true;
689         }
690
691         private Pair<Boolean, Boolean> sendTransactionToLocal(Transaction transaction) {
692
693                 // Get the devices local communications
694                 Pair<String, Integer> localCommunicationInformation = localCommunicationTable.get(transaction.getArbitrator());
695
696                 if (localCommunicationInformation == null) {
697                         // Cant talk to that device locally so do nothing
698                         return new Pair<Boolean, Boolean>(false, false);
699                 }
700
701                 // Get the size of the send data
702                 int sendDataSize = Integer.BYTES + Long.BYTES;
703                 for (TransactionPart part : transaction.getParts().values()) {
704                         sendDataSize += part.getSize();
705                 }
706
707                 Long lastArbitrationDataLocalSequenceNumber = (long) - 1;
708                 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator()) != null) {
709                         lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator());
710                 }
711
712                 // Make the send data size
713                 byte[] sendData = new byte[sendDataSize];
714                 ByteBuffer bbEncode = ByteBuffer.wrap(sendData);
715
716                 // Encode the data
717                 bbEncode.putLong(lastArbitrationDataLocalSequenceNumber);
718                 bbEncode.putInt(transaction.getParts().size());
719                 for (TransactionPart part : transaction.getParts().values()) {
720                         part.encode(bbEncode);
721                 }
722
723                 // Send by local
724                 byte[] returnData = cloud.sendLocalData(sendData, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
725
726                 if (returnData == null) {
727                         // Could not contact server
728                         return new Pair<Boolean, Boolean>(true, false);
729                 }
730
731                 // Decode the data
732                 ByteBuffer bbDecode = ByteBuffer.wrap(returnData);
733                 boolean didCommit = bbDecode.get() == 1;
734                 boolean couldArbitrate = bbDecode.get() == 1;
735                 int numberOfEntries = bbDecode.getInt();
736                 boolean foundAbort = false;
737
738                 for (int i = 0; i < numberOfEntries; i++) {
739                         byte type = bbDecode.get();
740                         if (type == Entry.TypeAbort) {
741                                 Abort abort = (Abort)Abort.decode(null, bbDecode);
742
743                                 if ((abort.getTransactionMachineId() == localMachineId) && (abort.getTransactionClientLocalSequenceNumber() == transaction.getClientLocalSequenceNumber())) {
744                                         foundAbort = true;
745                                 }
746
747                                 processEntry(abort);
748                         } else if (type == Entry.TypeCommitPart) {
749                                 CommitPart commitPart = (CommitPart)CommitPart.decode(null, bbDecode);
750                                 processEntry(commitPart);
751                         }
752                 }
753
754                 updateLiveStateFromLocal();
755
756                 if (couldArbitrate) {
757                         TransactionStatus status =  transaction.getTransactionStatus();
758                         if (didCommit) {
759                                 status.setStatus(TransactionStatus.StatusCommitted);
760                         } else {
761                                 status.setStatus(TransactionStatus.StatusAborted);
762                         }
763                 } else {
764                         TransactionStatus status =  transaction.getTransactionStatus();
765                         if (foundAbort) {
766                                 status.setStatus(TransactionStatus.StatusAborted);
767                         } else {
768                                 status.setStatus(TransactionStatus.StatusCommitted);
769                         }
770                 }
771
772                 return new Pair<Boolean, Boolean>(false, true);
773         }
774
775         public synchronized byte[] acceptDataFromLocal(byte[] data) {
776                 // Decode the data
777                 ByteBuffer bbDecode = ByteBuffer.wrap(data);
778                 long lastArbitratedSequenceNumberSeen = bbDecode.getLong();
779                 int numberOfParts = bbDecode.getInt();
780
781                 // If we did commit a transaction or not
782                 boolean didCommit = false;
783                 boolean couldArbitrate = false;
784
785                 if (numberOfParts != 0) {
786
787                         // decode the transaction
788                         Transaction transaction = new Transaction();
789                         for (int i = 0; i < numberOfParts; i++) {
790                                 bbDecode.get();
791                                 TransactionPart newPart = (TransactionPart)TransactionPart.decode(null, bbDecode);
792                                 transaction.addPartDecode(newPart);
793                         }
794
795                         // Arbitrate on transaction and pull relevant return data
796                         Pair<Boolean, Boolean> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
797                         couldArbitrate = localArbitrateReturn.getFirst();
798                         didCommit = localArbitrateReturn.getSecond();
799
800                         updateLiveStateFromLocal();
801
802                         // Transaction was sent to the server so keep track of it to prevent double commit
803                         if (transaction.getSequenceNumber() != -1) {
804                                 offlineTransactionsCommittedAndAtServer.add(transaction.getId());
805                         }
806                 }
807
808                 // The data to send back
809                 int returnDataSize = 0;
810                 List<Entry> unseenArbitrations = new ArrayList<Entry>();
811
812                 // Get the aborts to send back
813                 List<Long> abortLocalSequenceNumbers = new ArrayList<Long >(liveAbortsGeneratedByLocal.keySet());
814                 Collections.sort(abortLocalSequenceNumbers);
815                 for (Long localSequenceNumber : abortLocalSequenceNumbers) {
816                         if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
817                                 continue;
818                         }
819
820                         Abort abort = liveAbortsGeneratedByLocal.get(localSequenceNumber);
821                         unseenArbitrations.add(abort);
822                         returnDataSize += abort.getSize();
823                 }
824
825                 // Get the commits to send back
826                 Map<Long, Commit> commitForClientTable = liveCommitsTable.get(localMachineId);
827                 if (commitForClientTable != null) {
828                         List<Long> commitLocalSequenceNumbers = new ArrayList<Long>(commitForClientTable.keySet());
829                         Collections.sort(commitLocalSequenceNumbers);
830
831                         for (Long localSequenceNumber : commitLocalSequenceNumbers) {
832                                 Commit commit = commitForClientTable.get(localSequenceNumber);
833
834                                 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
835                                         continue;
836                                 }
837
838                                 unseenArbitrations.addAll(commit.getParts().values());
839
840                                 for (CommitPart commitPart : commit.getParts().values()) {
841                                         returnDataSize += commitPart.getSize();
842                                 }
843                         }
844                 }
845
846                 // Number of arbitration entries to decode
847                 returnDataSize += 2 * Integer.BYTES;
848
849                 // Boolean of did commit or not
850                 if (numberOfParts != 0) {
851                         returnDataSize += Byte.BYTES;
852                 }
853
854                 // Data to send Back
855                 byte[] returnData = new byte[returnDataSize];
856                 ByteBuffer bbEncode = ByteBuffer.wrap(returnData);
857
858                 if (numberOfParts != 0) {
859                         if (didCommit) {
860                                 bbEncode.put((byte)1);
861                         } else {
862                                 bbEncode.put((byte)0);
863                         }
864                         if (couldArbitrate) {
865                                 bbEncode.put((byte)1);
866                         } else {
867                                 bbEncode.put((byte)0);
868                         }
869                 }
870
871                 bbEncode.putInt(unseenArbitrations.size());
872                 for (Entry entry : unseenArbitrations) {
873                         entry.encode(bbEncode);
874                 }
875
876                 return returnData;
877         }
878
879         private ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsToServer(Slot slot, int newSize, boolean isNewKey)  throws ServerException {
880
881                 boolean attemptedToSendToServerTmp = attemptedToSendToServer;
882                 attemptedToSendToServer = true;
883
884                 boolean inserted = false;
885                 boolean lastTryInserted = false;
886
887                 Slot[] array = cloud.putSlot(slot, newSize);
888                 if (array == null) {
889                         array = new Slot[] {slot};
890                         rejectedSlotList.clear();
891                         inserted = true;
892                 }       else {
893                         if (array.length == 0) {
894                                 throw new Error("Server Error: Did not send any slots");
895                         }
896
897                         // if (attemptedToSendToServerTmp) {
898                         if (hadPartialSendToServer) {
899
900                                 boolean isInserted = false;
901                                 for (Slot s : array) {
902                                         if ((s.getSequenceNumber() == slot.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
903                                                 isInserted = true;
904                                                 break;
905                                         }
906                                 }
907
908                                 for (Slot s : array) {
909                                         if (isInserted) {
910                                                 break;
911                                         }
912
913                                         // Process each entry in the slot
914                                         for (Entry entry : s.getEntries()) {
915
916                                                 if (entry.getType() == Entry.TypeLastMessage) {
917                                                         LastMessage lastMessage = (LastMessage)entry;
918
919                                                         if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == slot.getSequenceNumber())) {
920                                                                 isInserted = true;
921                                                                 break;
922                                                         }
923                                                 }
924                                         }
925                                 }
926
927                                 if (!isInserted) {
928                                         rejectedSlotList.add(slot.getSequenceNumber());
929                                         lastTryInserted = false;
930                                 } else {
931                                         lastTryInserted = true;
932                                 }
933                         } else {
934                                 rejectedSlotList.add(slot.getSequenceNumber());
935                                 lastTryInserted = false;
936                         }
937                 }
938
939                 return new ThreeTuple<Boolean, Boolean, Slot[]>(inserted, lastTryInserted, array);
940         }
941
942         /**
943          * Returns false if a resize was needed
944          */
945         private ThreeTuple<Boolean, Integer, Boolean> fillSlot(Slot slot, boolean resize, NewKey newKeyEntry) {
946                 int newSize = 0;
947                 if (liveSlotCount > bufferResizeThreshold) {
948                         resize = true; //Resize is forced
949                 }
950
951                 if (resize) {
952                         newSize = (int) (numberOfSlots * RESIZE_MULTIPLE);
953                         TableStatus status = new TableStatus(slot, newSize);
954                         slot.addEntry(status);
955                 }
956
957                 // Fill with rejected slots first before doing anything else
958                 doRejectedMessages(slot);
959
960                 // Do mandatory rescue of entries
961                 ThreeTuple<Boolean, Boolean, Long> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
962
963                 // Extract working variables
964                 boolean needsResize = mandatoryRescueReturn.getFirst();
965                 boolean seenLiveSlot = mandatoryRescueReturn.getSecond();
966                 long currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
967
968                 if (needsResize && !resize) {
969                         // We need to resize but we are not resizing so return false
970                         return new ThreeTuple<Boolean, Integer, Boolean>(true, null, null);
971                 }
972
973                 boolean inserted = false;
974                 if (newKeyEntry != null) {
975                         newKeyEntry.setSlot(slot);
976                         if (slot.hasSpace(newKeyEntry)) {
977                                 slot.addEntry(newKeyEntry);
978                                 inserted = true;
979                         }
980                 }
981
982                 // Clear the transactions, aborts and commits that were sent previously
983                 transactionPartsSent.clear();
984                 pendingSendArbitrationEntriesToDelete.clear();
985
986                 for (ArbitrationRound round : pendingSendArbitrationRounds) {
987                         boolean isFull = false;
988                         round.generateParts();
989                         List<Entry> parts = round.getParts();
990
991                         // Insert pending arbitration data
992                         for (Entry arbitrationData : parts) {
993
994                                 // If it is an abort then we need to set some information
995                                 if (arbitrationData instanceof Abort) {
996                                         ((Abort)arbitrationData).setSequenceNumber(slot.getSequenceNumber());
997                                 }
998
999                                 if (!slot.hasSpace(arbitrationData)) {
1000                                         // No space so cant do anything else with these data entries
1001                                         isFull = true;
1002                                         break;
1003                                 }
1004
1005                                 // Add to this current slot and add it to entries to delete
1006                                 slot.addEntry(arbitrationData);
1007                                 pendingSendArbitrationEntriesToDelete.add(arbitrationData);
1008                         }
1009
1010                         if (isFull) {
1011                                 break;
1012                         }
1013                 }
1014
1015                 if (pendingTransactionQueue.size() > 0) {
1016
1017                         Transaction transaction = pendingTransactionQueue.get(0);
1018
1019                         // Set the transaction sequence number if it has yet to be inserted into the block chain
1020                         if ((!transaction.didSendAPartToServer() && !transaction.getServerFailure()) || (transaction.getSequenceNumber() == -1)) {
1021                                 transaction.setSequenceNumber(slot.getSequenceNumber());
1022                         }
1023
1024                         while (true) {
1025                                 TransactionPart part = transaction.getNextPartToSend();
1026
1027                                 if (part == null) {
1028                                         // Ran out of parts to send for this transaction so move on
1029                                         break;
1030                                 }
1031
1032                                 if (slot.hasSpace(part)) {
1033                                         slot.addEntry(part);
1034                                         List<Integer> partsSent = transactionPartsSent.get(transaction);
1035                                         if (partsSent == null) {
1036                                                 partsSent = new ArrayList<Integer>();
1037                                                 transactionPartsSent.put(transaction, partsSent);
1038                                         }
1039                                         partsSent.add(part.getPartNumber());
1040                                         transactionPartsSent.put(transaction, partsSent);
1041                                 } else {
1042                                         break;
1043                                 }
1044                         }
1045                 }
1046
1047
1048                 // // Insert as many transactions as possible while keeping order
1049                 // for (Transaction transaction : pendingTransactionQueue) {
1050
1051                 //      // Set the transaction sequence number if it has yet to be inserted into the block chain
1052                 //      if (!transaction.didSendAPartToServer()) {
1053                 //              transaction.setSequenceNumber(slot.getSequenceNumber());
1054                 //      }
1055
1056                 //      boolean ranOutOfSpace = false;
1057
1058                 //      while (true) {
1059                 //              TransactionPart part = transaction.getNextPartToSend();
1060
1061                 //              if (part == null) {
1062                 //                      // Ran out of parts to send for this transaction so move on
1063                 //                      break;
1064                 //              }
1065
1066                 //              if (slot.hasSpace(part)) {
1067                 //                      slot.addEntry(part);
1068                 //                      List<Integer> partsSent = transactionPartsSent.get(transaction);
1069                 //                      if (partsSent == null) {
1070                 //                              partsSent = new ArrayList<Integer>();
1071                 //                              transactionPartsSent.put(transaction, partsSent);
1072                 //                      }
1073                 //                      partsSent.add(part.getPartNumber());
1074                 //                      transactionPartsSent.put(transaction, partsSent);
1075
1076                 //                      for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
1077                 //                              System.out.println("Inserted Into Slot: " + kv);
1078                 //                      }
1079
1080                 //                      ranOutOfSpace = true;
1081                 //                      break;
1082
1083                 //              } else {
1084                 //                      ranOutOfSpace = true;
1085                 //                      break;
1086                 //              }
1087                 //      }
1088
1089                 //      if (ranOutOfSpace) {
1090                 //              break;
1091                 //      }
1092                 // }
1093
1094                 // Fill the remainder of the slot with rescue data
1095                 doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1096
1097                 return new ThreeTuple<Boolean, Integer, Boolean>(false, newSize, inserted);
1098         }
1099
1100         private void doRejectedMessages(Slot s) {
1101                 if (! rejectedSlotList.isEmpty()) {
1102                         /* TODO: We should avoid generating a rejected message entry if
1103                          * there is already a sufficient entry in the queue (e.g.,
1104                          * equalsto value of true and same sequence number).  */
1105
1106                         long old_seqn = rejectedSlotList.firstElement();
1107                         if (rejectedSlotList.size() > REJECTED_THRESHOLD) {
1108                                 long new_seqn = rejectedSlotList.lastElement();
1109                                 RejectedMessage rm = new RejectedMessage(s, localMachineId, old_seqn, new_seqn, false);
1110                                 s.addEntry(rm);
1111                         } else {
1112                                 long prev_seqn = -1;
1113                                 int i = 0;
1114                                 /* Go through list of missing messages */
1115                                 for (; i < rejectedSlotList.size(); i++) {
1116                                         long curr_seqn = rejectedSlotList.get(i);
1117                                         Slot s_msg = buffer.getSlot(curr_seqn);
1118                                         if (s_msg != null)
1119                                                 break;
1120                                         prev_seqn = curr_seqn;
1121                                 }
1122                                 /* Generate rejected message entry for missing messages */
1123                                 if (prev_seqn != -1) {
1124                                         RejectedMessage rm = new RejectedMessage(s, localMachineId, old_seqn, prev_seqn, false);
1125                                         s.addEntry(rm);
1126                                 }
1127                                 /* Generate rejected message entries for present messages */
1128                                 for (; i < rejectedSlotList.size(); i++) {
1129                                         long curr_seqn = rejectedSlotList.get(i);
1130                                         Slot s_msg = buffer.getSlot(curr_seqn);
1131                                         long machineid = s_msg.getMachineID();
1132                                         RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
1133                                         s.addEntry(rm);
1134                                 }
1135                         }
1136                 }
1137         }
1138
1139         private ThreeTuple<Boolean, Boolean, Long> doMandatoryResuce(Slot slot, boolean resize) {
1140                 long newestSequenceNumber = buffer.getNewestSeqNum();
1141                 long oldestSequenceNumber = buffer.getOldestSeqNum();
1142                 if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1143                         oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1144                 }
1145
1146                 long currentSequenceNumber = oldestLiveSlotSequenceNumver;
1147                 boolean seenLiveSlot = false;
1148                 long firstIfFull = newestSequenceNumber + 1 - numberOfSlots;    // smallest seq number in the buffer if it is full
1149                 long threshold = firstIfFull + FREE_SLOTS;      // we want the buffer to be clear of live entries up to this point
1150
1151
1152                 // Mandatory Rescue
1153                 for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1154                         Slot previousSlot = buffer.getSlot(currentSequenceNumber);
1155                         // Push slot number forward
1156                         if (! seenLiveSlot) {
1157                                 oldestLiveSlotSequenceNumver = currentSequenceNumber;
1158                         }
1159
1160                         if (!previousSlot.isLive()) {
1161                                 continue;
1162                         }
1163
1164                         // We have seen a live slot
1165                         seenLiveSlot = true;
1166
1167                         // Get all the live entries for a slot
1168                         Vector<Entry> liveEntries = previousSlot.getLiveEntries(resize);
1169
1170                         // Iterate over all the live entries and try to rescue them
1171                         for (Entry liveEntry : liveEntries) {
1172                                 if (slot.hasSpace(liveEntry)) {
1173
1174                                         // Enough space to rescue the entry
1175                                         slot.addEntry(liveEntry);
1176                                 } else if (currentSequenceNumber == firstIfFull) {
1177                                         //if there's no space but the entry is about to fall off the queue
1178                                         System.out.println("B"); //?
1179                                         return new ThreeTuple<Boolean, Boolean, Long>(true, seenLiveSlot, currentSequenceNumber);
1180
1181                                 }
1182                         }
1183                 }
1184
1185                 // Did not resize
1186                 return new ThreeTuple<Boolean, Boolean, Long>(false, seenLiveSlot, currentSequenceNumber);
1187         }
1188
1189         private void  doOptionalRescue(Slot s, boolean seenliveslot, long seqn, boolean resize) {
1190                 /* now go through live entries from least to greatest sequence number until
1191                  * either all live slots added, or the slot doesn't have enough room
1192                  * for SKIP_THRESHOLD consecutive entries*/
1193                 int skipcount = 0;
1194                 long newestseqnum = buffer.getNewestSeqNum();
1195                 search:
1196                 for (; seqn <= newestseqnum; seqn++) {
1197                         Slot prevslot = buffer.getSlot(seqn);
1198                         //Push slot number forward
1199                         if (!seenliveslot)
1200                                 oldestLiveSlotSequenceNumver = seqn;
1201
1202                         if (!prevslot.isLive())
1203                                 continue;
1204                         seenliveslot = true;
1205                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
1206                         for (Entry liveentry : liveentries) {
1207                                 if (s.hasSpace(liveentry))
1208                                         s.addEntry(liveentry);
1209                                 else {
1210                                         skipcount++;
1211                                         if (skipcount > SKIP_THRESHOLD)
1212                                                 break search;
1213                                 }
1214                         }
1215                 }
1216         }
1217
1218         /**
1219          * Checks for malicious activity and updates the local copy of the block chain.
1220          */
1221         private void validateAndUpdate(Slot[] newSlots, boolean acceptUpdatesToLocal) {
1222
1223                 // The cloud communication layer has checked slot HMACs already before decoding
1224                 if (newSlots.length == 0) {
1225                         return;
1226                 }
1227
1228                 // Reset the table status declared sizes
1229                 smallestTableStatusSeen = -1;
1230                 largestTableStatusSeen = -1;
1231
1232
1233                 // Make sure all slots are newer than the last largest slot this client has seen
1234                 long firstSeqNum = newSlots[0].getSequenceNumber();
1235                 if (firstSeqNum <= sequenceNumber) {
1236                         throw new Error("Server Error: Sent older slots!");
1237                 }
1238
1239                 // Create an object that can access both new slots and slots in our local chain
1240                 // without committing slots to our local chain
1241                 SlotIndexer indexer = new SlotIndexer(newSlots, buffer);
1242
1243                 // Check that the HMAC chain is not broken
1244                 checkHMACChain(indexer, newSlots);
1245
1246                 // Set to keep track of messages from clients
1247                 HashSet<Long> machineSet = new HashSet<Long>(lastMessageTable.keySet());
1248
1249                 // Process each slots data
1250                 for (Slot slot : newSlots) {
1251                         processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1252                 }
1253
1254                 // If there is a gap, check to see if the server sent us everything.
1255                 if (firstSeqNum != (sequenceNumber + 1)) {
1256
1257                         // Check the size of the slots that were sent down by the server.
1258                         // Can only check the size if there was a gap
1259                         checkNumSlots(newSlots.length);
1260
1261                         // Since there was a gap every machine must have pushed a slot or must have
1262                         // a last message message.  If not then the server is hiding slots
1263                         if (!machineSet.isEmpty()) {
1264                                 throw new Error("Missing record for machines: " + machineSet);
1265                         }
1266                 }
1267
1268                 // Update the size of our local block chain.
1269                 commitNewMaxSize();
1270
1271                 // Commit new to slots to the local block chain.
1272                 for (Slot slot : newSlots) {
1273
1274                         // Insert this slot into our local block chain copy.
1275                         buffer.putSlot(slot);
1276
1277                         // Keep track of how many slots are currently live (have live data in them).
1278                         liveSlotCount++;
1279                 }
1280
1281                 // Get the sequence number of the latest slot in the system
1282                 sequenceNumber = newSlots[newSlots.length - 1].getSequenceNumber();
1283
1284                 updateLiveStateFromServer();
1285
1286                 // No Need to remember after we pulled from the server
1287                 offlineTransactionsCommittedAndAtServer.clear();
1288
1289                 // This is invalidated now
1290                 hadPartialSendToServer = false;
1291         }
1292
1293         private void updateLiveStateFromServer() {
1294                 // Process the new transaction parts
1295                 processNewTransactionParts();
1296
1297                 // Do arbitration on new transactions that were received
1298                 arbitrateFromServer();
1299
1300                 // Update all the committed keys
1301                 boolean didCommitOrSpeculate = updateCommittedTable();
1302
1303                 // Delete the transactions that are now dead
1304                 updateLiveTransactionsAndStatus();
1305
1306                 // Do speculations
1307                 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1308                 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1309         }
1310
1311         private void updateLiveStateFromLocal() {
1312                 // Update all the committed keys
1313                 boolean didCommitOrSpeculate = updateCommittedTable();
1314
1315                 // Delete the transactions that are now dead
1316                 updateLiveTransactionsAndStatus();
1317
1318                 // Do speculations
1319                 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1320                 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1321         }
1322
1323         /**
1324          * Check the size of the block chain to make sure there are enough slots sent back by the server.
1325          * This is only called when we have a gap between the slots that we have locally and the slots
1326          * sent by the server therefore in the slots sent by the server there will be at least 1 Table
1327          * status message
1328          */
1329         private void checkNumSlots(int numberOfSlots) {
1330
1331                 // We only have 1 size so we must have this many slots
1332                 if (largestTableStatusSeen == smallestTableStatusSeen) {
1333                         if (numberOfSlots != smallestTableStatusSeen) {
1334                                 throw new Error("Server Error: Server did not send all slots.  Expected: " + smallestTableStatusSeen + " Received:" + numberOfSlots);
1335                         }
1336                 } else {
1337                         // We have more than 1
1338                         if (numberOfSlots < smallestTableStatusSeen) {
1339                                 throw new Error("Server Error: Server did not send all slots.  Expected at least: " + smallestTableStatusSeen + " Received:" + numberOfSlots);
1340                         }
1341                 }
1342         }
1343
1344         /**
1345          * Update the size of of the local buffer if it is needed.
1346          */
1347         private void commitNewMaxSize() {
1348
1349                 int currMaxSize = 0;
1350
1351                 if (largestTableStatusSeen == -1) {
1352                         // No table status seen so the current max size does not change
1353                         currMaxSize = numberOfSlots;
1354                 } else {
1355                         currMaxSize = largestTableStatusSeen;
1356                 }
1357
1358                 // Resize the local slot buffer
1359                 if (numberOfSlots != currMaxSize) {
1360                         buffer.resize(currMaxSize);
1361                 }
1362
1363                 // Change the number of local slots to the new size
1364                 numberOfSlots = currMaxSize;
1365
1366                 // Recalculate the resize threshold since the size of the local buffer has changed
1367                 setResizeThreshold();
1368         }
1369
1370         /**
1371          * Process the new transaction parts from this latest round of slots received from the server
1372          */
1373         private void processNewTransactionParts() {
1374
1375                 if (newTransactionParts.size() == 0) {
1376                         // Nothing new to process
1377                         return;
1378                 }
1379
1380                 // Iterate through all the machine Ids that we received new parts for
1381                 for (Long machineId : newTransactionParts.keySet()) {
1382                         Map<Pair<Long, Integer>, TransactionPart> parts = newTransactionParts.get(machineId);
1383
1384                         // Iterate through all the parts for that machine Id
1385                         for (Pair<Long, Integer> partId : parts.keySet()) {
1386                                 TransactionPart part = parts.get(partId);
1387
1388                                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(part.getArbitratorId());
1389                                 if ((lastTransactionNumber != null) && (lastTransactionNumber >= part.getSequenceNumber())) {
1390                                         // Set dead the transaction part
1391                                         part.setDead();
1392                                         continue;
1393                                 }
1394
1395                                 // Get the transaction object for that sequence number
1396                                 Transaction transaction = liveTransactionBySequenceNumberTable.get(part.getSequenceNumber());
1397
1398                                 if (transaction == null) {
1399                                         // This is a new transaction that we dont have so make a new one
1400                                         transaction = new Transaction();
1401
1402                                         // Insert this new transaction into the live tables
1403                                         liveTransactionBySequenceNumberTable.put(part.getSequenceNumber(), transaction);
1404                                         liveTransactionByTransactionIdTable.put(part.getTransactionId(), transaction);
1405                                 }
1406
1407                                 // Add that part to the transaction
1408                                 transaction.addPartDecode(part);
1409
1410                                 if (transaction.isComplete()) {
1411                                         for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
1412                                                 System.out.println("Got Live Transaction   " + kv + "    " + part.getSequenceNumber());
1413                                         }
1414                                 }
1415                         }
1416                 }
1417
1418                 // Clear all the new transaction parts in preparation for the next time the server sends slots
1419                 newTransactionParts.clear();
1420         }
1421
1422         private void arbitrateFromServer() {
1423
1424                 if (liveTransactionBySequenceNumberTable.size() == 0) {
1425                         // Nothing to arbitrate on so move on
1426                         return;
1427                 }
1428
1429                 // Get the transaction sequence numbers and sort from oldest to newest
1430                 List<Long> transactionSequenceNumbers = new ArrayList<Long>(liveTransactionBySequenceNumberTable.keySet());
1431                 Collections.sort(transactionSequenceNumbers);
1432
1433                 // Collection of key value pairs that are
1434                 Map<IoTString, KeyValue> speculativeTableTmp = new HashMap<IoTString, KeyValue>();
1435
1436                 // The last transaction arbitrated on
1437                 long lastTransactionCommitted = -1;
1438                 Set<Abort> generatedAborts = new HashSet<Abort>();
1439
1440                 for (Long transactionSequenceNumber : transactionSequenceNumbers) {
1441                         Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber);
1442
1443                         // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
1444                         if (transaction.getArbitrator() != localMachineId) {
1445                                 continue;
1446                         }
1447
1448                         if (offlineTransactionsCommittedAndAtServer.contains(transaction.getId())) {
1449                                 // We have seen this already locally so dont commit again
1450                                 continue;
1451                         }
1452
1453
1454                         if (!transaction.isComplete()) {
1455                                 // Will arbitrate in incorrect order if we continue so just break
1456                                 // Most likely this
1457                                 break;
1458                         }
1459
1460                         // update the largest transaction seen by arbitrator from server
1461                         if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) == null) {
1462                                 lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber());
1463                         } else {
1464                                 Long lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId());
1465                                 if (transaction.getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1466                                         lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber());
1467                                 }
1468                         }
1469
1470
1471                         for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
1472                                 System.out.println("Arbitrating on:     " + kv);
1473                         }
1474
1475                         if (transaction.evaluateGuard(committedKeyValueTable, speculativeTableTmp, null)) {
1476                                 // Guard evaluated as true
1477
1478                                 // Update the local changes so we can make the commit
1479                                 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
1480                                         speculativeTableTmp.put(kv.getKey(), kv);
1481                                 }
1482
1483                                 // Update what the last transaction committed was for use in batch commit
1484                                 lastTransactionCommitted = transaction.getSequenceNumber();
1485                         } else {
1486                                 // Guard evaluated was false so create abort
1487
1488                                 // Create the abort
1489                                 Abort newAbort = new Abort(null,
1490                                                            transaction.getClientLocalSequenceNumber(),
1491                                                            transaction.getSequenceNumber(),
1492                                                            transaction.getMachineId(),
1493                                                            transaction.getArbitrator(),
1494                                                            localArbitrationSequenceNumber);
1495                                 localArbitrationSequenceNumber++;
1496
1497                                 generatedAborts.add(newAbort);
1498
1499                                 // Insert the abort so we can process
1500                                 processEntry(newAbort);
1501
1502                                 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
1503                                         System.out.println("Abort From Server!!!!!!     " + kv);
1504                                 }
1505                         }
1506                 }
1507
1508                 Commit newCommit = null;
1509
1510                 // If there is something to commit
1511                 if (speculativeTableTmp.size() != 0) {
1512
1513                         // Create the commit and increment the commit sequence number
1514                         newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1515                         localArbitrationSequenceNumber++;
1516
1517                         // Add all the new keys to the commit
1518                         for (KeyValue kv : speculativeTableTmp.values()) {
1519                                 newCommit.addKV(kv);
1520                         }
1521
1522                         // create the commit parts
1523                         newCommit.createCommitParts();
1524
1525                         // Append all the commit parts to the end of the pending queue waiting for sending to the server
1526
1527                         // Insert the commit so we can process it
1528                         for (CommitPart commitPart : newCommit.getParts().values()) {
1529                                 processEntry(commitPart);
1530                         }
1531                 }
1532
1533                 if ((newCommit != null) || (generatedAborts.size() > 0)) {
1534                         ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1535                         pendingSendArbitrationRounds.add(arbitrationRound);
1536
1537                         if (compactArbitrationData()) {
1538                                 ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1539                                 for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
1540                                         processEntry(commitPart);
1541                                 }
1542                         }
1543                 }
1544         }
1545
1546         private Pair<Boolean, Boolean> arbitrateOnLocalTransaction(Transaction transaction) {
1547
1548                 // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
1549                 if (transaction.getArbitrator() != localMachineId) {
1550                         return new Pair<Boolean, Boolean>(false, false);
1551                 }
1552
1553                 if (!transaction.isComplete()) {
1554                         // Will arbitrate in incorrect order if we continue so just break
1555                         // Most likely this
1556                         return new Pair<Boolean, Boolean>(false, false);
1557                 }
1558
1559                 if (transaction.getMachineId() != localMachineId) {
1560                         // dont do this check for local transactions
1561                         if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) != null) {
1562                                 if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) > transaction.getClientLocalSequenceNumber()) {
1563                                         // We've have already seen this from the server
1564                                         return new Pair<Boolean, Boolean>(false, false);
1565                                 }
1566                         }
1567                 }
1568
1569                 if (transaction.evaluateGuard(committedKeyValueTable, null, null)) {
1570                         // Guard evaluated as true
1571
1572                         // Create the commit and increment the commit sequence number
1573                         Commit newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1574                         localArbitrationSequenceNumber++;
1575
1576                         // Update the local changes so we can make the commit
1577                         for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
1578                                 newCommit.addKV(kv);
1579                         }
1580
1581                         // create the commit parts
1582                         newCommit.createCommitParts();
1583
1584                         // Append all the commit parts to the end of the pending queue waiting for sending to the server
1585                         ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, new HashSet<Abort>());
1586                         pendingSendArbitrationRounds.add(arbitrationRound);
1587
1588                         if (compactArbitrationData()) {
1589                                 ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1590                                 for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
1591                                         processEntry(commitPart);
1592                                 }
1593                         } else {
1594                                 // Insert the commit so we can process it
1595                                 for (CommitPart commitPart : newCommit.getParts().values()) {
1596                                         processEntry(commitPart);
1597                                 }
1598                         }
1599
1600                         if (transaction.getMachineId() == localMachineId) {
1601                                 TransactionStatus status = transaction.getTransactionStatus();
1602                                 if (status != null) {
1603                                         status.setStatus(TransactionStatus.StatusCommitted);
1604                                 }
1605                         }
1606
1607                         updateLiveStateFromLocal();
1608                         return new Pair<Boolean, Boolean>(true, true);
1609                 } else {
1610
1611                         if (transaction.getMachineId() == localMachineId) {
1612                                 // For locally created messages update the status
1613
1614                                 // Guard evaluated was false so create abort
1615                                 TransactionStatus status = transaction.getTransactionStatus();
1616                                 if (status != null) {
1617                                         status.setStatus(TransactionStatus.StatusAborted);
1618                                 }
1619                         } else {
1620
1621                                 Set addAbortSet = new HashSet<Abort>();
1622
1623
1624                                 // Create the abort
1625                                 Abort newAbort = new Abort(null,
1626                                                            transaction.getClientLocalSequenceNumber(),
1627                                                            -1,
1628                                                            transaction.getMachineId(),
1629                                                            transaction.getArbitrator(),
1630                                                            localArbitrationSequenceNumber);
1631                                 localArbitrationSequenceNumber++;
1632
1633                                 addAbortSet.add(newAbort);
1634
1635
1636                                 // Append all the commit parts to the end of the pending queue waiting for sending to the server
1637                                 ArbitrationRound arbitrationRound = new ArbitrationRound(null, addAbortSet);
1638                                 pendingSendArbitrationRounds.add(arbitrationRound);
1639
1640                                 if (compactArbitrationData()) {
1641                                         ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1642                                         for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
1643                                                 processEntry(commitPart);
1644                                         }
1645                                 }
1646                         }
1647
1648                         updateLiveStateFromLocal();
1649                         return new Pair<Boolean, Boolean>(true, false);
1650                 }
1651         }
1652
1653         /**
1654          * Compacts the arbitration data my merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates
1655          */
1656         private boolean compactArbitrationData() {
1657
1658                 if (pendingSendArbitrationRounds.size() < 2) {
1659                         // Nothing to compact so do nothing
1660                         return false;
1661                 }
1662
1663                 ArbitrationRound lastRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1664                 if (lastRound.didSendPart()) {
1665                         return false;
1666                 }
1667
1668                 boolean hadCommit = (lastRound.getCommit() == null);
1669                 boolean gotNewCommit = false;
1670
1671                 int numberToDelete = 1;
1672                 while (numberToDelete < pendingSendArbitrationRounds.size()) {
1673                         ArbitrationRound round = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - numberToDelete - 1);
1674
1675                         if (round.isFull() || round.didSendPart()) {
1676                                 // Stop since there is a part that cannot be compacted and we need to compact in order
1677                                 break;
1678                         }
1679
1680                         if (round.getCommit() == null) {
1681
1682                                 // Try compacting aborts only
1683                                 int newSize = round.getCurrentSize() + lastRound.getAbortsCount();
1684                                 if (newSize > ArbitrationRound.MAX_PARTS) {
1685                                         // Cant compact since it would be too large
1686                                         break;
1687                                 }
1688                                 lastRound.addAborts(round.getAborts());
1689                         } else {
1690
1691                                 // Create a new larger commit
1692                                 Commit newCommit = Commit.merge(lastRound.getCommit(), round.getCommit(), localArbitrationSequenceNumber);
1693                                 localArbitrationSequenceNumber++;
1694
1695                                 // Create the commit parts so that we can count them
1696                                 newCommit.createCommitParts();
1697
1698                                 // Calculate the new size of the parts
1699                                 int newSize = newCommit.getNumberOfParts();
1700                                 newSize += lastRound.getAbortsCount();
1701                                 newSize += round.getAbortsCount();
1702
1703                                 if (newSize > ArbitrationRound.MAX_PARTS) {
1704                                         // Cant compact since it would be too large
1705                                         break;
1706                                 }
1707
1708                                 // Set the new compacted part
1709                                 lastRound.setCommit(newCommit);
1710                                 lastRound.addAborts(round.getAborts());
1711                                 gotNewCommit = true;
1712                         }
1713
1714                         numberToDelete++;
1715                 }
1716
1717                 if (numberToDelete != 1) {
1718                         // If there is a compaction
1719
1720                         // Delete the previous pieces that are now in the new compacted piece
1721                         if (numberToDelete == pendingSendArbitrationRounds.size()) {
1722                                 pendingSendArbitrationRounds.clear();
1723                         } else {
1724                                 for (int i = 0; i < numberToDelete; i++) {
1725                                         pendingSendArbitrationRounds.remove(pendingSendArbitrationRounds.size() - 1);
1726                                 }
1727                         }
1728
1729                         // Add the new compacted into the pending to send list
1730                         pendingSendArbitrationRounds.add(lastRound);
1731
1732                         // Should reinsert into the commit processor
1733                         if (hadCommit && gotNewCommit) {
1734                                 return true;
1735                         }
1736                 }
1737
1738                 return false;
1739         }
1740         // private boolean compactArbitrationData() {
1741         //      return false;
1742         // }
1743
1744         /**
1745          * Update all the commits and the committed tables, sets dead the dead transactions
1746          */
1747         private boolean updateCommittedTable() {
1748
1749                 if (newCommitParts.size() == 0) {
1750                         // Nothing new to process
1751                         return false;
1752                 }
1753
1754                 // Iterate through all the machine Ids that we received new parts for
1755                 for (Long machineId : newCommitParts.keySet()) {
1756                         Map<Pair<Long, Integer>, CommitPart> parts = newCommitParts.get(machineId);
1757
1758                         // Iterate through all the parts for that machine Id
1759                         for (Pair<Long, Integer> partId : parts.keySet()) {
1760                                 CommitPart part = parts.get(partId);
1761
1762                                 // Get the transaction object for that sequence number
1763                                 Map<Long, Commit> commitForClientTable = liveCommitsTable.get(part.getMachineId());
1764
1765                                 if (commitForClientTable == null) {
1766                                         // This is the first commit from this device
1767                                         commitForClientTable = new HashMap<Long, Commit>();
1768                                         liveCommitsTable.put(part.getMachineId(), commitForClientTable);
1769                                 }
1770
1771                                 Commit commit = commitForClientTable.get(part.getSequenceNumber());
1772
1773                                 if (commit == null) {
1774                                         // This is a new commit that we dont have so make a new one
1775                                         commit = new Commit();
1776
1777                                         // Insert this new commit into the live tables
1778                                         commitForClientTable.put(part.getSequenceNumber(), commit);
1779                                 }
1780
1781                                 // Add that part to the commit
1782                                 commit.addPartDecode(part);
1783                         }
1784                 }
1785
1786                 // Clear all the new commits parts in preparation for the next time the server sends slots
1787                 newCommitParts.clear();
1788
1789                 // If we process a new commit keep track of it for future use
1790                 boolean didProcessANewCommit = false;
1791
1792                 // Process the commits one by one
1793                 for (Long arbitratorId : liveCommitsTable.keySet()) {
1794
1795                         // Get all the commits for a specific arbitrator
1796                         Map<Long, Commit> commitForClientTable = liveCommitsTable.get(arbitratorId);
1797
1798                         // Sort the commits in order
1799                         List<Long> commitSequenceNumbers = new ArrayList<Long>(commitForClientTable.keySet());
1800                         Collections.sort(commitSequenceNumbers);
1801
1802                         // Go through each new commit one by one
1803                         for (int i = 0; i < commitSequenceNumbers.size(); i++) {
1804                                 Long commitSequenceNumber = commitSequenceNumbers.get(i);
1805                                 Commit commit = commitForClientTable.get(commitSequenceNumber);
1806
1807                                 // Special processing if a commit is not complete
1808                                 if (!commit.isComplete()) {
1809                                         if (i == (commitSequenceNumbers.size() - 1)) {
1810                                                 // If there is an incomplete commit and this commit is the latest one seen then this commit cannot be processed and there are no other commits
1811                                                 break;
1812                                         } else {
1813                                                 // This is a commit that was already dead but parts of it are still in the block chain (not flushed out yet).
1814                                                 // Delete it and move on
1815                                                 commit.setDead();
1816                                                 commitForClientTable.remove(commit.getSequenceNumber());
1817                                                 continue;
1818                                         }
1819                                 }
1820
1821                                 // Get the last commit seen from this arbitrator
1822                                 long lastCommitSeenSequenceNumber = -1;
1823                                 if (lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId()) != null) {
1824                                         lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId());
1825                                 }
1826
1827                                 // Update the last arbitration data that we have seen so far
1828                                 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId()) != null) {
1829
1830                                         long lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId());
1831                                         if (commit.getSequenceNumber() > lastArbitrationSequenceNumber) {
1832                                                 // Is larger
1833                                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(commit.getMachineId(), commit.getSequenceNumber());
1834                                         }
1835                                 } else {
1836                                         // Never seen any data from this arbitrator so record the first one
1837                                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(commit.getMachineId(), commit.getSequenceNumber());
1838                                 }
1839
1840                                 // We have already seen this commit before so need to do the full processing on this commit
1841                                 if (commit.getSequenceNumber() <= lastCommitSeenSequenceNumber) {
1842
1843                                         // Update the last transaction that was updated if we can
1844                                         if (commit.getTransactionSequenceNumber() != -1) {
1845                                                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId());
1846
1847                                                 // Update the last transaction sequence number that the arbitrator arbitrated on
1848                                                 if ((lastTransactionNumber == null) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) {
1849                                                         lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber());
1850                                                 }
1851                                         }
1852
1853                                         continue;
1854                                 }
1855
1856                                 // If we got here then this is a brand new commit and needs full processing
1857
1858                                 // Get what commits should be edited, these are the commits that have live values for their keys
1859                                 Set<Commit> commitsToEdit = new HashSet<Commit>();
1860                                 for (KeyValue kv : commit.getKeyValueUpdateSet()) {
1861                                         commitsToEdit.add(liveCommitsByKeyTable.get(kv.getKey()));
1862                                 }
1863                                 commitsToEdit.remove(null); // remove null since it could be in this set
1864
1865                                 // Update each previous commit that needs to be updated
1866                                 for (Commit previousCommit : commitsToEdit) {
1867
1868                                         // Only bother with live commits (TODO: Maybe remove this check)
1869                                         if (previousCommit.isLive()) {
1870
1871                                                 // Update which keys in the old commits are still live
1872                                                 for (KeyValue kv : commit.getKeyValueUpdateSet()) {
1873                                                         previousCommit.invalidateKey(kv.getKey());
1874                                                 }
1875
1876                                                 // if the commit is now dead then remove it
1877                                                 if (!previousCommit.isLive()) {
1878                                                         commitForClientTable.remove(previousCommit);
1879                                                 }
1880                                         }
1881                                 }
1882
1883                                 // Update the last seen sequence number from this arbitrator
1884                                 if (lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId()) != null) {
1885                                         if (commit.getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId())) {
1886                                                 lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber());
1887                                         }
1888                                 }
1889
1890
1891
1892                                 // Update the last transaction that was updated if we can
1893                                 if (commit.getTransactionSequenceNumber() != -1) {
1894                                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId());
1895
1896                                         // Update the last transaction sequence number that the arbitrator arbitrated on
1897                                         if ((lastTransactionNumber == null) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) {
1898                                                 lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber());
1899                                         }
1900                                 }
1901
1902                                 // We processed a new commit that we havent seen before
1903                                 didProcessANewCommit = true;
1904
1905
1906
1907                                 // Update the committed table of keys and which commit is using which key
1908                                 for (KeyValue kv : commit.getKeyValueUpdateSet()) {
1909                                         committedKeyValueTable.put(kv.getKey(), kv);
1910                                         liveCommitsByKeyTable.put(kv.getKey(), commit);
1911                                 }
1912                         }
1913                 }
1914
1915                 return didProcessANewCommit;
1916         }
1917
1918         /**
1919          * Create the speculative table from transactions that are still live and have come from the cloud
1920          */
1921         private boolean updateSpeculativeTable(boolean didProcessNewCommits) {
1922                 if (liveTransactionBySequenceNumberTable.keySet().size() == 0) {
1923                         // There is nothing to speculate on
1924                         return false;
1925                 }
1926
1927                 // Create a list of the transaction sequence numbers and sort them from oldest to newest
1928                 List<Long> transactionSequenceNumbersSorted = new ArrayList<Long>(liveTransactionBySequenceNumberTable.keySet());
1929                 Collections.sort(transactionSequenceNumbersSorted);
1930
1931                 boolean hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted.get(0) != oldestTransactionSequenceNumberSpeculatedOn;
1932
1933
1934                 if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
1935                         // If there is a gap in the transaction sequence numbers then there was a commit or an abort of a transaction
1936                         // OR there was a new commit (Could be from offline commit) so a redo the speculation from scratch
1937
1938                         // Start from scratch
1939                         speculatedKeyValueTable.clear();
1940                         lastTransactionSequenceNumberSpeculatedOn = -1;
1941                         oldestTransactionSequenceNumberSpeculatedOn = -1;
1942
1943                 }
1944
1945                 // Remember the front of the transaction list
1946                 oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted.get(0);
1947
1948                 // Find where to start arbitration from
1949                 int startIndex = transactionSequenceNumbersSorted.indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1;
1950
1951                 if (startIndex >= transactionSequenceNumbersSorted.size()) {
1952                         // Make sure we are not out of bounds
1953                         return false; // did not speculate
1954                 }
1955
1956                 Set<Long> incompleteTransactionArbitrator = new HashSet<Long>();
1957                 boolean didSkip = true;
1958
1959                 for (int i = startIndex; i < transactionSequenceNumbersSorted.size(); i++) {
1960                         long transactionSequenceNumber = transactionSequenceNumbersSorted.get(i);
1961                         Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber);
1962
1963                         if (!transaction.isComplete()) {
1964                                 // If there is an incomplete transaction then there is nothing we can do
1965                                 // add this transactions arbitrator to the list of arbitrators we should ignore
1966                                 incompleteTransactionArbitrator.add(transaction.getArbitrator());
1967                                 didSkip = true;
1968                                 continue;
1969                         }
1970
1971                         if (incompleteTransactionArbitrator.contains(transaction.getArbitrator())) {
1972                                 continue;
1973                         }
1974
1975                         lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
1976
1977                         if (transaction.evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, null)) {
1978                                 // Guard evaluated to true so update the speculative table
1979                                 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
1980                                         speculatedKeyValueTable.put(kv.getKey(), kv);
1981                                 }
1982                         }
1983                 }
1984
1985                 if (didSkip) {
1986                         // Since there was a skip we need to redo the speculation next time around
1987                         lastTransactionSequenceNumberSpeculatedOn = -1;
1988                         oldestTransactionSequenceNumberSpeculatedOn = -1;
1989                 }
1990
1991                 // We did some speculation
1992                 return true;
1993         }
1994
1995         /**
1996          * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer
1997          */
1998         private void updatePendingTransactionSpeculativeTable(boolean didProcessNewCommitsOrSpeculate) {
1999                 if (pendingTransactionQueue.size() == 0) {
2000                         // There is nothing to speculate on
2001                         return;
2002                 }
2003
2004
2005                 if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue.get(0))) {
2006                         // need to reset on the pending speculation
2007                         lastPendingTransactionSpeculatedOn = null;
2008                         firstPendingTransaction = pendingTransactionQueue.get(0);
2009                         pendingTransactionSpeculatedKeyValueTable.clear();
2010                 }
2011
2012                 // Find where to start arbitration from
2013                 int startIndex = pendingTransactionQueue.indexOf(firstPendingTransaction) + 1;
2014
2015                 if (startIndex >= pendingTransactionQueue.size()) {
2016                         // Make sure we are not out of bounds
2017                         return;
2018                 }
2019
2020                 for (int i = startIndex; i < pendingTransactionQueue.size(); i++) {
2021                         Transaction transaction = pendingTransactionQueue.get(i);
2022
2023                         lastPendingTransactionSpeculatedOn = transaction;
2024
2025                         if (transaction.evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2026                                 // Guard evaluated to true so update the speculative table
2027                                 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
2028                                         pendingTransactionSpeculatedKeyValueTable.put(kv.getKey(), kv);
2029                                 }
2030                         }
2031                 }
2032         }
2033
2034         /**
2035          * Set dead and remove from the live transaction tables the transactions that are dead
2036          */
2037         private void updateLiveTransactionsAndStatus() {
2038
2039                 // Go through each of the transactions
2040                 for (Iterator<Map.Entry<Long, Transaction>> iter = liveTransactionBySequenceNumberTable.entrySet().iterator(); iter.hasNext();) {
2041                         Transaction transaction = iter.next().getValue();
2042
2043                         // Check if the transaction is dead
2044                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(transaction.getArbitrator());
2045                         if ((lastTransactionNumber != null) && (lastTransactionNumber >= transaction.getSequenceNumber())) {
2046
2047                                 // Set dead the transaction
2048                                 transaction.setDead();
2049
2050                                 // Remove the transaction from the live table
2051                                 iter.remove();
2052                                 liveTransactionByTransactionIdTable.remove(transaction.getId());
2053                         }
2054                 }
2055
2056                 // Go through each of the transactions
2057                 for (Iterator<Map.Entry<Long, TransactionStatus>> iter = outstandingTransactionStatus.entrySet().iterator(); iter.hasNext();) {
2058                         TransactionStatus status = iter.next().getValue();
2059
2060                         // Check if the transaction is dead
2061                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(status.getTransactionArbitrator());
2062                         if ((lastTransactionNumber != null) && (lastTransactionNumber >= status.getTransactionSequenceNumber())) {
2063
2064                                 // Set committed
2065                                 status.setStatus(TransactionStatus.StatusCommitted);
2066
2067                                 // Remove
2068                                 iter.remove();
2069                         }
2070                 }
2071         }
2072
2073         /**
2074          * Process this slot, entry by entry.  Also update the latest message sent by slot
2075          */
2076         private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptUpdatesToLocal, HashSet<Long> machineSet) {
2077
2078                 // Update the last message seen
2079                 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2080
2081                 // Process each entry in the slot
2082                 for (Entry entry : slot.getEntries()) {
2083                         switch (entry.getType()) {
2084
2085                         case Entry.TypeCommitPart:
2086                                 processEntry((CommitPart)entry);
2087                                 break;
2088
2089                         case Entry.TypeAbort:
2090                                 processEntry((Abort)entry);
2091                                 break;
2092
2093                         case Entry.TypeTransactionPart:
2094                                 processEntry((TransactionPart)entry);
2095                                 break;
2096
2097                         case Entry.TypeNewKey:
2098                                 processEntry((NewKey)entry);
2099                                 break;
2100
2101                         case Entry.TypeLastMessage:
2102                                 processEntry((LastMessage)entry, machineSet);
2103                                 break;
2104
2105                         case Entry.TypeRejectedMessage:
2106                                 processEntry((RejectedMessage)entry, indexer);
2107                                 break;
2108
2109                         case Entry.TypeTableStatus:
2110                                 processEntry((TableStatus)entry);
2111                                 break;
2112
2113                         default:
2114                                 throw new Error("Unrecognized type: " + entry.getType());
2115                         }
2116                 }
2117         }
2118
2119         /**
2120          * Update the last message that was sent for a machine Id
2121          */
2122         private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
2123                 // Update what the last message received by a machine was
2124                 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
2125         }
2126
2127         /**
2128          * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message)
2129          */
2130         private void processEntry(NewKey entry) {
2131
2132                 // Update the arbitrator table with the new key information
2133                 arbitratorTable.put(entry.getKey(), entry.getMachineID());
2134
2135                 // Update what the latest live new key is
2136                 NewKey oldNewKey = liveNewKeyTable.put(entry.getKey(), entry);
2137                 if (oldNewKey != null) {
2138                         // Delete the old new key messages
2139                         oldNewKey.setDead();
2140                 }
2141         }
2142
2143         /**
2144          * Process new table status entries and set dead the old ones as new ones come in.
2145          * keeps track of the largest and smallest table status seen in this current round
2146          * of updating the local copy of the block chain
2147          */
2148         private void processEntry(TableStatus entry) {
2149                 int newNumSlots = entry.getMaxSlots();
2150
2151                 if (liveTableStatus != null) {
2152                         // We have a larger table status so the old table status is no longer alive
2153                         liveTableStatus.setDead();
2154                 }
2155
2156                 // Make this new table status the latest alive table status
2157                 liveTableStatus = entry;
2158
2159                 if ((smallestTableStatusSeen == -1) || (newNumSlots < smallestTableStatusSeen)) {
2160                         smallestTableStatusSeen = newNumSlots;
2161                 }
2162
2163                 if ((largestTableStatusSeen == -1) || (newNumSlots > largestTableStatusSeen)) {
2164                         largestTableStatusSeen = newNumSlots;
2165                 }
2166         }
2167
2168         /**
2169          * Check old messages to see if there is a block chain violation. Also
2170          */
2171         private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
2172                 long oldSeqNum = entry.getOldSeqNum();
2173                 long newSeqNum = entry.getNewSeqNum();
2174                 boolean isequal = entry.getEqual();
2175                 long machineId = entry.getMachineID();
2176
2177
2178                 // Check if we have messages that were supposed to be rejected in our local block chain
2179                 for (long seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2180
2181                         // Get the slot
2182                         Slot slot = indexer.getSlot(seqNum);
2183
2184                         if (slot != null) {
2185                                 // If we have this slot make sure that it was not supposed to be a rejected slot
2186
2187                                 long slotMachineId = slot.getMachineID();
2188                                 if (isequal != (slotMachineId == machineId)) {
2189                                         throw new Error("Server Error: Trying to insert rejected message for slot " + seqNum);
2190                                 }
2191                         }
2192                 }
2193
2194
2195                 // Create a list of clients to watch until they see this rejected message entry.
2196                 HashSet<Long> deviceWatchSet = new HashSet<Long>();
2197                 for (Map.Entry<Long, Pair<Long, Liveness>> lastMessageEntry : lastMessageTable.entrySet()) {
2198
2199                         // Machine ID for the last message entry
2200                         long lastMessageEntryMachineId = lastMessageEntry.getKey();
2201
2202                         // We've seen it, don't need to continue to watch.  Our next
2203                         // message will implicitly acknowledge it.
2204                         if (lastMessageEntryMachineId == localMachineId) {
2205                                 continue;
2206                         }
2207
2208                         Pair<Long, Liveness> lastMessageValue = lastMessageEntry.getValue();
2209                         long entrySequenceNumber = lastMessageValue.getFirst();
2210
2211                         if (entrySequenceNumber < newSeqNum) {
2212
2213                                 // Add this rejected message to the set of messages that this machine ID did not see yet
2214                                 addWatchList(lastMessageEntryMachineId, entry);
2215
2216                                 // This client did not see this rejected message yet so add it to the watch set to monitor
2217                                 deviceWatchSet.add(lastMessageEntryMachineId);
2218                         }
2219                 }
2220
2221                 if (deviceWatchSet.isEmpty()) {
2222                         // This rejected message has been seen by all the clients so
2223                         entry.setDead();
2224                 } else {
2225                         // We need to watch this rejected message
2226                         entry.setWatchSet(deviceWatchSet);
2227                 }
2228         }
2229
2230         /**
2231          * Check if this abort is live, if not then save it so we can kill it later.
2232          * update the last transaction number that was arbitrated on.
2233          */
2234         private void processEntry(Abort entry) {
2235
2236
2237                 if (entry.getTransactionSequenceNumber() != -1) {
2238                         // update the transaction status if it was sent to the server
2239                         TransactionStatus status = outstandingTransactionStatus.remove(entry.getTransactionSequenceNumber());
2240                         if (status != null) {
2241                                 status.setStatus(TransactionStatus.StatusAborted);
2242                         }
2243                 }
2244
2245                 // Abort has not been seen by the client it is for yet so we need to keep track of it
2246                 Abort previouslySeenAbort = liveAbortTable.put(entry.getAbortId(), entry);
2247                 if (previouslySeenAbort != null) {
2248                         previouslySeenAbort.setDead(); // Delete old version of the abort since we got a rescued newer version
2249                 }
2250
2251                 if (entry.getTransactionArbitrator() == localMachineId) {
2252                         liveAbortsGeneratedByLocal.put(entry.getArbitratorLocalSequenceNumber(), entry);
2253                 }
2254
2255                 if ((entry.getSequenceNumber() != -1) && (lastMessageTable.get(entry.getTransactionMachineId()).getFirst() >= entry.getSequenceNumber())) {
2256
2257                         // The machine already saw this so it is dead
2258                         entry.setDead();
2259                         liveAbortTable.remove(entry.getAbortId());
2260
2261                         if (entry.getTransactionArbitrator() == localMachineId) {
2262                                 liveAbortsGeneratedByLocal.remove(entry.getArbitratorLocalSequenceNumber());
2263                         }
2264
2265                         return;
2266                 }
2267
2268
2269
2270
2271                 // Update the last arbitration data that we have seen so far
2272                 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator()) != null) {
2273
2274                         long lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator());
2275                         if (entry.getSequenceNumber() > lastArbitrationSequenceNumber) {
2276                                 // Is larger
2277                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber());
2278                         }
2279                 } else {
2280                         // Never seen any data from this arbitrator so record the first one
2281                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber());
2282                 }
2283
2284
2285                 // Set dead a transaction if we can
2286                 Transaction transactionToSetDead = liveTransactionByTransactionIdTable.remove(new Pair<Long, Long>(entry.getTransactionMachineId(), entry.getTransactionClientLocalSequenceNumber()));
2287                 if (transactionToSetDead != null) {
2288                         liveTransactionBySequenceNumberTable.remove(transactionToSetDead.getSequenceNumber());
2289                 }
2290
2291                 // Update the last transaction sequence number that the arbitrator arbitrated on
2292                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getTransactionArbitrator());
2293                 if ((lastTransactionNumber == null) || (lastTransactionNumber < entry.getTransactionSequenceNumber())) {
2294
2295                         // Is a valid one
2296                         if (entry.getTransactionSequenceNumber() != -1) {
2297                                 lastArbitratedTransactionNumberByArbitratorTable.put(entry.getTransactionArbitrator(), entry.getTransactionSequenceNumber());
2298                         }
2299                 }
2300         }
2301
2302         /**
2303          * Set dead the transaction part if that transaction is dead and keep track of all new parts
2304          */
2305         private void processEntry(TransactionPart entry) {
2306                 // Check if we have already seen this transaction and set it dead OR if it is not alive
2307                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getArbitratorId());
2308                 if ((lastTransactionNumber != null) && (lastTransactionNumber >= entry.getSequenceNumber())) {
2309                         // This transaction is dead, it was already committed or aborted
2310                         entry.setDead();
2311                         return;
2312                 }
2313
2314                 // This part is still alive
2315                 Map<Pair<Long, Integer>, TransactionPart> transactionPart = newTransactionParts.get(entry.getMachineId());
2316
2317                 if (transactionPart == null) {
2318                         // Dont have a table for this machine Id yet so make one
2319                         transactionPart = new HashMap<Pair<Long, Integer>, TransactionPart>();
2320                         newTransactionParts.put(entry.getMachineId(), transactionPart);
2321                 }
2322
2323                 // Update the part and set dead ones we have already seen (got a rescued version)
2324                 TransactionPart previouslySeenPart = transactionPart.put(entry.getPartId(), entry);
2325                 if (previouslySeenPart != null) {
2326                         previouslySeenPart.setDead();
2327                 }
2328         }
2329
2330         /**
2331          * Process new commit entries and save them for future use.  Delete duplicates
2332          */
2333         private void processEntry(CommitPart entry) {
2334                 Map<Pair<Long, Integer>, CommitPart> commitPart = newCommitParts.get(entry.getMachineId());
2335
2336                 if (commitPart == null) {
2337                         // Don't have a table for this machine Id yet so make one
2338                         commitPart = new HashMap<Pair<Long, Integer>, CommitPart>();
2339                         newCommitParts.put(entry.getMachineId(), commitPart);
2340                 }
2341
2342                 // Update the part and set dead ones we have already seen (got a rescued version)
2343                 CommitPart previouslySeenPart = commitPart.put(entry.getPartId(), entry);
2344                 if (previouslySeenPart != null) {
2345                         previouslySeenPart.setDead();
2346                 }
2347         }
2348
2349         /**
2350          * Update the last message seen table.  Update and set dead the appropriate RejectedMessages as clients see them.
2351          * Updates the live aborts, removes those that are dead and sets them dead.
2352          * Check that the last message seen is correct and that there is no mismatch of our own last message or that
2353          * other clients have not had a rollback on the last message.
2354          */
2355         private void updateLastMessage(long machineId, long seqNum, Liveness liveness, boolean acceptUpdatesToLocal, HashSet<Long> machineSet) {
2356
2357                 // We have seen this machine ID
2358                 machineSet.remove(machineId);
2359
2360                 // Get the set of rejected messages that this machine Id is has not seen yet
2361                 HashSet<RejectedMessage> watchset = rejectedMessageWatchListTable.get(machineId);
2362
2363                 // If there is a rejected message that this machine Id has not seen yet
2364                 if (watchset != null) {
2365
2366                         // Go through each rejected message that this machine Id has not seen yet
2367                         for (Iterator<RejectedMessage> rmit = watchset.iterator(); rmit.hasNext(); ) {
2368
2369                                 RejectedMessage rm = rmit.next();
2370
2371                                 // If this machine Id has seen this rejected message...
2372                                 if (rm.getNewSeqNum() <= seqNum) {
2373
2374                                         // Remove it from our watchlist
2375                                         rmit.remove();
2376
2377                                         // Decrement machines that need to see this notification
2378                                         rm.removeWatcher(machineId);
2379                                 }
2380                         }
2381                 }
2382
2383                 // Set dead the abort
2384                 for (Iterator<Map.Entry<Pair<Long, Long>, Abort>> i = liveAbortTable.entrySet().iterator(); i.hasNext();) {
2385                         Abort abort = i.next().getValue();
2386
2387                         if ((abort.getTransactionMachineId() == machineId) && (abort.getSequenceNumber() <= seqNum)) {
2388                                 abort.setDead();
2389                                 i.remove();
2390
2391                                 if (abort.getTransactionArbitrator() == localMachineId) {
2392                                         liveAbortsGeneratedByLocal.remove(abort.getArbitratorLocalSequenceNumber());
2393                                 }
2394                         }
2395                 }
2396
2397
2398
2399                 if (machineId == localMachineId) {
2400                         // Our own messages are immediately dead.
2401                         if (liveness instanceof LastMessage) {
2402                                 ((LastMessage)liveness).setDead();
2403                         } else if (liveness instanceof Slot) {
2404                                 ((Slot)liveness).setDead();
2405                         } else {
2406                                 throw new Error("Unrecognized type");
2407                         }
2408                 }
2409
2410                 // Get the old last message for this device
2411                 Pair<Long, Liveness> lastMessageEntry = lastMessageTable.put(machineId, new Pair<Long, Liveness>(seqNum, liveness));
2412                 if (lastMessageEntry == null) {
2413                         // If no last message then there is nothing else to process
2414                         return;
2415                 }
2416
2417                 long lastMessageSeqNum = lastMessageEntry.getFirst();
2418                 Liveness lastEntry = lastMessageEntry.getSecond();
2419
2420                 // If it is not our machine Id since we already set ours to dead
2421                 if (machineId != localMachineId) {
2422                         if (lastEntry instanceof LastMessage) {
2423                                 ((LastMessage)lastEntry).setDead();
2424                         } else if (lastEntry instanceof Slot) {
2425                                 ((Slot)lastEntry).setDead();
2426                         } else {
2427                                 throw new Error("Unrecognized type");
2428                         }
2429                 }
2430
2431                 // Make sure the server is not playing any games
2432                 if (machineId == localMachineId) {
2433
2434                         if (hadPartialSendToServer) {
2435                                 // We were not making any updates and we had a machine mismatch
2436                                 if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2437                                         throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: " +  lastMessageSeqNum  + " got: " + seqNum);
2438                                 }
2439
2440                         } else {
2441                                 // We were not making any updates and we had a machine mismatch
2442                                 if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2443                                         throw new Error("Server Error: Mismatch on local machine sequence number, needed: " +  lastMessageSeqNum + " got: " + seqNum);
2444                                 }
2445                         }
2446                 } else {
2447                         if (lastMessageSeqNum > seqNum) {
2448                                 throw new Error("Server Error: Rollback on remote machine sequence number");
2449                         }
2450                 }
2451         }
2452
2453         /**
2454          * Add a rejected message entry to the watch set to keep track of which clients have seen that
2455          * rejected message entry and which have not.
2456          */
2457         private void addWatchList(long machineId, RejectedMessage entry) {
2458                 HashSet<RejectedMessage> entries = rejectedMessageWatchListTable.get(machineId);
2459                 if (entries == null) {
2460                         // There is no set for this machine ID yet so create one
2461                         entries = new HashSet<RejectedMessage>();
2462                         rejectedMessageWatchListTable.put(machineId, entries);
2463                 }
2464                 entries.add(entry);
2465         }
2466
2467         /**
2468          * Check if the HMAC chain is not violated
2469          */
2470         private void checkHMACChain(SlotIndexer indexer, Slot[] newSlots) {
2471                 for (int i = 0; i < newSlots.length; i++) {
2472                         Slot currSlot = newSlots[i];
2473                         Slot prevSlot = indexer.getSlot(currSlot.getSequenceNumber() - 1);
2474                         if (prevSlot != null &&
2475                                 !Arrays.equals(prevSlot.getHMAC(), currSlot.getPrevHMAC()))
2476                                 throw new Error("Server Error: Invalid HMAC Chain" + currSlot + " " + prevSlot);
2477                 }
2478         }
2479 }