Fixed Rejected Messages, Calculating correct size
[iotcloud.git] / version2 / src / java / iotcloud / Table.java
1 package iotcloud;
2
3 import java.util.Iterator;
4 import java.util.Random;
5 import java.util.Arrays;
6 import java.util.Map;
7 import java.util.Set;
8 import java.util.List;
9 import java.util.Vector;
10 import java.util.HashMap;
11 import java.util.HashSet;
12 import java.util.ArrayList;
13 import java.util.Collections;
14 import java.nio.ByteBuffer;
15
16 /**
17  * IoTTable data structure.  Provides client interface.
18  * @author Brian Demsky
19  * @version 1.0
20  */
21
22 final public class Table {
23         
24         /* Constants */
25         static final int FREE_SLOTS = 10; // Number of slots that should be kept free
26         static final int SKIP_THRESHOLD = 10;
27         static final double RESIZE_MULTIPLE = 1.2;
28         static final double RESIZE_THRESHOLD = 0.75;
29         static final int REJECTED_THRESHOLD = 5;
30
31         /* Helper Objects */
32         private SlotBuffer buffer = null;
33         private CloudComm cloud = null;
34         private Random random = null;
35         private TableStatus liveTableStatus = null;
36         private PendingTransaction pendingTransactionBuilder = null; // Pending Transaction used in building a Pending Transaction
37         private Transaction lastPendingTransactionSpeculatedOn = null; // Last transaction that was speculated on from the pending transaction
38         private Transaction firstPendingTransaction = null; // first transaction in the pending transaction list
39
40         /* Variables */
41         private int numberOfSlots = 0;  // Number of slots stored in buffer
42         private int bufferResizeThreshold = 0; // Threshold on the number of live slots before a resize is needed
43         private long liveSlotCount = 0; // Number of currently live slots
44         private long oldestLiveSlotSequenceNumver = 0;  // Smallest sequence number of the slot with a live entry
45         private long localMachineId = 0; // Machine ID of this client device
46         private long sequenceNumber = 0; // Largest sequence number a client has received
47         // private int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server
48         // private int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server
49         private long localTransactionSequenceNumber = 0; // Local sequence number counter for transactions
50         private long lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on
51         private long oldestTransactionSequenceNumberSpeculatedOn = -1; // the oldest transaction that was speculated on
52         private long localArbitrationSequenceNumber = 0;
53         private boolean hadPartialSendToServer = false;
54         private boolean attemptedToSendToServer = false;
55         private long expectedsize;
56         private boolean didFindTableStatus = false;
57         private long currMaxSize = 0;
58
59         /* Data Structures  */
60         private Map<IoTString, KeyValue> committedKeyValueTable = null; // Table of committed key value pairs
61         private Map<IoTString, KeyValue> speculatedKeyValueTable = null; // Table of speculated key value pairs, if there is a speculative value
62         private Map<IoTString, KeyValue> pendingTransactionSpeculatedKeyValueTable = null; // Table of speculated key value pairs, if there is a speculative value from the pending transactions
63         private Map<IoTString, NewKey> liveNewKeyTable = null; // Table of live new keys
64         private HashMap<Long, Pair<Long, Liveness>> lastMessageTable = null; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
65         private HashMap<Long, HashSet<RejectedMessage>> rejectedMessageWatchListTable = null; // Table of machine Ids and the set of rejected messages they have not seen yet
66         private Map<IoTString, Long> arbitratorTable = null; // Table of keys and their arbitrators
67         private Map<Pair<Long, Long>, Abort> liveAbortTable = null; // Table live abort messages
68         private Map<Long, Map<Pair<Long, Integer>, TransactionPart>> newTransactionParts = null; // transaction parts that are seen in this latest round of slots from the server
69         private Map<Long, Map<Pair<Long, Integer>, CommitPart>> newCommitParts = null; // commit parts that are seen in this latest round of slots from the server
70         private Map<Long, Long> lastArbitratedTransactionNumberByArbitratorTable = null; // Last transaction sequence number that an arbitrator arbitrated on
71         private Map<Long, Transaction> liveTransactionBySequenceNumberTable = null; // live transaction grouped by the sequence number
72         private Map<Pair<Long, Long>, Transaction> liveTransactionByTransactionIdTable = null; // live transaction grouped by the transaction ID
73         private Map<Long, Map<Long, Commit>> liveCommitsTable = null;
74         private Map<IoTString, Commit> liveCommitsByKeyTable = null;
75         private Map<Long, Long> lastCommitSeenSequenceNumberByArbitratorTable = null;
76         private Vector<Long> rejectedSlotList = null; // List of rejected slots that have yet to be sent to the server
77         private List<Transaction> pendingTransactionQueue = null;
78         private List<ArbitrationRound> pendingSendArbitrationRounds = null;
79         private List<Entry> pendingSendArbitrationEntriesToDelete = null;
80         private Map<Transaction, List<Integer>> transactionPartsSent = null;
81         private Map<Long, TransactionStatus> outstandingTransactionStatus = null;
82         private Map<Long, Abort> liveAbortsGeneratedByLocal = null;
83         private Set<Pair<Long, Long>> offlineTransactionsCommittedAndAtServer = null;
84         private Map<Long, Pair<String, Integer>> localCommunicationTable = null;
85         private Map<Long, Long> lastTransactionSeenFromMachineFromServer = null;
86         private Map<Long, Long> lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = null;
87
88
89         public Table(String baseurl, String password, long _localMachineId, int listeningPort) {
90                 localMachineId = _localMachineId;
91                 cloud = new CloudComm(this, baseurl, password, listeningPort);
92
93                 init();
94         }
95
96         public Table(CloudComm _cloud, long _localMachineId) {
97                 localMachineId = _localMachineId;
98                 cloud = _cloud;
99
100                 init();
101         }
102
103         /**
104          * Init all the stuff needed for for table usage
105          */
106         private void init() {
107
108                 // Init helper objects
109                 random = new Random();
110                 buffer = new SlotBuffer();
111
112                 // Set Variables
113                 oldestLiveSlotSequenceNumver = 1;
114
115                 // init data structs
116                 committedKeyValueTable = new HashMap<IoTString, KeyValue>();
117                 speculatedKeyValueTable = new HashMap<IoTString, KeyValue>();
118                 pendingTransactionSpeculatedKeyValueTable = new HashMap<IoTString, KeyValue>();
119                 liveNewKeyTable = new HashMap<IoTString, NewKey>();
120                 lastMessageTable = new HashMap<Long, Pair<Long, Liveness>>();
121                 rejectedMessageWatchListTable = new HashMap<Long, HashSet<RejectedMessage>>();
122                 arbitratorTable = new HashMap<IoTString, Long>();
123                 liveAbortTable = new HashMap<Pair<Long, Long>, Abort>();
124                 newTransactionParts = new HashMap<Long, Map<Pair<Long, Integer>, TransactionPart>>();
125                 newCommitParts = new HashMap<Long, Map<Pair<Long, Integer>, CommitPart>>();
126                 lastArbitratedTransactionNumberByArbitratorTable = new HashMap<Long, Long>();
127                 liveTransactionBySequenceNumberTable = new HashMap<Long, Transaction>();
128                 liveTransactionByTransactionIdTable = new HashMap<Pair<Long, Long>, Transaction>();
129                 liveCommitsTable = new HashMap<Long, Map<Long, Commit>>();
130                 liveCommitsByKeyTable = new HashMap<IoTString, Commit>();
131                 lastCommitSeenSequenceNumberByArbitratorTable = new HashMap<Long, Long>();
132                 rejectedSlotList = new Vector<Long>();
133                 pendingTransactionQueue = new ArrayList<Transaction>();
134                 pendingSendArbitrationEntriesToDelete = new ArrayList<Entry>();
135                 transactionPartsSent = new HashMap<Transaction, List<Integer>>();
136                 outstandingTransactionStatus = new HashMap<Long, TransactionStatus>();
137                 liveAbortsGeneratedByLocal = new HashMap<Long, Abort>();
138                 offlineTransactionsCommittedAndAtServer = new HashSet<Pair<Long, Long>>();
139                 localCommunicationTable = new HashMap<Long, Pair<String, Integer>>();
140                 lastTransactionSeenFromMachineFromServer = new HashMap<Long, Long>();
141                 pendingSendArbitrationRounds = new ArrayList<ArbitrationRound>();
142                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new HashMap<Long, Long>();
143
144
145                 // Other init stuff
146                 numberOfSlots = buffer.capacity();
147                 setResizeThreshold();
148         }
149
150         // TODO: delete method
151         public synchronized void printSlots() {
152                 long o = buffer.getOldestSeqNum();
153                 long n = buffer.getNewestSeqNum();
154
155                 int[] types = new int[10];
156
157                 int num = 0;
158
159                 int livec = 0;
160                 int deadc = 0;
161                 for (long i = o; i < (n + 1); i++) {
162                         Slot s = buffer.getSlot(i);
163
164                         Vector<Entry> entries = s.getEntries();
165
166                         for (Entry e : entries) {
167                                 if (e.isLive()) {
168                                         int type = e.getType();
169                                         types[type] = types[type] + 1;
170                                         num++;
171                                         livec++;
172                                 } else {
173                                         deadc++;
174                                 }
175                         }
176                 }
177
178                 for (int i = 0; i < 10; i++) {
179                         System.out.println(i + "    " + types[i]);
180                 }
181                 System.out.println("Live count:   " + livec);
182                 System.out.println("Dead count:   " + deadc);
183                 System.out.println("Old:   " + o);
184                 System.out.println("New:   " + n);
185                 System.out.println("Size:   " + buffer.size());
186                 System.out.println("Commits:   " + liveCommitsTable.size());
187
188                 for (Long a : liveCommitsTable.keySet()) {
189                         for (Long b : liveCommitsTable.get(a).keySet()) {
190                                 for (KeyValue kv : liveCommitsTable.get(a).get(b).getKeyValueUpdateSet()) {
191                                         System.out.print(kv + " ");
192                                 }
193                                 System.out.print("|| ");
194                         }
195                         System.out.println();
196                 }
197
198         }
199
200         /**
201          * Initialize the table by inserting a table status as the first entry into the table status
202          * also initialize the crypto stuff.
203          */
204         public synchronized void initTable() throws ServerException {
205                 cloud.initSecurity();
206
207                 // Create the first insertion into the block chain which is the table status
208                 Slot s = new Slot(this, 1, localMachineId);
209                 TableStatus status = new TableStatus(s, numberOfSlots);
210                 s.addEntry(status);
211                 Slot[] array = cloud.putSlot(s, numberOfSlots);
212
213                 if (array == null) {
214                         array = new Slot[] {s};
215                         // update local block chain
216                         validateAndUpdate(array, true);
217                 } else if (array.length == 1) {
218                         // in case we did push the slot BUT we failed to init it
219                         validateAndUpdate(array, true);
220                 } else {
221                         throw new Error("Error on initialization");
222                 }
223         }
224
225         /**
226          * Rebuild the table from scratch by pulling the latest block chain from the server.
227          */
228         public synchronized void rebuild() throws ServerException {
229                 // Just pull the latest slots from the server
230                 Slot[] newslots = cloud.getSlots(sequenceNumber + 1);
231                 validateAndUpdate(newslots, true);
232         }
233
234 // public String toString() {
235 //      String retString = " Committed Table: \n";
236 //      retString += "---------------------------\n";
237 //      retString += commitedTable.toString();
238
239 //      retString += "\n\n";
240
241 //      retString += " Speculative Table: \n";
242 //      retString += "---------------------------\n";
243 //      retString += speculativeTable.toString();
244
245 //      return retString;
246 // }
247
248         public synchronized void addLocalCommunication(long arbitrator, String hostName, int portNumber) {
249                 localCommunicationTable.put(arbitrator, new Pair<String, Integer>(hostName, portNumber));
250         }
251
252         public synchronized Long getArbitrator(IoTString key) {
253                 return arbitratorTable.get(key);
254         }
255
256         public synchronized void close() {
257                 cloud.close();
258         }
259
260         public synchronized IoTString getCommitted(IoTString key)  {
261                 KeyValue kv = committedKeyValueTable.get(key);
262
263                 if (kv != null) {
264                         return kv.getValue();
265                 } else {
266                         return null;
267                 }
268         }
269
270         public synchronized IoTString getSpeculative(IoTString key) {
271                 KeyValue kv = pendingTransactionSpeculatedKeyValueTable.get(key);
272
273                 if (kv == null) {
274                         kv = speculatedKeyValueTable.get(key);
275                 }
276
277                 if (kv == null) {
278                         kv = committedKeyValueTable.get(key);
279                 }
280
281                 if (kv != null) {
282                         return kv.getValue();
283                 } else {
284                         return null;
285                 }
286         }
287
288         public synchronized IoTString getCommittedAtomic(IoTString key) {
289                 KeyValue kv = committedKeyValueTable.get(key);
290
291                 if (arbitratorTable.get(key) == null) {
292                         throw new Error("Key not Found.");
293                 }
294
295                 // Make sure new key value pair matches the current arbitrator
296                 if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
297                         // TODO: Maybe not throw en error
298                         throw new Error("Not all Key Values Match Arbitrator.");
299                 }
300
301                 if (kv != null) {
302                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, kv.getValue()));
303                         return kv.getValue();
304                 } else {
305                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, null));
306                         return null;
307                 }
308         }
309
310         public synchronized IoTString getSpeculativeAtomic(IoTString key) {
311                 if (arbitratorTable.get(key) == null) {
312                         throw new Error("Key not Found.");
313                 }
314
315                 // Make sure new key value pair matches the current arbitrator
316                 if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
317                         // TODO: Maybe not throw en error
318                         throw new Error("Not all Key Values Match Arbitrator.");
319                 }
320
321                 KeyValue kv = pendingTransactionSpeculatedKeyValueTable.get(key);
322
323                 if (kv == null) {
324                         kv = speculatedKeyValueTable.get(key);
325                 }
326
327                 if (kv == null) {
328                         kv = committedKeyValueTable.get(key);
329                 }
330
331                 if (kv != null) {
332                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, kv.getValue()));
333                         return kv.getValue();
334                 } else {
335                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, null));
336                         return null;
337                 }
338         }
339
340         public synchronized boolean update()  {
341                 try {
342                         Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
343                         validateAndUpdate(newSlots, false);
344                         sendToServer(null);
345
346                         return true;
347                 } catch (Exception e) {
348                         // e.printStackTrace();
349                 }
350
351                 return false;
352         }
353
354         public synchronized boolean createNewKey(IoTString keyName, long machineId) throws ServerException {
355                 while (true) {
356                         if (arbitratorTable.get(keyName) != null) {
357                                 // There is already an arbitrator
358                                 return false;
359                         }
360
361                         NewKey newKey = new NewKey(null, keyName, machineId);
362                         if (sendToServer(newKey)) {
363                                 // If successfully inserted
364                                 return true;
365                         }
366                 }
367         }
368
369         public synchronized void startTransaction() {
370                 // Create a new transaction, invalidates any old pending transactions.
371                 pendingTransactionBuilder = new PendingTransaction(localMachineId);
372         }
373
374         public synchronized void addKV(IoTString key, IoTString value) {
375
376                 // Make sure it is a valid key
377                 if (arbitratorTable.get(key) == null) {
378                         throw new Error("Key not Found.");
379                 }
380
381                 // Make sure new key value pair matches the current arbitrator
382                 if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
383                         // TODO: Maybe not throw en error
384                         throw new Error("Not all Key Values Match Arbitrator.");
385                 }
386
387                 // Add the key value to this transaction
388                 KeyValue kv = new KeyValue(key, value);
389                 pendingTransactionBuilder.addKV(kv);
390         }
391
392         public synchronized TransactionStatus commitTransaction() {
393
394                 if (pendingTransactionBuilder.getKVUpdates().size() == 0) {
395                         // transaction with no updates will have no effect on the system
396                         return new TransactionStatus(TransactionStatus.StatusNoEffect, -1);
397                 }
398
399                 // Set the local transaction sequence number and increment
400                 pendingTransactionBuilder.setClientLocalSequenceNumber(localTransactionSequenceNumber);
401                 localTransactionSequenceNumber++;
402
403                 // Create the transaction status
404                 TransactionStatus transactionStatus = new TransactionStatus(TransactionStatus.StatusPending, pendingTransactionBuilder.getArbitrator());
405
406                 // Create the new transaction
407                 Transaction newTransaction = pendingTransactionBuilder.createTransaction();
408                 newTransaction.setTransactionStatus(transactionStatus);
409
410                 if (pendingTransactionBuilder.getArbitrator() != localMachineId) {
411                         // Add it to the queue and invalidate the builder for safety
412                         pendingTransactionQueue.add(newTransaction);
413                 } else {
414                         arbitrateOnLocalTransaction(newTransaction);
415                         updateLiveStateFromLocal();
416                 }
417
418                 pendingTransactionBuilder = new PendingTransaction(localMachineId);
419
420                 try {
421                         sendToServer(null);
422                 } catch (ServerException e) {
423
424                         Set<Long> arbitratorTriedAndFailed = new HashSet<Long>();
425                         for (Iterator<Transaction> iter = pendingTransactionQueue.iterator(); iter.hasNext(); ) {
426                                 Transaction transaction = iter.next();
427
428                                 if (arbitratorTriedAndFailed.contains(transaction.getArbitrator())) {
429                                         // Already contacted this client so ignore all attempts to contact this client
430                                         // to preserve ordering for arbitrator
431                                         continue;
432                                 }
433
434                                 Pair<Boolean, Boolean> sendReturn = sendTransactionToLocal(transaction);
435
436                                 if (sendReturn.getFirst()) {
437                                         // Failed to contact over local
438                                         arbitratorTriedAndFailed.add(transaction.getArbitrator());
439                                 } else {
440                                         // Successful contact or should not contact
441
442                                         if (sendReturn.getSecond()) {
443                                                 // did arbitrate
444                                                 iter.remove();
445                                         }
446                                 }
447                         }
448                 }
449
450                 updateLiveStateFromLocal();
451
452                 return transactionStatus;
453         }
454
455         /**
456          * Get the machine ID for this client
457          */
458         public long getMachineId() {
459                 return localMachineId;
460         }
461
462         /**
463          * Decrement the number of live slots that we currently have
464          */
465         public void decrementLiveCount() {
466                 liveSlotCount--;
467         }
468
469         /**
470          * Recalculate the new resize threshold
471          */
472         private void setResizeThreshold() {
473                 int resizeLower = (int) (RESIZE_THRESHOLD * numberOfSlots);
474                 bufferResizeThreshold = resizeLower - 1 + random.nextInt(numberOfSlots - resizeLower);
475         }
476
477         private boolean sendToServer(NewKey newKey) throws ServerException {
478
479                 try {
480                         // While we have stuff that needs inserting into the block chain
481                         while ((pendingTransactionQueue.size() > 0) || (pendingSendArbitrationRounds.size() > 0) || (newKey != null)) {
482
483                                 // Create the slot
484                                 Slot slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer.getSlot(sequenceNumber).getHMAC());
485
486                                 // Try to fill the slot with data
487                                 ThreeTuple<Boolean, Integer, Boolean> fillSlotsReturn = fillSlot(slot, false, newKey);
488                                 boolean needsResize = fillSlotsReturn.getFirst();
489                                 int newSize = fillSlotsReturn.getSecond();
490                                 Boolean insertedNewKey = fillSlotsReturn.getThird();
491
492                                 if (needsResize) {
493                                         // Reset which transaction to send
494                                         for (Transaction transaction : transactionPartsSent.keySet()) {
495                                                 transaction.resetNextPartToSend();
496
497                                                 // Set the transaction sequence number back to nothing
498                                                 if (!transaction.didSendAPartToServer()) {
499                                                         transaction.setSequenceNumber(-1);
500                                                 }
501                                         }
502
503                                         // Clear the sent data since we are trying again
504                                         pendingSendArbitrationEntriesToDelete.clear();
505                                         transactionPartsSent.clear();
506
507                                         // We needed a resize so try again
508                                         fillSlot(slot, true, newKey);
509                                 }
510
511                                 // Try to send to the server
512                                 ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != null);
513
514                                 if (/*sendSlotsReturn.getSecond() || */sendSlotsReturn.getFirst()) {
515                                         // Did insert into the block chain
516
517                                         if (sendSlotsReturn.getFirst()) {
518                                                 // This slot was what was inserted not a previous slot
519
520                                                 // New Key was successfully inserted into the block chain so dont want to insert it again
521                                                 newKey = null;
522                                         }
523
524                                         // Remove the aborts and commit parts that were sent from the pending to send queue
525                                         for (Iterator<ArbitrationRound> iter = pendingSendArbitrationRounds.iterator(); iter.hasNext(); ) {
526                                                 ArbitrationRound round = iter.next();
527                                                 round.removeParts(pendingSendArbitrationEntriesToDelete);
528
529                                                 if (round.isDoneSending()) {
530                                                         // Sent all the parts
531                                                         iter.remove();
532                                                 }
533                                         }
534
535                                         for (Transaction transaction : transactionPartsSent.keySet()) {
536                                                 transaction.resetServerFailure();
537
538                                                 // Update which transactions parts still need to be sent
539                                                 transaction.removeSentParts(transactionPartsSent.get(transaction));
540
541                                                 // Add the transaction status to the outstanding list
542                                                 outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
543
544                                                 // Update the transaction status
545                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
546
547                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
548                                                 if (transaction.didSendAllParts()) {
549                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
550                                                         pendingTransactionQueue.remove(transaction);
551                                                 }
552                                         }
553                                 } else {
554                                         // Reset which transaction to send
555                                         for (Transaction transaction : transactionPartsSent.keySet()) {
556                                                 transaction.resetNextPartToSend();
557                                                 transaction.resetNextPartToSend();
558
559                                                 // Set the transaction sequence number back to nothing
560                                                 if (!transaction.didSendAPartToServer()) {
561                                                         transaction.setSequenceNumber(-1);
562                                                 }
563                                         }
564                                 }
565
566                                 // Clear the sent data in preparation for next send
567                                 pendingSendArbitrationEntriesToDelete.clear();
568                                 transactionPartsSent.clear();
569
570                                 if (sendSlotsReturn.getThird().length != 0) {
571                                         // insert into the local block chain
572                                         validateAndUpdate(sendSlotsReturn.getThird(), true);
573                                 }
574                         }
575                 } catch (ServerException e) {
576
577                         // System.out.println("Server Failure:   " + e.getType());
578
579                         if (e.getType() != ServerException.TypeInputTimeout) {
580                                 // e.printStackTrace();
581
582                                 // Nothing was able to be sent to the server so just clear these data structures
583                                 for (Transaction transaction : transactionPartsSent.keySet()) {
584                                         transaction.resetNextPartToSend();
585
586                                         // Set the transaction sequence number back to nothing
587                                         if (!transaction.didSendAPartToServer()) {
588                                                 transaction.setSequenceNumber(-1);
589                                         }
590                                 }
591                         } else {
592                                 // There was a partial send to the server
593                                 hadPartialSendToServer = true;
594
595                                 // Nothing was able to be sent to the server so just clear these data structures
596                                 for (Transaction transaction : transactionPartsSent.keySet()) {
597                                         transaction.resetNextPartToSend();
598                                         transaction.setServerFailure();
599                                 }
600                         }
601
602                         pendingSendArbitrationEntriesToDelete.clear();
603                         transactionPartsSent.clear();
604
605                         throw e;
606                 }
607
608                 return newKey == null;
609         }
610
611         public synchronized boolean updateFromLocal(long machineId) {
612                 Pair<String, Integer> localCommunicationInformation = localCommunicationTable.get(machineId);
613                 if (localCommunicationInformation == null) {
614                         // Cant talk to that device locally so do nothing
615                         return false;
616                 }
617
618                 // Get the size of the send data
619                 int sendDataSize = Integer.BYTES + Long.BYTES;
620
621                 Long lastArbitrationDataLocalSequenceNumber = (long) - 1;
622                 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId) != null) {
623                         lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId);
624                 }
625
626                 byte[] sendData = new byte[sendDataSize];
627                 ByteBuffer bbEncode = ByteBuffer.wrap(sendData);
628
629                 // Encode the data
630                 bbEncode.putLong(lastArbitrationDataLocalSequenceNumber);
631                 bbEncode.putInt(0);
632
633                 // Send by local
634                 byte[] returnData = cloud.sendLocalData(sendData, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
635
636                 if (returnData == null) {
637                         // Could not contact server
638                         return false;
639                 }
640
641                 // Decode the data
642                 ByteBuffer bbDecode = ByteBuffer.wrap(returnData);
643                 int numberOfEntries = bbDecode.getInt();
644
645                 for (int i = 0; i < numberOfEntries; i++) {
646                         byte type = bbDecode.get();
647                         if (type == Entry.TypeAbort) {
648                                 Abort abort = (Abort)Abort.decode(null, bbDecode);
649                                 processEntry(abort);
650                         } else if (type == Entry.TypeCommitPart) {
651                                 CommitPart commitPart = (CommitPart)CommitPart.decode(null, bbDecode);
652                                 processEntry(commitPart);
653                         }
654                 }
655
656                 updateLiveStateFromLocal();
657
658                 return true;
659         }
660
661         private Pair<Boolean, Boolean> sendTransactionToLocal(Transaction transaction) {
662
663                 // Get the devices local communications
664                 Pair<String, Integer> localCommunicationInformation = localCommunicationTable.get(transaction.getArbitrator());
665
666                 if (localCommunicationInformation == null) {
667                         // Cant talk to that device locally so do nothing
668                         return new Pair<Boolean, Boolean>(false, false);
669                 }
670
671                 // Get the size of the send data
672                 int sendDataSize = Integer.BYTES + Long.BYTES;
673                 for (TransactionPart part : transaction.getParts().values()) {
674                         sendDataSize += part.getSize();
675                 }
676
677                 Long lastArbitrationDataLocalSequenceNumber = (long) - 1;
678                 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator()) != null) {
679                         lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator());
680                 }
681
682                 // Make the send data size
683                 byte[] sendData = new byte[sendDataSize];
684                 ByteBuffer bbEncode = ByteBuffer.wrap(sendData);
685
686                 // Encode the data
687                 bbEncode.putLong(lastArbitrationDataLocalSequenceNumber);
688                 bbEncode.putInt(transaction.getParts().size());
689                 for (TransactionPart part : transaction.getParts().values()) {
690                         part.encode(bbEncode);
691                 }
692
693
694                 // Send by local
695                 byte[] returnData = cloud.sendLocalData(sendData, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
696
697                 if (returnData == null) {
698                         // Could not contact server
699                         return new Pair<Boolean, Boolean>(true, false);
700                 }
701
702                 // Decode the data
703                 ByteBuffer bbDecode = ByteBuffer.wrap(returnData);
704                 boolean didCommit = bbDecode.get() == 1;
705                 boolean couldArbitrate = bbDecode.get() == 1;
706                 int numberOfEntries = bbDecode.getInt();
707                 boolean foundAbort = false;
708
709                 for (int i = 0; i < numberOfEntries; i++) {
710                         byte type = bbDecode.get();
711                         if (type == Entry.TypeAbort) {
712                                 Abort abort = (Abort)Abort.decode(null, bbDecode);
713
714                                 if ((abort.getTransactionMachineId() == localMachineId) && (abort.getTransactionClientLocalSequenceNumber() == transaction.getClientLocalSequenceNumber())) {
715                                         foundAbort = true;
716                                 }
717
718                                 processEntry(abort);
719                         } else if (type == Entry.TypeCommitPart) {
720                                 CommitPart commitPart = (CommitPart)CommitPart.decode(null, bbDecode);
721                                 processEntry(commitPart);
722                         }
723                 }
724
725                 updateLiveStateFromLocal();
726
727                 if (couldArbitrate) {
728                         TransactionStatus status =  transaction.getTransactionStatus();
729                         if (didCommit) {
730                                 status.setStatus(TransactionStatus.StatusCommitted);
731                         } else {
732                                 status.setStatus(TransactionStatus.StatusAborted);
733                         }
734                 } else {
735                         TransactionStatus status =  transaction.getTransactionStatus();
736                         if (foundAbort) {
737                                 status.setStatus(TransactionStatus.StatusAborted);
738                         } else {
739                                 status.setStatus(TransactionStatus.StatusCommitted);
740                         }
741                 }
742
743                 return new Pair<Boolean, Boolean>(false, true);
744         }
745
746         public synchronized byte[] acceptDataFromLocal(byte[] data) {
747
748                 // Decode the data
749                 ByteBuffer bbDecode = ByteBuffer.wrap(data);
750                 long lastArbitratedSequenceNumberSeen = bbDecode.getLong();
751                 int numberOfParts = bbDecode.getInt();
752
753                 // If we did commit a transaction or not
754                 boolean didCommit = false;
755                 boolean couldArbitrate = false;
756
757                 if (numberOfParts != 0) {
758
759                         // decode the transaction
760                         Transaction transaction = new Transaction();
761                         for (int i = 0; i < numberOfParts; i++) {
762                                 bbDecode.get();
763                                 TransactionPart newPart = (TransactionPart)TransactionPart.decode(null, bbDecode);
764                                 transaction.addPartDecode(newPart);
765                         }
766
767                         // Arbitrate on transaction and pull relevant return data
768                         Pair<Boolean, Boolean> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
769                         couldArbitrate = localArbitrateReturn.getFirst();
770                         didCommit = localArbitrateReturn.getSecond();
771
772                         updateLiveStateFromLocal();
773
774                         // Transaction was sent to the server so keep track of it to prevent double commit
775                         if (transaction.getSequenceNumber() != -1) {
776                                 offlineTransactionsCommittedAndAtServer.add(transaction.getId());
777                         }
778                 }
779
780                 // The data to send back
781                 int returnDataSize = 0;
782                 List<Entry> unseenArbitrations = new ArrayList<Entry>();
783
784                 // Get the aborts to send back
785                 List<Long> abortLocalSequenceNumbers = new ArrayList<Long >(liveAbortsGeneratedByLocal.keySet());
786                 Collections.sort(abortLocalSequenceNumbers);
787                 for (Long localSequenceNumber : abortLocalSequenceNumbers) {
788                         if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
789                                 continue;
790                         }
791
792                         Abort abort = liveAbortsGeneratedByLocal.get(localSequenceNumber);
793                         unseenArbitrations.add(abort);
794                         returnDataSize += abort.getSize();
795                 }
796
797                 // Get the commits to send back
798                 Map<Long, Commit> commitForClientTable = liveCommitsTable.get(localMachineId);
799                 if (commitForClientTable != null) {
800                         List<Long> commitLocalSequenceNumbers = new ArrayList<Long>(commitForClientTable.keySet());
801                         Collections.sort(commitLocalSequenceNumbers);
802
803                         for (Long localSequenceNumber : commitLocalSequenceNumbers) {
804                                 Commit commit = commitForClientTable.get(localSequenceNumber);
805
806                                 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
807                                         continue;
808                                 }
809
810                                 unseenArbitrations.addAll(commit.getParts().values());
811
812                                 for (CommitPart commitPart : commit.getParts().values()) {
813                                         returnDataSize += commitPart.getSize();
814                                 }
815                         }
816                 }
817
818                 // Number of arbitration entries to decode
819                 returnDataSize += 2 * Integer.BYTES;
820
821                 // Boolean of did commit or not
822                 if (numberOfParts != 0) {
823                         returnDataSize += Byte.BYTES;
824                 }
825
826                 // Data to send Back
827                 byte[] returnData = new byte[returnDataSize];
828                 ByteBuffer bbEncode = ByteBuffer.wrap(returnData);
829
830                 if (numberOfParts != 0) {
831                         if (didCommit) {
832                                 bbEncode.put((byte)1);
833                         } else {
834                                 bbEncode.put((byte)0);
835                         }
836                         if (couldArbitrate) {
837                                 bbEncode.put((byte)1);
838                         } else {
839                                 bbEncode.put((byte)0);
840                         }
841                 }
842
843                 bbEncode.putInt(unseenArbitrations.size());
844                 for (Entry entry : unseenArbitrations) {
845                         entry.encode(bbEncode);
846                 }
847
848                 return returnData;
849         }
850
851         private ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsToServer(Slot slot, int newSize, boolean isNewKey)  throws ServerException {
852
853                 boolean attemptedToSendToServerTmp = attemptedToSendToServer;
854                 attemptedToSendToServer = true;
855
856                 boolean inserted = false;
857                 boolean lastTryInserted = false;
858
859                 Slot[] array = cloud.putSlot(slot, newSize);
860                 if (array == null) {
861                         array = new Slot[] {slot};
862                         rejectedSlotList.clear();
863                         inserted = true;
864                 }       else {
865                         if (array.length == 0) {
866                                 throw new Error("Server Error: Did not send any slots");
867                         }
868
869                         // if (attemptedToSendToServerTmp) {
870                         if (hadPartialSendToServer) {
871
872                                 boolean isInserted = false;
873                                 for (Slot s : array) {
874                                         if ((s.getSequenceNumber() == slot.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
875                                                 isInserted = true;
876                                                 break;
877                                         }
878                                 }
879
880                                 for (Slot s : array) {
881                                         if (isInserted) {
882                                                 break;
883                                         }
884
885                                         // Process each entry in the slot
886                                         for (Entry entry : s.getEntries()) {
887
888                                                 if (entry.getType() == Entry.TypeLastMessage) {
889                                                         LastMessage lastMessage = (LastMessage)entry;
890
891                                                         if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == slot.getSequenceNumber())) {
892                                                                 isInserted = true;
893                                                                 break;
894                                                         }
895                                                 }
896                                         }
897                                 }
898
899                                 if (!isInserted) {
900                                         rejectedSlotList.add(slot.getSequenceNumber());
901                                         lastTryInserted = false;
902                                 } else {
903                                         lastTryInserted = true;
904                                 }
905                         } else {
906                                 rejectedSlotList.add(slot.getSequenceNumber());
907                                 lastTryInserted = false;
908                         }
909                 }
910
911                 return new ThreeTuple<Boolean, Boolean, Slot[]>(inserted, lastTryInserted, array);
912         }
913
914         /**
915          * Returns false if a resize was needed
916          */
917         private ThreeTuple<Boolean, Integer, Boolean> fillSlot(Slot slot, boolean resize, NewKey newKeyEntry) {
918                 int newSize = 0;
919                 if (liveSlotCount > bufferResizeThreshold) {
920                         resize = true; //Resize is forced
921                 }
922
923                 if (resize) {
924                         newSize = (int) (numberOfSlots * RESIZE_MULTIPLE);
925                         TableStatus status = new TableStatus(slot, newSize);
926                         slot.addEntry(status);
927                 }
928
929                 // Fill with rejected slots first before doing anything else
930                 doRejectedMessages(slot);
931
932                 // Do mandatory rescue of entries
933                 ThreeTuple<Boolean, Boolean, Long> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
934
935                 // Extract working variables
936                 boolean needsResize = mandatoryRescueReturn.getFirst();
937                 boolean seenLiveSlot = mandatoryRescueReturn.getSecond();
938                 long currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
939
940                 if (needsResize && !resize) {
941                         // We need to resize but we are not resizing so return false
942                         return new ThreeTuple<Boolean, Integer, Boolean>(true, null, null);
943                 }
944
945                 boolean inserted = false;
946                 if (newKeyEntry != null) {
947                         newKeyEntry.setSlot(slot);
948                         if (slot.hasSpace(newKeyEntry)) {
949                                 slot.addEntry(newKeyEntry);
950                                 inserted = true;
951                         }
952                 }
953
954                 // Clear the transactions, aborts and commits that were sent previously
955                 transactionPartsSent.clear();
956                 pendingSendArbitrationEntriesToDelete.clear();
957
958                 for (ArbitrationRound round : pendingSendArbitrationRounds) {
959                         boolean isFull = false;
960                         round.generateParts();
961                         List<Entry> parts = round.getParts();
962
963                         // Insert pending arbitration data
964                         for (Entry arbitrationData : parts) {
965
966                                 // If it is an abort then we need to set some information
967                                 if (arbitrationData instanceof Abort) {
968                                         ((Abort)arbitrationData).setSequenceNumber(slot.getSequenceNumber());
969                                 }
970
971                                 if (!slot.hasSpace(arbitrationData)) {
972                                         // No space so cant do anything else with these data entries
973                                         isFull = true;
974                                         break;
975                                 }
976
977                                 // Add to this current slot and add it to entries to delete
978                                 slot.addEntry(arbitrationData);
979                                 pendingSendArbitrationEntriesToDelete.add(arbitrationData);
980                         }
981
982                         if (isFull) {
983                                 break;
984                         }
985                 }
986
987                 if (pendingTransactionQueue.size() > 0) {
988
989                         Transaction transaction = pendingTransactionQueue.get(0);
990
991                         // Set the transaction sequence number if it has yet to be inserted into the block chain
992                         if ((!transaction.didSendAPartToServer() && !transaction.getServerFailure()) || (transaction.getSequenceNumber() == -1)) {
993                                 transaction.setSequenceNumber(slot.getSequenceNumber());
994                         }
995
996                         while (true) {
997                                 TransactionPart part = transaction.getNextPartToSend();
998
999                                 if (part == null) {
1000                                         // Ran out of parts to send for this transaction so move on
1001                                         break;
1002                                 }
1003
1004                                 if (slot.hasSpace(part)) {
1005                                         slot.addEntry(part);
1006                                         List<Integer> partsSent = transactionPartsSent.get(transaction);
1007                                         if (partsSent == null) {
1008                                                 partsSent = new ArrayList<Integer>();
1009                                                 transactionPartsSent.put(transaction, partsSent);
1010                                         }
1011                                         partsSent.add(part.getPartNumber());
1012                                         transactionPartsSent.put(transaction, partsSent);
1013                                 } else {
1014                                         break;
1015                                 }
1016                         }
1017                 }
1018
1019                 // Fill the remainder of the slot with rescue data
1020                 doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1021
1022                 return new ThreeTuple<Boolean, Integer, Boolean>(false, newSize, inserted);
1023         }
1024
1025         private void doRejectedMessages(Slot s) {
1026                 if (! rejectedSlotList.isEmpty()) {
1027                         /* TODO: We should avoid generating a rejected message entry if
1028                          * there is already a sufficient entry in the queue (e.g.,
1029                          * equalsto value of true and same sequence number).  */
1030
1031                         long old_seqn = rejectedSlotList.firstElement();
1032                         if (rejectedSlotList.size() > REJECTED_THRESHOLD) {
1033                                 long new_seqn = rejectedSlotList.lastElement();
1034                                 RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1035                                 s.addEntry(rm);
1036                         } else {
1037                                 long prev_seqn = -1;
1038                                 int i = 0;
1039                                 /* Go through list of missing messages */
1040                                 for (; i < rejectedSlotList.size(); i++) {
1041                                         long curr_seqn = rejectedSlotList.get(i);
1042                                         Slot s_msg = buffer.getSlot(curr_seqn);
1043                                         if (s_msg != null)
1044                                                 break;
1045                                         prev_seqn = curr_seqn;
1046                                 }
1047                                 /* Generate rejected message entry for missing messages */
1048                                 if (prev_seqn != -1) {
1049                                         RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1050                                         s.addEntry(rm);
1051                                 }
1052                                 /* Generate rejected message entries for present messages */
1053                                 for (; i < rejectedSlotList.size(); i++) {
1054                                         long curr_seqn = rejectedSlotList.get(i);
1055                                         Slot s_msg = buffer.getSlot(curr_seqn);
1056                                         long machineid = s_msg.getMachineID();
1057                                         RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1058                                         s.addEntry(rm);
1059                                 }
1060                         }
1061                 }
1062         }
1063
1064         private ThreeTuple<Boolean, Boolean, Long> doMandatoryResuce(Slot slot, boolean resize) {
1065                 long newestSequenceNumber = buffer.getNewestSeqNum();
1066                 long oldestSequenceNumber = buffer.getOldestSeqNum();
1067                 if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1068                         oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1069                 }
1070
1071                 long currentSequenceNumber = oldestLiveSlotSequenceNumver;
1072                 boolean seenLiveSlot = false;
1073                 long firstIfFull = newestSequenceNumber + 1 - numberOfSlots;    // smallest seq number in the buffer if it is full
1074                 long threshold = firstIfFull + FREE_SLOTS;      // we want the buffer to be clear of live entries up to this point
1075
1076
1077                 // Mandatory Rescue
1078                 for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1079                         Slot previousSlot = buffer.getSlot(currentSequenceNumber);
1080                         // Push slot number forward
1081                         if (! seenLiveSlot) {
1082                                 oldestLiveSlotSequenceNumver = currentSequenceNumber;
1083                         }
1084
1085                         if (!previousSlot.isLive()) {
1086                                 continue;
1087                         }
1088
1089                         // We have seen a live slot
1090                         seenLiveSlot = true;
1091
1092                         // Get all the live entries for a slot
1093                         Vector<Entry> liveEntries = previousSlot.getLiveEntries(resize);
1094
1095                         // Iterate over all the live entries and try to rescue them
1096                         for (Entry liveEntry : liveEntries) {
1097                                 if (slot.hasSpace(liveEntry)) {
1098
1099                                         // Enough space to rescue the entry
1100                                         slot.addEntry(liveEntry);
1101                                 } else if (currentSequenceNumber == firstIfFull) {
1102                                         //if there's no space but the entry is about to fall off the queue
1103                                         System.out.println("B"); //?
1104                                         return new ThreeTuple<Boolean, Boolean, Long>(true, seenLiveSlot, currentSequenceNumber);
1105
1106                                 }
1107                         }
1108                 }
1109
1110                 // Did not resize
1111                 return new ThreeTuple<Boolean, Boolean, Long>(false, seenLiveSlot, currentSequenceNumber);
1112         }
1113
1114         private void  doOptionalRescue(Slot s, boolean seenliveslot, long seqn, boolean resize) {
1115                 /* now go through live entries from least to greatest sequence number until
1116                  * either all live slots added, or the slot doesn't have enough room
1117                  * for SKIP_THRESHOLD consecutive entries*/
1118                 int skipcount = 0;
1119                 long newestseqnum = buffer.getNewestSeqNum();
1120                 search:
1121                 for (; seqn <= newestseqnum; seqn++) {
1122                         Slot prevslot = buffer.getSlot(seqn);
1123                         //Push slot number forward
1124                         if (!seenliveslot)
1125                                 oldestLiveSlotSequenceNumver = seqn;
1126
1127                         if (!prevslot.isLive())
1128                                 continue;
1129                         seenliveslot = true;
1130                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
1131                         for (Entry liveentry : liveentries) {
1132                                 if (s.hasSpace(liveentry))
1133                                         s.addEntry(liveentry);
1134                                 else {
1135                                         skipcount++;
1136                                         if (skipcount > SKIP_THRESHOLD)
1137                                                 break search;
1138                                 }
1139                         }
1140                 }
1141         }
1142
1143         /**
1144          * Checks for malicious activity and updates the local copy of the block chain.
1145          */
1146         private void validateAndUpdate(Slot[] newSlots, boolean acceptUpdatesToLocal) {
1147
1148                 // The cloud communication layer has checked slot HMACs already before decoding
1149                 if (newSlots.length == 0) {
1150                         return;
1151                 }
1152
1153                 // Make sure all slots are newer than the last largest slot this client has seen
1154                 long firstSeqNum = newSlots[0].getSequenceNumber();
1155                 if (firstSeqNum <= sequenceNumber) {
1156                         throw new Error("Server Error: Sent older slots!");
1157                 }
1158
1159                 // Create an object that can access both new slots and slots in our local chain
1160                 // without committing slots to our local chain
1161                 SlotIndexer indexer = new SlotIndexer(newSlots, buffer);
1162
1163                 // Check that the HMAC chain is not broken
1164                 checkHMACChain(indexer, newSlots);
1165
1166                 // Set to keep track of messages from clients
1167                 HashSet<Long> machineSet = new HashSet<Long>(lastMessageTable.keySet());
1168
1169                 // Process each slots data
1170                 for (Slot slot : newSlots) {
1171                         processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1172                         updateExpectedSize();
1173                 }
1174
1175                 // If there is a gap, check to see if the server sent us everything.
1176                 if (firstSeqNum != (sequenceNumber + 1)) {
1177
1178                         // Check the size of the slots that were sent down by the server.
1179                         // Can only check the size if there was a gap
1180                         checkNumSlots(newSlots.length);
1181
1182                         // Since there was a gap every machine must have pushed a slot or must have
1183                         // a last message message.  If not then the server is hiding slots
1184                         if (!machineSet.isEmpty()) {
1185                                 throw new Error("Missing record for machines: " + machineSet);
1186                         }
1187                 }
1188
1189                 // Update the size of our local block chain.
1190                 commitNewMaxSize();
1191
1192                 // Commit new to slots to the local block chain.
1193                 for (Slot slot : newSlots) {
1194
1195                         // Insert this slot into our local block chain copy.
1196                         buffer.putSlot(slot);
1197
1198                         // Keep track of how many slots are currently live (have live data in them).
1199                         liveSlotCount++;
1200                 }
1201
1202                 // Get the sequence number of the latest slot in the system
1203                 sequenceNumber = newSlots[newSlots.length - 1].getSequenceNumber();
1204
1205                 updateLiveStateFromServer();
1206
1207                 // No Need to remember after we pulled from the server
1208                 offlineTransactionsCommittedAndAtServer.clear();
1209
1210                 // This is invalidated now
1211                 hadPartialSendToServer = false;
1212         }
1213
1214         private void updateLiveStateFromServer() {
1215                 // Process the new transaction parts
1216                 processNewTransactionParts();
1217
1218                 // Do arbitration on new transactions that were received
1219                 arbitrateFromServer();
1220
1221                 // Update all the committed keys
1222                 boolean didCommitOrSpeculate = updateCommittedTable();
1223
1224                 // Delete the transactions that are now dead
1225                 updateLiveTransactionsAndStatus();
1226
1227                 // Do speculations
1228                 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1229                 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1230         }
1231
1232         private void updateLiveStateFromLocal() {
1233                 // Update all the committed keys
1234                 boolean didCommitOrSpeculate = updateCommittedTable();
1235
1236                 // Delete the transactions that are now dead
1237                 updateLiveTransactionsAndStatus();
1238
1239                 // Do speculations
1240                 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1241                 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1242         }
1243
1244
1245         
1246         private void initExpectedSize(long firstSequenceNumber, long numberOfSlots) {
1247                 if (didFindTableStatus) {
1248                         return;
1249                 }
1250                 long prevslots = firstSequenceNumber;
1251                 expectedsize = (prevslots < ((long) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1252                 currMaxSize = numberOfSlots;
1253         }
1254
1255         private void updateExpectedSize() {
1256                 expectedsize++;
1257                 if (expectedsize > currMaxSize)
1258                 {
1259                         expectedsize = currMaxSize;
1260                 }
1261         }
1262
1263
1264         /**
1265          * Check the size of the block chain to make sure there are enough slots sent back by the server.
1266          * This is only called when we have a gap between the slots that we have locally and the slots
1267          * sent by the server therefore in the slots sent by the server there will be at least 1 Table
1268          * status message
1269          */
1270         private void checkNumSlots(int numberOfSlots) {
1271                 if (numberOfSlots != expectedsize) {
1272                         throw new Error("Server Error: Server did not send all slots.  Expected: " + expectedsize + " Received:" + numberOfSlots);
1273                 }
1274         }
1275
1276         private void updateCurrMaxSize(int newmaxsize) {
1277                 currMaxSize=newmaxsize;
1278         }
1279
1280
1281         /**
1282          * Update the size of of the local buffer if it is needed.
1283          */
1284         private void commitNewMaxSize() {
1285                 didFindTableStatus = false;             
1286
1287                 // Resize the local slot buffer
1288                 if (numberOfSlots != currMaxSize) {
1289                         buffer.resize((int)currMaxSize);
1290                 }
1291
1292                 // Change the number of local slots to the new size
1293                 numberOfSlots = (int)currMaxSize;
1294
1295                 // Recalculate the resize threshold since the size of the local buffer has changed
1296                 setResizeThreshold();
1297         }
1298
1299         /**
1300          * Process the new transaction parts from this latest round of slots received from the server
1301          */
1302         private void processNewTransactionParts() {
1303
1304                 if (newTransactionParts.size() == 0) {
1305                         // Nothing new to process
1306                         return;
1307                 }
1308
1309                 // Iterate through all the machine Ids that we received new parts for
1310                 for (Long machineId : newTransactionParts.keySet()) {
1311                         Map<Pair<Long, Integer>, TransactionPart> parts = newTransactionParts.get(machineId);
1312
1313                         // Iterate through all the parts for that machine Id
1314                         for (Pair<Long, Integer> partId : parts.keySet()) {
1315                                 TransactionPart part = parts.get(partId);
1316
1317                                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(part.getArbitratorId());
1318                                 if ((lastTransactionNumber != null) && (lastTransactionNumber >= part.getSequenceNumber())) {
1319                                         // Set dead the transaction part
1320                                         part.setDead();
1321                                         continue;
1322                                 }
1323
1324                                 // Get the transaction object for that sequence number
1325                                 Transaction transaction = liveTransactionBySequenceNumberTable.get(part.getSequenceNumber());
1326
1327                                 if (transaction == null) {
1328                                         // This is a new transaction that we dont have so make a new one
1329                                         transaction = new Transaction();
1330
1331                                         // Insert this new transaction into the live tables
1332                                         liveTransactionBySequenceNumberTable.put(part.getSequenceNumber(), transaction);
1333                                         liveTransactionByTransactionIdTable.put(part.getTransactionId(), transaction);
1334                                 }
1335
1336                                 // Add that part to the transaction
1337                                 transaction.addPartDecode(part);
1338                         }
1339                 }
1340
1341                 // Clear all the new transaction parts in preparation for the next time the server sends slots
1342                 newTransactionParts.clear();
1343         }
1344
1345         private void arbitrateFromServer() {
1346
1347                 if (liveTransactionBySequenceNumberTable.size() == 0) {
1348                         // Nothing to arbitrate on so move on
1349                         return;
1350                 }
1351
1352                 // Get the transaction sequence numbers and sort from oldest to newest
1353                 List<Long> transactionSequenceNumbers = new ArrayList<Long>(liveTransactionBySequenceNumberTable.keySet());
1354                 Collections.sort(transactionSequenceNumbers);
1355
1356                 // Collection of key value pairs that are
1357                 Map<IoTString, KeyValue> speculativeTableTmp = new HashMap<IoTString, KeyValue>();
1358
1359                 // The last transaction arbitrated on
1360                 long lastTransactionCommitted = -1;
1361                 Set<Abort> generatedAborts = new HashSet<Abort>();
1362
1363                 for (Long transactionSequenceNumber : transactionSequenceNumbers) {
1364                         Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber);
1365
1366                         // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
1367                         if (transaction.getArbitrator() != localMachineId) {
1368                                 continue;
1369                         }
1370
1371                         if (offlineTransactionsCommittedAndAtServer.contains(transaction.getId())) {
1372                                 // We have seen this already locally so dont commit again
1373                                 continue;
1374                         }
1375
1376
1377                         if (!transaction.isComplete()) {
1378                                 // Will arbitrate in incorrect order if we continue so just break
1379                                 // Most likely this
1380                                 break;
1381                         }
1382
1383                         // update the largest transaction seen by arbitrator from server
1384                         if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) == null) {
1385                                 lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber());
1386                         } else {
1387                                 Long lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId());
1388                                 if (transaction.getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1389                                         lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber());
1390                                 }
1391                         }
1392
1393                         if (transaction.evaluateGuard(committedKeyValueTable, speculativeTableTmp, null)) {
1394                                 // Guard evaluated as true
1395
1396                                 // Update the local changes so we can make the commit
1397                                 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
1398                                         speculativeTableTmp.put(kv.getKey(), kv);
1399                                 }
1400
1401                                 // Update what the last transaction committed was for use in batch commit
1402                                 lastTransactionCommitted = transaction.getSequenceNumber();
1403                         } else {
1404                                 // Guard evaluated was false so create abort
1405
1406                                 // Create the abort
1407                                 Abort newAbort = new Abort(null,
1408                                                            transaction.getClientLocalSequenceNumber(),
1409                                                            transaction.getSequenceNumber(),
1410                                                            transaction.getMachineId(),
1411                                                            transaction.getArbitrator(),
1412                                                            localArbitrationSequenceNumber);
1413                                 localArbitrationSequenceNumber++;
1414
1415                                 generatedAborts.add(newAbort);
1416
1417                                 // Insert the abort so we can process
1418                                 processEntry(newAbort);
1419                         }
1420                 }
1421
1422                 Commit newCommit = null;
1423
1424                 // If there is something to commit
1425                 if (speculativeTableTmp.size() != 0) {
1426
1427                         // Create the commit and increment the commit sequence number
1428                         newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1429                         localArbitrationSequenceNumber++;
1430
1431                         // Add all the new keys to the commit
1432                         for (KeyValue kv : speculativeTableTmp.values()) {
1433                                 newCommit.addKV(kv);
1434                         }
1435
1436                         // create the commit parts
1437                         newCommit.createCommitParts();
1438
1439                         // Append all the commit parts to the end of the pending queue waiting for sending to the server
1440
1441                         // Insert the commit so we can process it
1442                         for (CommitPart commitPart : newCommit.getParts().values()) {
1443                                 processEntry(commitPart);
1444                         }
1445                 }
1446
1447                 if ((newCommit != null) || (generatedAborts.size() > 0)) {
1448                         ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1449                         pendingSendArbitrationRounds.add(arbitrationRound);
1450
1451                         if (compactArbitrationData()) {
1452                                 ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1453                                 if (newArbitrationRound.getCommit() != null) {
1454                                         for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
1455                                                 processEntry(commitPart);
1456                                         }
1457                                 }
1458                         }
1459                 }
1460         }
1461
1462         private Pair<Boolean, Boolean> arbitrateOnLocalTransaction(Transaction transaction) {
1463
1464                 // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
1465                 if (transaction.getArbitrator() != localMachineId) {
1466                         return new Pair<Boolean, Boolean>(false, false);
1467                 }
1468
1469                 if (!transaction.isComplete()) {
1470                         // Will arbitrate in incorrect order if we continue so just break
1471                         // Most likely this
1472                         return new Pair<Boolean, Boolean>(false, false);
1473                 }
1474
1475                 if (transaction.getMachineId() != localMachineId) {
1476                         // dont do this check for local transactions
1477                         if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) != null) {
1478                                 if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) > transaction.getClientLocalSequenceNumber()) {
1479                                         // We've have already seen this from the server
1480                                         return new Pair<Boolean, Boolean>(false, false);
1481                                 }
1482                         }
1483                 }
1484
1485                 if (transaction.evaluateGuard(committedKeyValueTable, null, null)) {
1486                         // Guard evaluated as true
1487
1488                         // Create the commit and increment the commit sequence number
1489                         Commit newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1490                         localArbitrationSequenceNumber++;
1491
1492                         // Update the local changes so we can make the commit
1493                         for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
1494                                 newCommit.addKV(kv);
1495                         }
1496
1497                         // create the commit parts
1498                         newCommit.createCommitParts();
1499
1500                         // Append all the commit parts to the end of the pending queue waiting for sending to the server
1501                         ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, new HashSet<Abort>());
1502                         pendingSendArbitrationRounds.add(arbitrationRound);
1503
1504                         if (compactArbitrationData()) {
1505                                 ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1506                                 for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
1507                                         processEntry(commitPart);
1508                                 }
1509                         } else {
1510                                 // Insert the commit so we can process it
1511                                 for (CommitPart commitPart : newCommit.getParts().values()) {
1512                                         processEntry(commitPart);
1513                                 }
1514                         }
1515
1516                         if (transaction.getMachineId() == localMachineId) {
1517                                 TransactionStatus status = transaction.getTransactionStatus();
1518                                 if (status != null) {
1519                                         status.setStatus(TransactionStatus.StatusCommitted);
1520                                 }
1521                         }
1522
1523                         updateLiveStateFromLocal();
1524                         return new Pair<Boolean, Boolean>(true, true);
1525                 } else {
1526
1527                         if (transaction.getMachineId() == localMachineId) {
1528                                 // For locally created messages update the status
1529
1530                                 // Guard evaluated was false so create abort
1531                                 TransactionStatus status = transaction.getTransactionStatus();
1532                                 if (status != null) {
1533                                         status.setStatus(TransactionStatus.StatusAborted);
1534                                 }
1535                         } else {
1536                                 Set addAbortSet = new HashSet<Abort>();
1537
1538
1539                                 // Create the abort
1540                                 Abort newAbort = new Abort(null,
1541                                                            transaction.getClientLocalSequenceNumber(),
1542                                                            -1,
1543                                                            transaction.getMachineId(),
1544                                                            transaction.getArbitrator(),
1545                                                            localArbitrationSequenceNumber);
1546                                 localArbitrationSequenceNumber++;
1547
1548                                 addAbortSet.add(newAbort);
1549
1550
1551                                 // Append all the commit parts to the end of the pending queue waiting for sending to the server
1552                                 ArbitrationRound arbitrationRound = new ArbitrationRound(null, addAbortSet);
1553                                 pendingSendArbitrationRounds.add(arbitrationRound);
1554
1555                                 if (compactArbitrationData()) {
1556                                         ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1557                                         for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
1558                                                 processEntry(commitPart);
1559                                         }
1560                                 }
1561                         }
1562
1563                         updateLiveStateFromLocal();
1564                         return new Pair<Boolean, Boolean>(true, false);
1565                 }
1566         }
1567
1568         /**
1569          * Compacts the arbitration data my merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates
1570          */
1571         private boolean compactArbitrationData() {
1572
1573                 if (pendingSendArbitrationRounds.size() < 2) {
1574                         // Nothing to compact so do nothing
1575                         return false;
1576                 }
1577
1578                 ArbitrationRound lastRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1579                 if (lastRound.didSendPart()) {
1580                         return false;
1581                 }
1582
1583                 boolean hadCommit = (lastRound.getCommit() == null);
1584                 boolean gotNewCommit = false;
1585
1586                 int numberToDelete = 1;
1587                 while (numberToDelete < pendingSendArbitrationRounds.size()) {
1588                         ArbitrationRound round = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - numberToDelete - 1);
1589
1590                         if (round.isFull() || round.didSendPart()) {
1591                                 // Stop since there is a part that cannot be compacted and we need to compact in order
1592                                 break;
1593                         }
1594
1595                         if (round.getCommit() == null) {
1596
1597                                 // Try compacting aborts only
1598                                 int newSize = round.getCurrentSize() + lastRound.getAbortsCount();
1599                                 if (newSize > ArbitrationRound.MAX_PARTS) {
1600                                         // Cant compact since it would be too large
1601                                         break;
1602                                 }
1603                                 lastRound.addAborts(round.getAborts());
1604                         } else {
1605
1606                                 // Create a new larger commit
1607                                 Commit newCommit = Commit.merge(lastRound.getCommit(), round.getCommit(), localArbitrationSequenceNumber);
1608                                 localArbitrationSequenceNumber++;
1609
1610                                 // Create the commit parts so that we can count them
1611                                 newCommit.createCommitParts();
1612
1613                                 // Calculate the new size of the parts
1614                                 int newSize = newCommit.getNumberOfParts();
1615                                 newSize += lastRound.getAbortsCount();
1616                                 newSize += round.getAbortsCount();
1617
1618                                 if (newSize > ArbitrationRound.MAX_PARTS) {
1619                                         // Cant compact since it would be too large
1620                                         break;
1621                                 }
1622
1623                                 // Set the new compacted part
1624                                 lastRound.setCommit(newCommit);
1625                                 lastRound.addAborts(round.getAborts());
1626                                 gotNewCommit = true;
1627                         }
1628
1629                         numberToDelete++;
1630                 }
1631
1632                 if (numberToDelete != 1) {
1633                         // If there is a compaction
1634
1635                         // Delete the previous pieces that are now in the new compacted piece
1636                         if (numberToDelete == pendingSendArbitrationRounds.size()) {
1637                                 pendingSendArbitrationRounds.clear();
1638                         } else {
1639                                 for (int i = 0; i < numberToDelete; i++) {
1640                                         pendingSendArbitrationRounds.remove(pendingSendArbitrationRounds.size() - 1);
1641                                 }
1642                         }
1643
1644                         // Add the new compacted into the pending to send list
1645                         pendingSendArbitrationRounds.add(lastRound);
1646
1647                         // Should reinsert into the commit processor
1648                         if (hadCommit && gotNewCommit) {
1649                                 return true;
1650                         }
1651                 }
1652
1653                 return false;
1654         }
1655         // private boolean compactArbitrationData() {
1656         //      return false;
1657         // }
1658
1659         /**
1660          * Update all the commits and the committed tables, sets dead the dead transactions
1661          */
1662         private boolean updateCommittedTable() {
1663
1664                 if (newCommitParts.size() == 0) {
1665                         // Nothing new to process
1666                         return false;
1667                 }
1668
1669                 // Iterate through all the machine Ids that we received new parts for
1670                 for (Long machineId : newCommitParts.keySet()) {
1671                         Map<Pair<Long, Integer>, CommitPart> parts = newCommitParts.get(machineId);
1672
1673                         // Iterate through all the parts for that machine Id
1674                         for (Pair<Long, Integer> partId : parts.keySet()) {
1675                                 CommitPart part = parts.get(partId);
1676
1677                                 // Get the transaction object for that sequence number
1678                                 Map<Long, Commit> commitForClientTable = liveCommitsTable.get(part.getMachineId());
1679
1680                                 if (commitForClientTable == null) {
1681                                         // This is the first commit from this device
1682                                         commitForClientTable = new HashMap<Long, Commit>();
1683                                         liveCommitsTable.put(part.getMachineId(), commitForClientTable);
1684                                 }
1685
1686                                 Commit commit = commitForClientTable.get(part.getSequenceNumber());
1687
1688                                 if (commit == null) {
1689                                         // This is a new commit that we dont have so make a new one
1690                                         commit = new Commit();
1691
1692                                         // Insert this new commit into the live tables
1693                                         commitForClientTable.put(part.getSequenceNumber(), commit);
1694                                 }
1695
1696                                 // Add that part to the commit
1697                                 commit.addPartDecode(part);
1698                         }
1699                 }
1700
1701                 // Clear all the new commits parts in preparation for the next time the server sends slots
1702                 newCommitParts.clear();
1703
1704                 // If we process a new commit keep track of it for future use
1705                 boolean didProcessANewCommit = false;
1706
1707                 // Process the commits one by one
1708                 for (Long arbitratorId : liveCommitsTable.keySet()) {
1709
1710                         // Get all the commits for a specific arbitrator
1711                         Map<Long, Commit> commitForClientTable = liveCommitsTable.get(arbitratorId);
1712
1713                         // Sort the commits in order
1714                         List<Long> commitSequenceNumbers = new ArrayList<Long>(commitForClientTable.keySet());
1715                         Collections.sort(commitSequenceNumbers);
1716
1717                         // Get the last commit seen from this arbitrator
1718                         long lastCommitSeenSequenceNumber = -1;
1719                         if (lastCommitSeenSequenceNumberByArbitratorTable.get(arbitratorId) != null) {
1720                                 lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable.get(arbitratorId);
1721                         }
1722
1723                         // Go through each new commit one by one
1724                         for (int i = 0; i < commitSequenceNumbers.size(); i++) {
1725                                 Long commitSequenceNumber = commitSequenceNumbers.get(i);
1726                                 Commit commit = commitForClientTable.get(commitSequenceNumber);
1727
1728                                 // Special processing if a commit is not complete
1729                                 if (!commit.isComplete()) {
1730                                         if (i == (commitSequenceNumbers.size() - 1)) {
1731                                                 // If there is an incomplete commit and this commit is the latest one seen then this commit cannot be processed and there are no other commits
1732                                                 break;
1733                                         } else {
1734                                                 // This is a commit that was already dead but parts of it are still in the block chain (not flushed out yet).
1735                                                 // Delete it and move on
1736                                                 commit.setDead();
1737                                                 commitForClientTable.remove(commit.getSequenceNumber());
1738                                                 continue;
1739                                         }
1740                                 }
1741
1742                                 // Update the last arbitration data that we have seen so far
1743                                 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId()) != null) {
1744
1745                                         long lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId());
1746                                         if (commit.getSequenceNumber() > lastArbitrationSequenceNumber) {
1747                                                 // Is larger
1748                                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(commit.getMachineId(), commit.getSequenceNumber());
1749                                         }
1750                                 } else {
1751                                         // Never seen any data from this arbitrator so record the first one
1752                                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(commit.getMachineId(), commit.getSequenceNumber());
1753                                 }
1754
1755                                 // We have already seen this commit before so need to do the full processing on this commit
1756                                 if (commit.getSequenceNumber() <= lastCommitSeenSequenceNumber) {
1757
1758                                         // Update the last transaction that was updated if we can
1759                                         if (commit.getTransactionSequenceNumber() != -1) {
1760                                                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId());
1761
1762                                                 // Update the last transaction sequence number that the arbitrator arbitrated on
1763                                                 if ((lastTransactionNumber == null) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) {
1764                                                         lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber());
1765                                                 }
1766                                         }
1767
1768                                         continue;
1769                                 }
1770
1771                                 // If we got here then this is a brand new commit and needs full processing
1772
1773                                 // Get what commits should be edited, these are the commits that have live values for their keys
1774                                 Set<Commit> commitsToEdit = new HashSet<Commit>();
1775                                 for (KeyValue kv : commit.getKeyValueUpdateSet()) {
1776                                         commitsToEdit.add(liveCommitsByKeyTable.get(kv.getKey()));
1777                                 }
1778                                 commitsToEdit.remove(null); // remove null since it could be in this set
1779
1780                                 // Update each previous commit that needs to be updated
1781                                 for (Commit previousCommit : commitsToEdit) {
1782
1783                                         // Only bother with live commits (TODO: Maybe remove this check)
1784                                         if (previousCommit.isLive()) {
1785
1786                                                 // Update which keys in the old commits are still live
1787                                                 for (KeyValue kv : commit.getKeyValueUpdateSet()) {
1788                                                         previousCommit.invalidateKey(kv.getKey());
1789                                                 }
1790
1791                                                 // if the commit is now dead then remove it
1792                                                 if (!previousCommit.isLive()) {
1793                                                         commitForClientTable.remove(previousCommit);
1794                                                 }
1795                                         }
1796                                 }
1797
1798                                 // Update the last seen sequence number from this arbitrator
1799                                 if (lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId()) != null) {
1800                                         if (commit.getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId())) {
1801                                                 lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber());
1802                                         }
1803                                 } else {
1804                                         lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber());
1805                                 }
1806
1807                                 // Update the last transaction that was updated if we can
1808                                 if (commit.getTransactionSequenceNumber() != -1) {
1809                                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId());
1810
1811                                         // Update the last transaction sequence number that the arbitrator arbitrated on
1812                                         if ((lastTransactionNumber == null) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) {
1813                                                 lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber());
1814                                         }
1815                                 }
1816
1817                                 // We processed a new commit that we havent seen before
1818                                 didProcessANewCommit = true;
1819
1820                                 // Update the committed table of keys and which commit is using which key
1821                                 for (KeyValue kv : commit.getKeyValueUpdateSet()) {
1822                                         committedKeyValueTable.put(kv.getKey(), kv);
1823                                         liveCommitsByKeyTable.put(kv.getKey(), commit);
1824                                 }
1825                         }
1826                 }
1827
1828                 return didProcessANewCommit;
1829         }
1830
1831         /**
1832          * Create the speculative table from transactions that are still live and have come from the cloud
1833          */
1834         private boolean updateSpeculativeTable(boolean didProcessNewCommits) {
1835                 if (liveTransactionBySequenceNumberTable.keySet().size() == 0) {
1836                         // There is nothing to speculate on
1837                         return false;
1838                 }
1839
1840                 // Create a list of the transaction sequence numbers and sort them from oldest to newest
1841                 List<Long> transactionSequenceNumbersSorted = new ArrayList<Long>(liveTransactionBySequenceNumberTable.keySet());
1842                 Collections.sort(transactionSequenceNumbersSorted);
1843
1844                 boolean hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted.get(0) != oldestTransactionSequenceNumberSpeculatedOn;
1845
1846
1847                 if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
1848                         // If there is a gap in the transaction sequence numbers then there was a commit or an abort of a transaction
1849                         // OR there was a new commit (Could be from offline commit) so a redo the speculation from scratch
1850
1851                         // Start from scratch
1852                         speculatedKeyValueTable.clear();
1853                         lastTransactionSequenceNumberSpeculatedOn = -1;
1854                         oldestTransactionSequenceNumberSpeculatedOn = -1;
1855
1856                 }
1857
1858                 // Remember the front of the transaction list
1859                 oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted.get(0);
1860
1861                 // Find where to start arbitration from
1862                 int startIndex = transactionSequenceNumbersSorted.indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1;
1863
1864                 if (startIndex >= transactionSequenceNumbersSorted.size()) {
1865                         // Make sure we are not out of bounds
1866                         return false; // did not speculate
1867                 }
1868
1869                 Set<Long> incompleteTransactionArbitrator = new HashSet<Long>();
1870                 boolean didSkip = true;
1871
1872                 for (int i = startIndex; i < transactionSequenceNumbersSorted.size(); i++) {
1873                         long transactionSequenceNumber = transactionSequenceNumbersSorted.get(i);
1874                         Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber);
1875
1876                         if (!transaction.isComplete()) {
1877                                 // If there is an incomplete transaction then there is nothing we can do
1878                                 // add this transactions arbitrator to the list of arbitrators we should ignore
1879                                 incompleteTransactionArbitrator.add(transaction.getArbitrator());
1880                                 didSkip = true;
1881                                 continue;
1882                         }
1883
1884                         if (incompleteTransactionArbitrator.contains(transaction.getArbitrator())) {
1885                                 continue;
1886                         }
1887
1888                         lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
1889
1890                         if (transaction.evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, null)) {
1891                                 // Guard evaluated to true so update the speculative table
1892                                 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
1893                                         speculatedKeyValueTable.put(kv.getKey(), kv);
1894                                 }
1895                         }
1896                 }
1897
1898                 if (didSkip) {
1899                         // Since there was a skip we need to redo the speculation next time around
1900                         lastTransactionSequenceNumberSpeculatedOn = -1;
1901                         oldestTransactionSequenceNumberSpeculatedOn = -1;
1902                 }
1903
1904                 // We did some speculation
1905                 return true;
1906         }
1907
1908         /**
1909          * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer
1910          */
1911         private void updatePendingTransactionSpeculativeTable(boolean didProcessNewCommitsOrSpeculate) {
1912                 if (pendingTransactionQueue.size() == 0) {
1913                         // There is nothing to speculate on
1914                         return;
1915                 }
1916
1917
1918                 if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue.get(0))) {
1919                         // need to reset on the pending speculation
1920                         lastPendingTransactionSpeculatedOn = null;
1921                         firstPendingTransaction = pendingTransactionQueue.get(0);
1922                         pendingTransactionSpeculatedKeyValueTable.clear();
1923                 }
1924
1925                 // Find where to start arbitration from
1926                 int startIndex = pendingTransactionQueue.indexOf(firstPendingTransaction) + 1;
1927
1928                 if (startIndex >= pendingTransactionQueue.size()) {
1929                         // Make sure we are not out of bounds
1930                         return;
1931                 }
1932
1933                 for (int i = startIndex; i < pendingTransactionQueue.size(); i++) {
1934                         Transaction transaction = pendingTransactionQueue.get(i);
1935
1936                         lastPendingTransactionSpeculatedOn = transaction;
1937
1938                         if (transaction.evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
1939                                 // Guard evaluated to true so update the speculative table
1940                                 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
1941                                         pendingTransactionSpeculatedKeyValueTable.put(kv.getKey(), kv);
1942                                 }
1943                         }
1944                 }
1945         }
1946
1947         /**
1948          * Set dead and remove from the live transaction tables the transactions that are dead
1949          */
1950         private void updateLiveTransactionsAndStatus() {
1951
1952                 // Go through each of the transactions
1953                 for (Iterator<Map.Entry<Long, Transaction>> iter = liveTransactionBySequenceNumberTable.entrySet().iterator(); iter.hasNext();) {
1954                         Transaction transaction = iter.next().getValue();
1955
1956                         // Check if the transaction is dead
1957                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(transaction.getArbitrator());
1958                         if ((lastTransactionNumber != null) && (lastTransactionNumber >= transaction.getSequenceNumber())) {
1959
1960                                 // Set dead the transaction
1961                                 transaction.setDead();
1962
1963                                 // Remove the transaction from the live table
1964                                 iter.remove();
1965                                 liveTransactionByTransactionIdTable.remove(transaction.getId());
1966                         }
1967                 }
1968
1969                 // Go through each of the transactions
1970                 for (Iterator<Map.Entry<Long, TransactionStatus>> iter = outstandingTransactionStatus.entrySet().iterator(); iter.hasNext();) {
1971                         TransactionStatus status = iter.next().getValue();
1972
1973                         // Check if the transaction is dead
1974                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(status.getTransactionArbitrator());
1975                         if ((lastTransactionNumber != null) && (lastTransactionNumber >= status.getTransactionSequenceNumber())) {
1976
1977                                 // Set committed
1978                                 status.setStatus(TransactionStatus.StatusCommitted);
1979
1980                                 // Remove
1981                                 iter.remove();
1982                         }
1983                 }
1984         }
1985
1986         /**
1987          * Process this slot, entry by entry.  Also update the latest message sent by slot
1988          */
1989         private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptUpdatesToLocal, HashSet<Long> machineSet) {
1990
1991                 // Update the last message seen
1992                 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
1993
1994                 // Process each entry in the slot
1995                 for (Entry entry : slot.getEntries()) {
1996                         switch (entry.getType()) {
1997
1998                         case Entry.TypeCommitPart:
1999                                 processEntry((CommitPart)entry);
2000                                 break;
2001
2002                         case Entry.TypeAbort:
2003                                 processEntry((Abort)entry);
2004                                 break;
2005
2006                         case Entry.TypeTransactionPart:
2007                                 processEntry((TransactionPart)entry);
2008                                 break;
2009
2010                         case Entry.TypeNewKey:
2011                                 processEntry((NewKey)entry);
2012                                 break;
2013
2014                         case Entry.TypeLastMessage:
2015                                 processEntry((LastMessage)entry, machineSet);
2016                                 break;
2017
2018                         case Entry.TypeRejectedMessage:
2019                                 processEntry((RejectedMessage)entry, indexer);
2020                                 break;
2021
2022                         case Entry.TypeTableStatus:
2023                                 processEntry((TableStatus)entry, slot.getSequenceNumber());
2024                                 break;
2025
2026                         default:
2027                                 throw new Error("Unrecognized type: " + entry.getType());
2028                         }
2029                 }
2030         }
2031
2032         /**
2033          * Update the last message that was sent for a machine Id
2034          */
2035         private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
2036                 // Update what the last message received by a machine was
2037                 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
2038         }
2039
2040         /**
2041          * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message)
2042          */
2043         private void processEntry(NewKey entry) {
2044
2045                 // Update the arbitrator table with the new key information
2046                 arbitratorTable.put(entry.getKey(), entry.getMachineID());
2047
2048                 // Update what the latest live new key is
2049                 NewKey oldNewKey = liveNewKeyTable.put(entry.getKey(), entry);
2050                 if (oldNewKey != null) {
2051                         // Delete the old new key messages
2052                         oldNewKey.setDead();
2053                 }
2054         }
2055
2056         /**
2057          * Process new table status entries and set dead the old ones as new ones come in.
2058          * keeps track of the largest and smallest table status seen in this current round
2059          * of updating the local copy of the block chain
2060          */
2061         private void processEntry(TableStatus entry, long seq) {
2062                 int newNumSlots = entry.getMaxSlots();
2063                 updateCurrMaxSize(newNumSlots);
2064
2065                 initExpectedSize(seq, newNumSlots);
2066
2067                 if (liveTableStatus != null) {
2068                         // We have a larger table status so the old table status is no longer alive
2069                         liveTableStatus.setDead();
2070                 }
2071
2072                 // Make this new table status the latest alive table status
2073                 liveTableStatus = entry;
2074         }
2075
2076         /**
2077          * Check old messages to see if there is a block chain violation. Also track which clients have not yet seen this rejected message.
2078          */
2079         private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
2080                 long oldSeqNum = entry.getOldSeqNum();
2081                 long newSeqNum = entry.getNewSeqNum();
2082                 boolean isequal = entry.getEqual();
2083                 long machineId = entry.getMachineID();
2084                 long seq = entry.getSequenceNumber();
2085
2086
2087                 // Check if we have messages that were supposed to be rejected in our local block chain
2088                 for (long seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2089
2090                         // Get the slot
2091                         Slot slot = indexer.getSlot(seqNum);
2092
2093                         if (slot != null) {
2094                                 // If we have this slot make sure that it was not supposed to be a rejected slot
2095
2096                                 long slotMachineId = slot.getMachineID();
2097                                 if (isequal != (slotMachineId == machineId)) {
2098                                         throw new Error("Server Error: Trying to insert rejected message for slot " + seqNum);
2099                                 }
2100                         }
2101                 }
2102
2103
2104                 // Create a list of clients to watch until they see this rejected message entry.
2105                 HashSet<Long> deviceWatchSet = new HashSet<Long>();
2106                 for (Map.Entry<Long, Pair<Long, Liveness>> lastMessageEntry : lastMessageTable.entrySet()) {
2107
2108                         // Machine ID for the last message entry
2109                         long lastMessageEntryMachineId = lastMessageEntry.getKey();
2110
2111                         // We've seen it, don't need to continue to watch.  Our next
2112                         // message will implicitly acknowledge it.
2113                         if (lastMessageEntryMachineId == localMachineId) {
2114                                 continue;
2115                         }
2116
2117                         Pair<Long, Liveness> lastMessageValue = lastMessageEntry.getValue();
2118                         long entrySequenceNumber = lastMessageValue.getFirst();
2119
2120                         if (entrySequenceNumber < seq) {
2121
2122                                 // Add this rejected message to the set of messages that this machine ID did not see yet
2123                                 addWatchList(lastMessageEntryMachineId, entry);
2124
2125                                 // This client did not see this rejected message yet so add it to the watch set to monitor
2126                                 deviceWatchSet.add(lastMessageEntryMachineId);
2127                         }
2128                 }
2129
2130                 if (deviceWatchSet.isEmpty()) {
2131                         // This rejected message has been seen by all the clients so
2132                         entry.setDead();
2133                 } else {
2134                         // We need to watch this rejected message
2135                         entry.setWatchSet(deviceWatchSet);
2136                 }
2137         }
2138
2139         /**
2140          * Check if this abort is live, if not then save it so we can kill it later.
2141          * update the last transaction number that was arbitrated on.
2142          */
2143         private void processEntry(Abort entry) {
2144
2145
2146                 if (entry.getTransactionSequenceNumber() != -1) {
2147                         // update the transaction status if it was sent to the server
2148                         TransactionStatus status = outstandingTransactionStatus.remove(entry.getTransactionSequenceNumber());
2149                         if (status != null) {
2150                                 status.setStatus(TransactionStatus.StatusAborted);
2151                         }
2152                 }
2153
2154                 // Abort has not been seen by the client it is for yet so we need to keep track of it
2155                 Abort previouslySeenAbort = liveAbortTable.put(entry.getAbortId(), entry);
2156                 if (previouslySeenAbort != null) {
2157                         previouslySeenAbort.setDead(); // Delete old version of the abort since we got a rescued newer version
2158                 }
2159
2160                 if (entry.getTransactionArbitrator() == localMachineId) {
2161                         liveAbortsGeneratedByLocal.put(entry.getArbitratorLocalSequenceNumber(), entry);
2162                 }
2163
2164                 if ((entry.getSequenceNumber() != -1) && (lastMessageTable.get(entry.getTransactionMachineId()).getFirst() >= entry.getSequenceNumber())) {
2165
2166                         // The machine already saw this so it is dead
2167                         entry.setDead();
2168                         liveAbortTable.remove(entry.getAbortId());
2169
2170                         if (entry.getTransactionArbitrator() == localMachineId) {
2171                                 liveAbortsGeneratedByLocal.remove(entry.getArbitratorLocalSequenceNumber());
2172                         }
2173
2174                         return;
2175                 }
2176
2177
2178
2179
2180                 // Update the last arbitration data that we have seen so far
2181                 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator()) != null) {
2182
2183                         long lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator());
2184                         if (entry.getSequenceNumber() > lastArbitrationSequenceNumber) {
2185                                 // Is larger
2186                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber());
2187                         }
2188                 } else {
2189                         // Never seen any data from this arbitrator so record the first one
2190                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber());
2191                 }
2192
2193
2194                 // Set dead a transaction if we can
2195                 Transaction transactionToSetDead = liveTransactionByTransactionIdTable.remove(new Pair<Long, Long>(entry.getTransactionMachineId(), entry.getTransactionClientLocalSequenceNumber()));
2196                 if (transactionToSetDead != null) {
2197                         liveTransactionBySequenceNumberTable.remove(transactionToSetDead.getSequenceNumber());
2198                 }
2199
2200                 // Update the last transaction sequence number that the arbitrator arbitrated on
2201                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getTransactionArbitrator());
2202                 if ((lastTransactionNumber == null) || (lastTransactionNumber < entry.getTransactionSequenceNumber())) {
2203
2204                         // Is a valid one
2205                         if (entry.getTransactionSequenceNumber() != -1) {
2206                                 lastArbitratedTransactionNumberByArbitratorTable.put(entry.getTransactionArbitrator(), entry.getTransactionSequenceNumber());
2207                         }
2208                 }
2209         }
2210
2211         /**
2212          * Set dead the transaction part if that transaction is dead and keep track of all new parts
2213          */
2214         private void processEntry(TransactionPart entry) {
2215                 // Check if we have already seen this transaction and set it dead OR if it is not alive
2216                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getArbitratorId());
2217                 if ((lastTransactionNumber != null) && (lastTransactionNumber >= entry.getSequenceNumber())) {
2218                         // This transaction is dead, it was already committed or aborted
2219                         entry.setDead();
2220                         return;
2221                 }
2222
2223                 // This part is still alive
2224                 Map<Pair<Long, Integer>, TransactionPart> transactionPart = newTransactionParts.get(entry.getMachineId());
2225
2226                 if (transactionPart == null) {
2227                         // Dont have a table for this machine Id yet so make one
2228                         transactionPart = new HashMap<Pair<Long, Integer>, TransactionPart>();
2229                         newTransactionParts.put(entry.getMachineId(), transactionPart);
2230                 }
2231
2232                 // Update the part and set dead ones we have already seen (got a rescued version)
2233                 TransactionPart previouslySeenPart = transactionPart.put(entry.getPartId(), entry);
2234                 if (previouslySeenPart != null) {
2235                         previouslySeenPart.setDead();
2236                 }
2237         }
2238
2239         /**
2240          * Process new commit entries and save them for future use.  Delete duplicates
2241          */
2242         private void processEntry(CommitPart entry) {
2243                 Map<Pair<Long, Integer>, CommitPart> commitPart = newCommitParts.get(entry.getMachineId());
2244
2245                 if (commitPart == null) {
2246                         // Don't have a table for this machine Id yet so make one
2247                         commitPart = new HashMap<Pair<Long, Integer>, CommitPart>();
2248                         newCommitParts.put(entry.getMachineId(), commitPart);
2249                 }
2250
2251                 // Update the part and set dead ones we have already seen (got a rescued version)
2252                 CommitPart previouslySeenPart = commitPart.put(entry.getPartId(), entry);
2253                 if (previouslySeenPart != null) {
2254                         previouslySeenPart.setDead();
2255                 }
2256         }
2257
2258         /**
2259          * Update the last message seen table.  Update and set dead the appropriate RejectedMessages as clients see them.
2260          * Updates the live aborts, removes those that are dead and sets them dead.
2261          * Check that the last message seen is correct and that there is no mismatch of our own last message or that
2262          * other clients have not had a rollback on the last message.
2263          */
2264         private void updateLastMessage(long machineId, long seqNum, Liveness liveness, boolean acceptUpdatesToLocal, HashSet<Long> machineSet) {
2265
2266                 // We have seen this machine ID
2267                 machineSet.remove(machineId);
2268
2269                 // Get the set of rejected messages that this machine Id is has not seen yet
2270                 HashSet<RejectedMessage> watchset = rejectedMessageWatchListTable.get(machineId);
2271
2272                 // If there is a rejected message that this machine Id has not seen yet
2273                 if (watchset != null) {
2274
2275                         // Go through each rejected message that this machine Id has not seen yet
2276                         for (Iterator<RejectedMessage> rmit = watchset.iterator(); rmit.hasNext(); ) {
2277
2278                                 RejectedMessage rm = rmit.next();
2279
2280                                 // If this machine Id has seen this rejected message...
2281                                 if (rm.getSequenceNumber() <= seqNum) {
2282
2283                                         // Remove it from our watchlist
2284                                         rmit.remove();
2285
2286                                         // Decrement machines that need to see this notification
2287                                         rm.removeWatcher(machineId);
2288                                 }
2289                         }
2290                 }
2291
2292                 // Set dead the abort
2293                 for (Iterator<Map.Entry<Pair<Long, Long>, Abort>> i = liveAbortTable.entrySet().iterator(); i.hasNext();) {
2294                         Abort abort = i.next().getValue();
2295
2296                         if ((abort.getTransactionMachineId() == machineId) && (abort.getSequenceNumber() <= seqNum)) {
2297                                 abort.setDead();
2298                                 i.remove();
2299
2300                                 if (abort.getTransactionArbitrator() == localMachineId) {
2301                                         liveAbortsGeneratedByLocal.remove(abort.getArbitratorLocalSequenceNumber());
2302                                 }
2303                         }
2304                 }
2305
2306
2307
2308                 if (machineId == localMachineId) {
2309                         // Our own messages are immediately dead.
2310                         if (liveness instanceof LastMessage) {
2311                                 ((LastMessage)liveness).setDead();
2312                         } else if (liveness instanceof Slot) {
2313                                 ((Slot)liveness).setDead();
2314                         } else {
2315                                 throw new Error("Unrecognized type");
2316                         }
2317                 }
2318
2319                 // Get the old last message for this device
2320                 Pair<Long, Liveness> lastMessageEntry = lastMessageTable.put(machineId, new Pair<Long, Liveness>(seqNum, liveness));
2321                 if (lastMessageEntry == null) {
2322                         // If no last message then there is nothing else to process
2323                         return;
2324                 }
2325
2326                 long lastMessageSeqNum = lastMessageEntry.getFirst();
2327                 Liveness lastEntry = lastMessageEntry.getSecond();
2328
2329                 // If it is not our machine Id since we already set ours to dead
2330                 if (machineId != localMachineId) {
2331                         if (lastEntry instanceof LastMessage) {
2332                                 ((LastMessage)lastEntry).setDead();
2333                         } else if (lastEntry instanceof Slot) {
2334                                 ((Slot)lastEntry).setDead();
2335                         } else {
2336                                 throw new Error("Unrecognized type");
2337                         }
2338                 }
2339
2340                 // Make sure the server is not playing any games
2341                 if (machineId == localMachineId) {
2342
2343                         if (hadPartialSendToServer) {
2344                                 // We were not making any updates and we had a machine mismatch
2345                                 if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2346                                         throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: " +  lastMessageSeqNum  + " got: " + seqNum);
2347                                 }
2348
2349                         } else {
2350                                 // We were not making any updates and we had a machine mismatch
2351                                 if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2352                                         throw new Error("Server Error: Mismatch on local machine sequence number, needed: " +  lastMessageSeqNum + " got: " + seqNum);
2353                                 }
2354                         }
2355                 } else {
2356                         if (lastMessageSeqNum > seqNum) {
2357                                 throw new Error("Server Error: Rollback on remote machine sequence number");
2358                         }
2359                 }
2360         }
2361
2362         /**
2363          * Add a rejected message entry to the watch set to keep track of which clients have seen that
2364          * rejected message entry and which have not.
2365          */
2366         private void addWatchList(long machineId, RejectedMessage entry) {
2367                 HashSet<RejectedMessage> entries = rejectedMessageWatchListTable.get(machineId);
2368                 if (entries == null) {
2369                         // There is no set for this machine ID yet so create one
2370                         entries = new HashSet<RejectedMessage>();
2371                         rejectedMessageWatchListTable.put(machineId, entries);
2372                 }
2373                 entries.add(entry);
2374         }
2375
2376         /**
2377          * Check if the HMAC chain is not violated
2378          */
2379         private void checkHMACChain(SlotIndexer indexer, Slot[] newSlots) {
2380                 for (int i = 0; i < newSlots.length; i++) {
2381                         Slot currSlot = newSlots[i];
2382                         Slot prevSlot = indexer.getSlot(currSlot.getSequenceNumber() - 1);
2383                         if (prevSlot != null &&
2384                                 !Arrays.equals(prevSlot.getHMAC(), currSlot.getPrevHMAC()))
2385                                 throw new Error("Server Error: Invalid HMAC Chain" + currSlot + " " + prevSlot);
2386                 }
2387         }
2388 }