Block Chain Transactions, Commits multiple parts version
[iotcloud.git] / version2 / src / java / iotcloud / Table.java
1 package iotcloud;
2
3 import java.util.Iterator;
4 import java.util.Random;
5 import java.util.Arrays;
6 import java.util.Map;
7 import java.util.Set;
8 import java.util.List;
9 import java.util.Vector;
10 import java.util.HashMap;
11 import java.util.HashSet;
12 import java.util.ArrayList;
13 import java.util.Collections;
14
15 /**
16  * IoTTable data structure.  Provides client interface.
17  * @author Brian Demsky
18  * @version 1.0
19  */
20
21 final public class Table {
22
23
24         /* Constants */
25         static final int FREE_SLOTS = 10; // Number of slots that should be kept free
26         static final int SKIP_THRESHOLD = 10;
27         static final double RESIZE_MULTIPLE = 1.2;
28         static final double RESIZE_THRESHOLD = 0.75;
29         static final int REJECTED_THRESHOLD = 5;
30
31
32         /* Helper Objects */
33         private SlotBuffer buffer = null;
34         private CloudComm cloud = null;
35         private Random random = null;
36         private TableStatus liveTableStatus = null;
37         private PendingTransaction pendingTransactionBuilder = null; // Pending Transaction used in building a Pending Transaction
38         private Transaction lastPendingTransactionSpeculatedOn = null; // Last transaction that was speculated on from the pending transaction
39         private Transaction firstPendingTransaction = null; // first transaction in the pending transaction list
40
41         /* Variables */
42         private int numberOfSlots = 0;  // Number of slots stored in buffer
43         private int bufferResizeThreshold = 0; // Threshold on the number of live slots before a resize is needed
44         private long liveSlotCount = 0; // Number of currently live slots
45         private long oldestLiveSlotSequenceNumver = 0;  // Smallest sequence number of the slot with a live entry
46         private long localMachineId = 0; // Machine ID of this client device
47         private long sequenceNumber = 0; // Largest sequence number a client has received
48         private int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server
49         private int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server
50         private long localTransactionSequenceNumber = 0; // Local sequence number counter for transactions
51         private long lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on
52         private long oldestTransactionSequenceNumberSpeculatedOn = -1; // the oldest transaction that was speculated on
53         private long localCommitSequenceNumber = 0;
54
55         /* Data Structures  */
56         private Map<IoTString, KeyValue> committedKeyValueTable = null; // Table of committed key value pairs
57         private Map<IoTString, KeyValue> speculatedKeyValueTable = null; // Table of speculated key value pairs, if there is a speculative value
58         private Map<IoTString, KeyValue> pendingTransactionSpeculatedKeyValueTable = null; // Table of speculated key value pairs, if there is a speculative value from the pending transactions
59
60         private Map<IoTString, NewKey> liveNewKeyTable = null; // Table of live new keys
61         private HashMap<Long, Pair<Long, Liveness>> lastMessageTable = null; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
62         private HashMap<Long, HashSet<RejectedMessage>> rejectedMessageWatchListTable = null; // Table of machine Ids and the set of rejected messages they have not seen yet
63         private Map<IoTString, Long> arbitratorTable = null; // Table of keys and their arbitrators
64         private Map<Pair<Long, Long>, Abort> liveAbortTable = null; // Table live abort messages
65         private Map<Long, Map<Pair<Long, Integer>, TransactionPart>> newTransactionParts = null; // transaction parts that are seen in this latest round of slots from the server
66         private Map<Long, Map<Pair<Long, Integer>, CommitPart>> newCommitParts = null; // commit parts that are seen in this latest round of slots from the server
67         private Map<Long, Long> lastArbitratedTransactionNumberByArbitratorTable = null; // Last transaction sequence number that an arbitrator arbitrated on
68         private Map<Long, Transaction> liveTransactionBySequenceNumberTable = null; // live transaction grouped by the sequence number
69         private Map<Pair<Long, Long>, Transaction> liveTransactionByTransactionIdTable = null; // live transaction grouped by the transaction ID
70         private Map<Long, Map<Long, Commit>> liveCommitsTable = null;
71         private Map<IoTString, Commit> liveCommitsByKeyTable = null;
72         private Map<Long, Long> lastCommitSeenSequenceNumberByArbitratorTable = null;
73         private Vector<Long> rejectedSlotList = null; // List of rejected slots that have yet to be sent to the server
74
75         private List<Transaction> pendingTransactionQueue = null;
76         private List<Entry> pendingSendArbitrationEntries = null;
77         private List<Entry> pendingSendArbitrationEntriesToDelete = null;
78         private Map<Transaction, List<Integer>> transactionPartsSent = null;
79         private Map<Long, TransactionStatus> outstandingTransactionStatus = null;
80
81
82
83
84
/**
 * Builds a table that reaches the server through a newly constructed
 * {@link CloudComm}.
 *
 * @param baseurl          passed through to the CloudComm constructor
 * @param password         passed through to the CloudComm constructor
 * @param _localMachineId  machine ID of this client device
 */
public Table(String baseurl, String password, long _localMachineId) {
    // Order kept as-is: the machine ID is stored before CloudComm is handed
    // a reference to this table.
    this.localMachineId = _localMachineId;
    this.cloud = new CloudComm(this, baseurl, password);

    this.init();
}
91
/**
 * Builds a table on top of an already constructed {@link CloudComm}.
 *
 * @param _cloud           communication object used to talk to the server
 * @param _localMachineId  machine ID of this client device
 */
public Table(CloudComm _cloud, long _localMachineId) {
    this.localMachineId = _localMachineId;
    this.cloud = _cloud;

    this.init();
}
98
99         /**
100          * Init all the stuff needed for for table usage
101          */
102         private void init() {
103
104                 // Init helper objects
105                 random = new Random();
106                 buffer = new SlotBuffer();
107
108                 // Set Variables
109                 oldestLiveSlotSequenceNumver = 1;
110
111                 // init data structs
112                 committedKeyValueTable = new HashMap<IoTString, KeyValue>();
113                 speculatedKeyValueTable = new HashMap<IoTString, KeyValue>();
114                 pendingTransactionSpeculatedKeyValueTable = new HashMap<IoTString, KeyValue>();
115                 liveNewKeyTable = new HashMap<IoTString, NewKey>();
116                 lastMessageTable = new HashMap<Long, Pair<Long, Liveness>>();
117                 rejectedMessageWatchListTable = new HashMap<Long, HashSet<RejectedMessage>>();
118                 arbitratorTable = new HashMap<IoTString, Long>();
119                 liveAbortTable = new HashMap<Pair<Long, Long>, Abort>();
120                 newTransactionParts = new HashMap<Long, Map<Pair<Long, Integer>, TransactionPart>>();
121                 newCommitParts = new HashMap<Long, Map<Pair<Long, Integer>, CommitPart>>();
122                 lastArbitratedTransactionNumberByArbitratorTable = new HashMap<Long, Long>();
123                 liveTransactionBySequenceNumberTable = new HashMap<Long, Transaction>();
124                 liveTransactionByTransactionIdTable = new HashMap<Pair<Long, Long>, Transaction>();
125                 liveCommitsTable = new HashMap<Long, Map<Long, Commit>>();
126                 liveCommitsByKeyTable = new HashMap<IoTString, Commit>();
127                 lastCommitSeenSequenceNumberByArbitratorTable = new HashMap<Long, Long>();
128                 rejectedSlotList = new Vector<Long>();
129                 pendingTransactionQueue = new ArrayList<Transaction>();
130                 pendingSendArbitrationEntries = new ArrayList<Entry>();
131                 pendingSendArbitrationEntriesToDelete = new ArrayList<Entry>();
132                 transactionPartsSent = new HashMap<Transaction, List<Integer>>();
133                 outstandingTransactionStatus = new HashMap<Long, TransactionStatus>();
134
135                 // Other init stuff
136                 numberOfSlots = buffer.capacity();
137                 setResizeThreshold();
138         }
139
140         /**
141          * Initialize the table by inserting a table status as the first entry into the table status
142          * also initialize the crypto stuff.
143          */
144         public synchronized void initTable() throws ServerException {
145                 cloud.setSalt(); //Set the salt
146
147                 // Create the first insertion into the block chain which is the table status
148                 Slot s = new Slot(this, 1, localMachineId);
149                 TableStatus status = new TableStatus(s, numberOfSlots);
150                 s.addEntry(status);
151                 Slot[] array = cloud.putSlot(s, numberOfSlots);
152
153                 if (array == null) {
154                         array = new Slot[] {s};
155                         // update local block chain
156                         validateAndUpdate(array, true);
157                 } else {
158                         throw new Error("Error on initialization");
159                 }
160         }
161
162         /**
163          * Rebuild the table from scratch by pulling the latest block chain from the server.
164          */
165         public synchronized void rebuild() throws ServerException {
166                 // Just pull the latest slots from the server
167                 Slot[] newslots = cloud.getSlots(sequenceNumber + 1);
168                 validateAndUpdate(newslots, true);
169         }
170
171         // public String toString() {
172         //      String retString = " Committed Table: \n";
173         //      retString += "---------------------------\n";
174         //      retString += commitedTable.toString();
175
176         //      retString += "\n\n";
177
178         //      retString += " Speculative Table: \n";
179         //      retString += "---------------------------\n";
180         //      retString += speculativeTable.toString();
181
182         //      return retString;
183         // }
184
185         public synchronized Long getArbitrator(IoTString key) {
186                 return arbitratorTable.get(key);
187         }
188
189         public synchronized IoTString getCommitted(IoTString key)  {
190                 KeyValue kv = committedKeyValueTable.get(key);
191
192                 if (kv != null) {
193                         return kv.getValue();
194                 } else {
195                         return null;
196                 }
197         }
198
199         public synchronized IoTString getSpeculative(IoTString key) {
200                 KeyValue kv = pendingTransactionSpeculatedKeyValueTable.get(key);
201
202                 if (kv == null) {
203                         kv = speculatedKeyValueTable.get(key);
204                 }
205
206                 if (kv == null) {
207                         kv = committedKeyValueTable.get(key);
208                 }
209
210                 if (kv != null) {
211                         return kv.getValue();
212                 } else {
213                         return null;
214                 }
215         }
216
217         public synchronized IoTString getCommittedAtomic(IoTString key) {
218                 KeyValue kv = committedKeyValueTable.get(key);
219
220                 if (arbitratorTable.get(key) == null) {
221                         throw new Error("Key not Found.");
222                 }
223
224                 // Make sure new key value pair matches the current arbitrator
225                 if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
226                         // TODO: Maybe not throw en error
227                         throw new Error("Not all Key Values Match Arbitrator.");
228                 }
229
230                 if (kv != null) {
231                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, kv.getValue()));
232                         return kv.getValue();
233                 } else {
234                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, null));
235                         return null;
236                 }
237         }
238
239         public synchronized IoTString getSpeculativeAtomic(IoTString key) {
240                 if (arbitratorTable.get(key) == null) {
241                         throw new Error("Key not Found.");
242                 }
243
244                 // Make sure new key value pair matches the current arbitrator
245                 if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
246                         // TODO: Maybe not throw en error
247                         throw new Error("Not all Key Values Match Arbitrator.");
248                 }
249
250                 KeyValue kv = pendingTransactionSpeculatedKeyValueTable.get(key);
251
252                 if (kv == null) {
253                         kv = speculatedKeyValueTable.get(key);
254                 }
255
256                 if (kv == null) {
257                         kv = committedKeyValueTable.get(key);
258                 }
259
260                 if (kv != null) {
261                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, kv.getValue()));
262                         return kv.getValue();
263                 } else {
264                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, null));
265                         return null;
266                 }
267         }
268
269         public synchronized void update()  {
270                 try {
271                         Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
272                         validateAndUpdate(newSlots, false);
273                         sendToServer(null);
274                 } catch (Exception e) {
275                         e.printStackTrace();
276                 }
277         }
278
279         public synchronized boolean createNewKey(IoTString keyName, long machineId)  {
280                 while (true) {
281                         if (arbitratorTable.get(keyName) != null) {
282                                 // There is already an arbitrator
283                                 return false;
284                         }
285
286                         NewKey newKey = new NewKey(null, keyName, machineId);
287                         if (sendToServer(newKey)) {
288                                 // If successfully inserted
289                                 return true;
290                         }
291                 }
292         }
293
294         public void startTransaction() {
295                 // Create a new transaction, invalidates any old pending transactions.
296                 pendingTransactionBuilder = new PendingTransaction(localMachineId);
297         }
298
299         public synchronized void addKV(IoTString key, IoTString value) {
300
301                 // Make sure it is a valid key
302                 if (arbitratorTable.get(key) == null) {
303                         throw new Error("Key not Found.");
304                 }
305
306                 // Make sure new key value pair matches the current arbitrator
307                 if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
308                         // TODO: Maybe not throw en error
309                         throw new Error("Not all Key Values Match Arbitrator.");
310                 }
311
312                 // Add the key value to this transaction
313                 KeyValue kv = new KeyValue(key, value);
314                 pendingTransactionBuilder.addKV(kv);
315         }
316
317         public synchronized TransactionStatus commitTransaction() {
318
319                 if (pendingTransactionBuilder.getKVUpdates().size() == 0) {
320                         // transaction with no updates will have no effect on the system
321                         return new TransactionStatus(TransactionStatus.StatusNoEffect, -1);
322                 }
323
324                 // Set the local transaction sequence number and increment
325                 pendingTransactionBuilder.setClientLocalSequenceNumber(localTransactionSequenceNumber);
326                 localTransactionSequenceNumber++;
327
328                 // Create the transaction status
329                 TransactionStatus transactionStatus = new TransactionStatus(TransactionStatus.StatusPending, pendingTransactionBuilder.getArbitrator());
330
331                 // Create the new transaction
332                 Transaction newTransaction = pendingTransactionBuilder.createTransaction();
333                 newTransaction.setTransactionStatus(transactionStatus);
334
335                 if (pendingTransactionBuilder.getArbitrator() != localMachineId) {
336                         // Add it to the queue and invalidate the builder for safety
337                         pendingTransactionQueue.add(newTransaction);
338                 } else {
339                         arbitrateOnLocalTransaction(newTransaction);
340                         updateLiveStateFromLocal();
341                 }
342
343                 pendingTransactionBuilder = new PendingTransaction(localMachineId);
344
345                 sendToServer(null);
346
347                 return transactionStatus;
348         }
349
350         /**
351          * Get the machine ID for this client
352          */
353         public long getMachineId() {
354                 return localMachineId;
355         }
356
357         /**
358          * Decrement the number of live slots that we currently have
359          */
360         public void decrementLiveCount() {
361                 liveSlotCount--;
362         }
363
364         /**
365          * Recalculate the new resize threshold
366          */
367         private void setResizeThreshold() {
368                 int resizeLower = (int) (RESIZE_THRESHOLD * numberOfSlots);
369                 bufferResizeThreshold = resizeLower - 1 + random.nextInt(numberOfSlots - resizeLower);
370         }
371
372         private boolean sendToServer(NewKey newKey) {
373
374                 try {
375                         // While we have stuff that needs inserting into the block chain
376                         while ((pendingTransactionQueue.size() > 0) || (pendingSendArbitrationEntries.size() > 0) || (newKey != null)) {
377
378                                 // try {
379                                 //      Thread.sleep(300);
380                                 // } catch (Exception e) {
381
382                                 // }
383
384                                 // Create the slot
385                                 Slot slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer.getSlot(sequenceNumber).getHMAC());
386
387                                 // Try to fill the slot with data
388                                 ThreeTuple<Boolean, Integer, Boolean> fillSlotsReturn = fillSlot(slot, false, newKey);
389                                 boolean needsResize = fillSlotsReturn.getFirst();
390                                 int newSize = fillSlotsReturn.getSecond();
391                                 Boolean insertedNewKey = fillSlotsReturn.getThird();
392
393                                 if (needsResize) {
394                                         // Reset which transaction to send
395                                         for (Transaction transaction : transactionPartsSent.keySet()) {
396                                                 transaction.resetNextPartToSend();
397
398                                                 // Set the transaction sequence number back to nothing
399                                                 if (!transaction.didSendAPartToServer()) {
400                                                         transaction.setSequenceNumber(-1);
401                                                 }
402                                         }
403
404                                         // Clear the sent data since we are trying again
405                                         pendingSendArbitrationEntriesToDelete.clear();
406                                         transactionPartsSent.clear();
407
408                                         // We needed a resize so try again
409                                         fillSlot(slot, true, newKey);
410                                 }
411
412                                 // Try to send to the server
413                                 Pair<Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(slot, newSize);
414
415                                 if (sendSlotsReturn.getFirst()) {
416                                         // Did insert into the block chain
417
418                                         // New Key was successfully inserted into the block chain so dont want to insert it again
419                                         newKey = null;
420
421                                         // Remove the aborts and commit parts that were sent from the pending to send queue
422                                         pendingSendArbitrationEntries.removeAll(pendingSendArbitrationEntriesToDelete);
423
424                                         for (Transaction transaction : transactionPartsSent.keySet()) {
425
426                                                 // Update which transactions parts still need to be sent
427                                                 transaction.removeSentParts(transactionPartsSent.get(transaction));
428
429                                                 // Add the transaction status to the outstanding list
430                                                 outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
431
432                                                 // Update the transaction status
433                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
434
435                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
436                                                 if (transaction.didSendAllParts()) {
437                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
438                                                         pendingTransactionQueue.remove(transaction);
439                                                 }
440                                         }
441                                 } else {
442                                         // Reset which transaction to send
443                                         for (Transaction transaction : transactionPartsSent.keySet()) {
444                                                 transaction.resetNextPartToSend();
445
446                                                 // Set the transaction sequence number back to nothing
447                                                 if (!transaction.didSendAPartToServer()) {
448                                                         transaction.setSequenceNumber(-1);
449                                                 }
450                                         }
451                                 }
452
453                                 // Clear the sent data in preparation for next send
454                                 pendingSendArbitrationEntriesToDelete.clear();
455                                 transactionPartsSent.clear();
456
457                                 if (sendSlotsReturn.getSecond().length != 0) {
458                                         // insert into the local block chain
459                                         validateAndUpdate(sendSlotsReturn.getSecond(), true);
460                                 }
461                         }
462                 } catch (ServerException e) {
463
464                         if (e.getType() != ServerException.TypeInputTimeout) {
465                                 e.printStackTrace();
466
467                                 // Nothing was able to be sent to the server so just clear these data structures
468                                 for (Transaction transaction : transactionPartsSent.keySet()) {
469                                         // Set the transaction sequence number back to nothing
470                                         if (!transaction.didSendAPartToServer()) {
471                                                 transaction.setSequenceNumber(-1);
472                                         }
473                                 }
474
475                                 pendingSendArbitrationEntriesToDelete.clear();
476                                 transactionPartsSent.clear();
477                         } else {
478                                 // There was a partial send to the server
479                         }
480                 }
481
482                 return newKey == null;
483         }
484
485         private Pair<Boolean, Slot[]> sendSlotsToServer(Slot slot, int newSize)  throws ServerException {
486
487                 boolean inserted = true;
488
489                 Slot[] array = cloud.putSlot(slot, newSize);
490                 if (array == null) {
491                         array = new Slot[] {slot};
492                         rejectedSlotList.clear();
493                 }       else {
494                         if (array.length == 0) {
495                                 throw new Error("Server Error: Did not send any slots");
496                         }
497                         rejectedSlotList.add(slot.getSequenceNumber());
498                         inserted = false;
499                 }
500
501                 return new Pair<Boolean, Slot[]>(inserted, array);
502         }
503
504         /**
505          * Returns false if a resize was needed
506          */
507         private ThreeTuple<Boolean, Integer, Boolean> fillSlot(Slot slot, boolean resize, NewKey newKeyEntry) {
508                 int newSize = 0;
509                 if (liveSlotCount > bufferResizeThreshold) {
510                         resize = true; //Resize is forced
511                 }
512
513                 if (resize) {
514                         newSize = (int) (numberOfSlots * RESIZE_MULTIPLE);
515                         TableStatus status = new TableStatus(slot, newSize);
516                         slot.addEntry(status);
517                 }
518
519                 // Fill with rejected slots first before doing anything else
520                 doRejectedMessages(slot);
521
522                 // Do mandatory rescue of entries
523                 ThreeTuple<Boolean, Boolean, Long> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
524
525                 // Extract working variables
526                 boolean needsResize = mandatoryRescueReturn.getFirst();
527                 boolean seenLiveSlot = mandatoryRescueReturn.getSecond();
528                 long currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
529
530                 if (needsResize && !resize) {
531                         // We need to resize but we are not resizing so return false
532                         return new ThreeTuple<Boolean, Integer, Boolean>(true, null, null);
533                 }
534
535                 boolean inserted = false;
536                 if (newKeyEntry != null) {
537                         newKeyEntry.setSlot(slot);
538                         if (slot.hasSpace(newKeyEntry)) {
539                                 slot.addEntry(newKeyEntry);
540                                 inserted = true;
541                         }
542                 }
543
544                 // Clear the transactions, aborts and commits that were sent previously
545                 transactionPartsSent.clear();
546                 pendingSendArbitrationEntriesToDelete.clear();
547
548                 // Insert pending arbitration data
549                 for (Entry arbitrationData : pendingSendArbitrationEntries) {
550
551                         // If it is an abort then we need to set some information
552                         if (arbitrationData instanceof Abort) {
553                                 ((Abort)arbitrationData).setSequenceNumber(slot.getSequenceNumber());
554                         }
555
556                         if (!slot.hasSpace(arbitrationData)) {
557                                 // No space so cant do anything else with these data entries
558                                 break;
559                         }
560
561                         // Add to this current slot and add it to entries to delete
562                         slot.addEntry(arbitrationData);
563                         pendingSendArbitrationEntriesToDelete.add(arbitrationData);
564                 }
565
566                 // Insert as many transactions as possible while keeping order
567                 for (Transaction transaction : pendingTransactionQueue) {
568
569                         // Set the transaction sequence number if it has yet to be inserted into the block chain
570                         if (!transaction.didSendAPartToServer()) {
571                                 transaction.setSequenceNumber(slot.getSequenceNumber());
572                         }
573
574                         boolean ranOutOfSpace = false;
575
576                         while (true) {
577                                 TransactionPart part = transaction.getNextPartToSend();
578
579                                 if (part == null) {
580                                         // Ran out of parts to send for this transaction so move on
581                                         break;
582                                 }
583
584                                 if (slot.hasSpace(part)) {
585                                         slot.addEntry(part);
586
587                                         List<Integer> partsSent = transactionPartsSent.get(transaction);
588                                         if (partsSent == null) {
589                                                 partsSent = new ArrayList<Integer>();
590                                                 transactionPartsSent.put(transaction, partsSent);
591                                         }
592
593                                         partsSent.add(part.getPartNumber());
594                                         transactionPartsSent.put(transaction, partsSent);
595
596                                 } else {
597                                         ranOutOfSpace = true;
598                                         break;
599                                 }
600                         }
601
602                         if (ranOutOfSpace) {
603                                 break;
604                         }
605                 }
606
607                 // Fill the remainder of the slot with rescue data
608                 doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
609
610                 return new ThreeTuple<Boolean, Integer, Boolean>(false, newSize, inserted);
611         }
612
613         private void doRejectedMessages(Slot s) {
614                 if (! rejectedSlotList.isEmpty()) {
615                         /* TODO: We should avoid generating a rejected message entry if
616                          * there is already a sufficient entry in the queue (e.g.,
617                          * equalsto value of true and same sequence number).  */
618
619                         long old_seqn = rejectedSlotList.firstElement();
620                         if (rejectedSlotList.size() > REJECTED_THRESHOLD) {
621                                 long new_seqn = rejectedSlotList.lastElement();
622                                 RejectedMessage rm = new RejectedMessage(s, localMachineId, old_seqn, new_seqn, false);
623                                 s.addEntry(rm);
624                         } else {
625                                 long prev_seqn = -1;
626                                 int i = 0;
627                                 /* Go through list of missing messages */
628                                 for (; i < rejectedSlotList.size(); i++) {
629                                         long curr_seqn = rejectedSlotList.get(i);
630                                         Slot s_msg = buffer.getSlot(curr_seqn);
631                                         if (s_msg != null)
632                                                 break;
633                                         prev_seqn = curr_seqn;
634                                 }
635                                 /* Generate rejected message entry for missing messages */
636                                 if (prev_seqn != -1) {
637                                         RejectedMessage rm = new RejectedMessage(s, localMachineId, old_seqn, prev_seqn, false);
638                                         s.addEntry(rm);
639                                 }
640                                 /* Generate rejected message entries for present messages */
641                                 for (; i < rejectedSlotList.size(); i++) {
642                                         long curr_seqn = rejectedSlotList.get(i);
643                                         Slot s_msg = buffer.getSlot(curr_seqn);
644                                         long machineid = s_msg.getMachineID();
645                                         RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
646                                         s.addEntry(rm);
647                                 }
648                         }
649                 }
650         }
651
652         private ThreeTuple<Boolean, Boolean, Long> doMandatoryResuce(Slot slot, boolean resize) {
653                 long newestSequenceNumber = buffer.getNewestSeqNum();
654                 long oldestSequenceNumber = buffer.getOldestSeqNum();
655                 if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
656                         oldestLiveSlotSequenceNumver = oldestSequenceNumber;
657                 }
658
659                 long currentSequenceNumber = oldestLiveSlotSequenceNumver;
660                 boolean seenLiveSlot = false;
661                 long firstIfFull = newestSequenceNumber + 1 - numberOfSlots;    // smallest seq number in the buffer if it is full
662                 long threshold = firstIfFull + FREE_SLOTS;      // we want the buffer to be clear of live entries up to this point
663
664
665                 // Mandatory Rescue
666                 for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
667                         Slot previousSlot = buffer.getSlot(currentSequenceNumber);
668                         // Push slot number forward
669                         if (! seenLiveSlot) {
670                                 oldestLiveSlotSequenceNumver = currentSequenceNumber;
671                         }
672
673                         if (!previousSlot.isLive()) {
674                                 continue;
675                         }
676
677                         // We have seen a live slot
678                         seenLiveSlot = true;
679
680                         // Get all the live entries for a slot
681                         Vector<Entry> liveEntries = previousSlot.getLiveEntries(resize);
682
683                         // Iterate over all the live entries and try to rescue them
684                         for (Entry liveEntry : liveEntries) {
685                                 if (slot.hasSpace(liveEntry)) {
686
687                                         // Enough space to rescue the entry
688                                         slot.addEntry(liveEntry);
689                                 } else if (currentSequenceNumber == firstIfFull) {
690                                         //if there's no space but the entry is about to fall off the queue
691                                         System.out.println("B"); //?
692                                         return new ThreeTuple<Boolean, Boolean, Long>(true, seenLiveSlot, currentSequenceNumber);
693
694                                 }
695                         }
696                 }
697
698                 // Did not resize
699                 return new ThreeTuple<Boolean, Boolean, Long>(false, seenLiveSlot, currentSequenceNumber);
700         }
701
702         private void  doOptionalRescue(Slot s, boolean seenliveslot, long seqn, boolean resize) {
703                 /* now go through live entries from least to greatest sequence number until
704                  * either all live slots added, or the slot doesn't have enough room
705                  * for SKIP_THRESHOLD consecutive entries*/
706                 int skipcount = 0;
707                 long newestseqnum = buffer.getNewestSeqNum();
708                 search:
709                 for (; seqn <= newestseqnum; seqn++) {
710                         Slot prevslot = buffer.getSlot(seqn);
711                         //Push slot number forward
712                         if (!seenliveslot)
713                                 oldestLiveSlotSequenceNumver = seqn;
714
715                         if (!prevslot.isLive())
716                                 continue;
717                         seenliveslot = true;
718                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
719                         for (Entry liveentry : liveentries) {
720                                 if (s.hasSpace(liveentry))
721                                         s.addEntry(liveentry);
722                                 else {
723                                         skipcount++;
724                                         if (skipcount > SKIP_THRESHOLD)
725                                                 break search;
726                                 }
727                         }
728                 }
729         }
730
731         /**
732          * Checks for malicious activity and updates the local copy of the block chain.
733          */
734         private void validateAndUpdate(Slot[] newSlots, boolean acceptUpdatesToLocal) {
735
736                 // The cloud communication layer has checked slot HMACs already before decoding
737                 if (newSlots.length == 0) {
738                         return;
739                 }
740
741                 // Reset the table status declared sizes
742                 smallestTableStatusSeen = -1;
743                 largestTableStatusSeen = -1;
744
745
746                 // Make sure all slots are newer than the last largest slot this client has seen
747                 long firstSeqNum = newSlots[0].getSequenceNumber();
748                 if (firstSeqNum <= sequenceNumber) {
749                         throw new Error("Server Error: Sent older slots!");
750                 }
751
752                 // Create an object that can access both new slots and slots in our local chain
753                 // without committing slots to our local chain
754                 SlotIndexer indexer = new SlotIndexer(newSlots, buffer);
755
756                 // Check that the HMAC chain is not broken
757                 checkHMACChain(indexer, newSlots);
758
759                 // Set to keep track of messages from clients
760                 HashSet<Long> machineSet = new HashSet<Long>(lastMessageTable.keySet());
761
762                 // Process each slots data
763                 for (Slot slot : newSlots) {
764                         processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
765                 }
766
767                 // If there is a gap, check to see if the server sent us everything.
768                 if (firstSeqNum != (sequenceNumber + 1)) {
769
770                         // Check the size of the slots that were sent down by the server.
771                         // Can only check the size if there was a gap
772                         checkNumSlots(newSlots.length);
773
774                         // Since there was a gap every machine must have pushed a slot or must have
775                         // a last message message.  If not then the server is hiding slots
776                         if (!machineSet.isEmpty()) {
777                                 throw new Error("Missing record for machines: " + machineSet);
778                         }
779                 }
780
781                 // Update the size of our local block chain.
782                 commitNewMaxSize();
783
784                 // Commit new to slots to the local block chain.
785                 for (Slot slot : newSlots) {
786
787                         // Insert this slot into our local block chain copy.
788                         buffer.putSlot(slot);
789
790                         // Keep track of how many slots are currently live (have live data in them).
791                         liveSlotCount++;
792                 }
793
794                 // Get the sequence number of the latest slot in the system
795                 sequenceNumber = newSlots[newSlots.length - 1].getSequenceNumber();
796
797                 updateLiveStateFromServer();
798         }
799
800         private void updateLiveStateFromServer() {
801                 // Process the new transaction parts
802                 processNewTransactionParts();
803
804                 // Do arbitration on new transactions that were received
805                 arbitrateFromServer();
806
807                 // Update all the committed keys
808                 boolean didCommitOrSpeculate = updateCommittedTable();
809
810                 // Delete the transactions that are now dead
811                 updateLiveTransactionsAndStatus();
812
813                 // Do speculations
814                 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
815                 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
816         }
817
818         private void updateLiveStateFromLocal() {
819                 // Update all the committed keys
820                 boolean didCommitOrSpeculate = updateCommittedTable();
821
822                 // Delete the transactions that are now dead
823                 updateLiveTransactionsAndStatus();
824
825                 // Do speculations
826                 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
827                 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
828         }
829
830         /**
831          * Check the size of the block chain to make sure there are enough slots sent back by the server.
832          * This is only called when we have a gap between the slots that we have locally and the slots
833          * sent by the server therefore in the slots sent by the server there will be at least 1 Table
834          * status message
835          */
836         private void checkNumSlots(int numberOfSlots) {
837
838                 // We only have 1 size so we must have this many slots
839                 if (largestTableStatusSeen == smallestTableStatusSeen) {
840                         if (numberOfSlots != smallestTableStatusSeen) {
841                                 throw new Error("Server Error: Server did not send all slots.  Expected: " + smallestTableStatusSeen + " Received:" + numberOfSlots);
842                         }
843                 } else {
844                         // We have more than 1
845                         if (numberOfSlots < smallestTableStatusSeen) {
846                                 throw new Error("Server Error: Server did not send all slots.  Expected at least: " + smallestTableStatusSeen + " Received:" + numberOfSlots);
847                         }
848                 }
849         }
850
851         /**
852          * Update the size of of the local buffer if it is needed.
853          */
854         private void commitNewMaxSize() {
855
856                 int currMaxSize = 0;
857
858                 if (largestTableStatusSeen == -1) {
859                         // No table status seen so the current max size does not change
860                         currMaxSize = numberOfSlots;
861                 } else {
862                         currMaxSize = largestTableStatusSeen;
863                 }
864
865                 // Resize the local slot buffer
866                 if (numberOfSlots != currMaxSize) {
867                         buffer.resize(currMaxSize);
868                 }
869
870                 // Change the number of local slots to the new size
871                 numberOfSlots = currMaxSize;
872
873                 // Recalculate the resize threshold since the size of the local buffer has changed
874                 setResizeThreshold();
875         }
876
877         /**
878          * Process the new transaction parts from this latest round of slots received from the server
879          */
880         private void processNewTransactionParts() {
881
882                 if (newTransactionParts.size() == 0) {
883                         // Nothing new to process
884                         return;
885                 }
886
887                 // Iterate through all the machine Ids that we received new parts for
888                 for (Long machineId : newTransactionParts.keySet()) {
889                         Map<Pair<Long, Integer>, TransactionPart> parts = newTransactionParts.get(machineId);
890
891                         // Iterate through all the parts for that machine Id
892                         for (Pair<Long, Integer> partId : parts.keySet()) {
893                                 TransactionPart part = parts.get(partId);
894
895                                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(part.getArbitratorId());
896                                 if ((lastTransactionNumber != null) && (lastTransactionNumber >= part.getSequenceNumber())) {
897                                         // Set dead the transaction part
898                                         part.setDead();
899                                         continue;
900                                 }
901
902                                 // Get the transaction object for that sequence number
903                                 Transaction transaction = liveTransactionBySequenceNumberTable.get(part.getSequenceNumber());
904
905                                 if (transaction == null) {
906                                         // This is a new transaction that we dont have so make a new one
907                                         transaction = new Transaction();
908
909                                         // Insert this new transaction into the live tables
910                                         liveTransactionBySequenceNumberTable.put(part.getSequenceNumber(), transaction);
911                                         liveTransactionByTransactionIdTable.put(part.getTransactionId(), transaction);
912                                 }
913
914                                 // Add that part to the transaction
915                                 transaction.addPartDecode(part);
916                         }
917                 }
918
919                 // Clear all the new transaction parts in preparation for the next time the server sends slots
920                 newTransactionParts.clear();
921         }
922
923         public void arbitrateFromServer() {
924
925                 if (liveTransactionBySequenceNumberTable.size() == 0) {
926                         // Nothing to arbitrate on so move on
927                         return;
928                 }
929
930                 // Get the transaction sequence numbers and sort from oldest to newest
931                 List<Long> transactionSequenceNumbers = new ArrayList<Long>(liveTransactionBySequenceNumberTable.keySet());
932                 Collections.sort(transactionSequenceNumbers);
933
934                 // Collection of key value pairs that are
935                 Map<IoTString, KeyValue> speculativeTableTmp = new HashMap<IoTString, KeyValue>();
936
937                 // The last transaction arbitrated on
938                 long lastTransactionCommitted = -1;
939
940                 for (Long transactionSequenceNumber : transactionSequenceNumbers) {
941                         Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber);
942
943                         // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
944                         if (transaction.getArbitrator() != localMachineId) {
945                                 continue;
946                         }
947
948
949                         if (!transaction.isComplete()) {
950                                 // Will arbitrate in incorrect order if we continue so just break
951                                 // Most likely this
952                                 break;
953                         }
954
955
956                         if (transaction.evaluateGuard(committedKeyValueTable, speculativeTableTmp, null)) {
957                                 // Guard evaluated as true
958
959                                 // Update the local changes so we can make the commit
960                                 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
961                                         speculativeTableTmp.put(kv.getKey(), kv);
962                                 }
963
964                                 // Update what the last transaction committed was for use in batch commit
965                                 lastTransactionCommitted = transaction.getSequenceNumber();
966
967                         } else {
968                                 // Guard evaluated was false so create abort
969
970                                 // Create the abort
971                                 Abort newAbort = new Abort(null,
972                                                            transaction.getClientLocalSequenceNumber(),
973                                                            transaction.getSequenceNumber(),
974                                                            transaction.getMachineId(),
975                                                            transaction.getArbitrator());
976
977                                 // Add the abort to the queue of aborts to send out
978                                 pendingSendArbitrationEntries.add(newAbort);
979
980                                 // Insert the abort so we can process
981                                 processEntry(newAbort);
982                         }
983                 }
984
985                 // If there is something to commit
986                 if (speculativeTableTmp.size() != 0) {
987
988                         // Create the commit and increment the commit sequence number
989                         Commit newCommit = new Commit(localCommitSequenceNumber, localMachineId, lastTransactionCommitted);
990                         localCommitSequenceNumber++;
991
992                         // Add all the new keys to the commit
993                         for (KeyValue kv : speculativeTableTmp.values()) {
994                                 newCommit.addKV(kv);
995                         }
996
997                         // create the commit parts
998                         newCommit.createCommitParts();
999
1000                         // Append all the commit parts to the end of the pending queue waiting for sending to the server
1001                         pendingSendArbitrationEntries.addAll(newCommit.getParts().values());
1002
1003                         // Insert the commit so we can process it
1004                         for (CommitPart commitPart : newCommit.getParts().values()) {
1005                                 processEntry(commitPart);
1006                         }
1007                 }
1008         }
1009
1010         public void arbitrateOnLocalTransaction(Transaction transaction) {
1011
1012                 // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
1013                 if (transaction.getArbitrator() != localMachineId) {
1014                         return;
1015                 }
1016
1017                 if (!transaction.isComplete()) {
1018                         // Will arbitrate in incorrect order if we continue so just break
1019                         // Most likely this
1020                         return;
1021                 }
1022
1023                 if (transaction.evaluateGuard(committedKeyValueTable, null, null)) {
1024                         // Guard evaluated as true
1025
1026                         // Create the commit and increment the commit sequence number
1027                         Commit newCommit = new Commit(localCommitSequenceNumber, localMachineId, -1);
1028                         localCommitSequenceNumber++;
1029
1030                         // Update the local changes so we can make the commit
1031                         for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
1032                                 newCommit.addKV(kv);
1033                         }
1034
1035                         // create the commit parts
1036                         newCommit.createCommitParts();
1037
1038                         // Append all the commit parts to the end of the pending queue waiting for sending to the server
1039                         pendingSendArbitrationEntries.addAll(newCommit.getParts().values());
1040
1041                         // Insert the commit so we can process it
1042                         for (CommitPart commitPart : newCommit.getParts().values()) {
1043                                 processEntry(commitPart);
1044                         }
1045
1046                         TransactionStatus status = transaction.getTransactionStatus();
1047                         status.setStatus(TransactionStatus.StatusCommitted);
1048
1049                 } else {
1050                         // Guard evaluated was false so create abort
1051                         TransactionStatus status = transaction.getTransactionStatus();
1052                         status.setStatus(TransactionStatus.StatusAborted);
1053                 }
1054         }
1055
1056         /**
1057          * Update all the commits and the committed tables, sets dead the dead transactions
1058          */
1059         private boolean updateCommittedTable() {
1060
1061                 if (newCommitParts.size() == 0) {
1062                         // Nothing new to process
1063                         return false;
1064                 }
1065
1066                 // Iterate through all the machine Ids that we received new parts for
1067                 for (Long machineId : newCommitParts.keySet()) {
1068                         Map<Pair<Long, Integer>, CommitPart> parts = newCommitParts.get(machineId);
1069
1070                         // Iterate through all the parts for that machine Id
1071                         for (Pair<Long, Integer> partId : parts.keySet()) {
1072                                 CommitPart part = parts.get(partId);
1073
1074                                 // Get the transaction object for that sequence number
1075                                 Map<Long, Commit> commitForClientTable = liveCommitsTable.get(part.getMachineId());
1076
1077                                 if (commitForClientTable == null) {
1078                                         // This is the first commit from this device
1079                                         commitForClientTable = new HashMap<Long, Commit>();
1080                                         liveCommitsTable.put(part.getMachineId(), commitForClientTable);
1081                                 }
1082
1083                                 Commit commit = commitForClientTable.get(part.getSequenceNumber());
1084
1085                                 if (commit == null) {
1086                                         // This is a new commit that we dont have so make a new one
1087                                         commit = new Commit();
1088
1089                                         // Insert this new commit into the live tables
1090                                         commitForClientTable.put(part.getSequenceNumber(), commit);
1091                                 }
1092
1093                                 // Add that part to the commit
1094                                 commit.addPartDecode(part);
1095                         }
1096                 }
1097
1098                 // Clear all the new commits parts in preparation for the next time the server sends slots
1099                 newCommitParts.clear();
1100
1101                 // If we process a new commit keep track of it for future use
1102                 boolean didProcessANewCommit = false;
1103
1104                 // Process the commits one by one
1105                 for (Long arbitratorId : liveCommitsTable.keySet()) {
1106
1107                         // Get all the commits for a specific arbitrator
1108                         Map<Long, Commit> commitForClientTable = liveCommitsTable.get(arbitratorId);
1109
1110                         // Sort the commits in order
1111                         List<Long> commitSequenceNumbers = new ArrayList<Long>(commitForClientTable.keySet());
1112                         Collections.sort(commitSequenceNumbers);
1113
1114                         // Go through each new commit one by one
1115                         for (int i = 0; i < commitSequenceNumbers.size(); i++) {
1116                                 Long commitSequenceNumber = commitSequenceNumbers.get(i);
1117                                 Commit commit = commitForClientTable.get(commitSequenceNumber);
1118
1119                                 // Special processing if a commit is not complete
1120                                 if (!commit.isComplete()) {
1121                                         if (i == (commitSequenceNumbers.size() - 1)) {
1122                                                 // If there is an incomplete commit and this commit is the latest one seen then this commit cannot be processed and there are no other commits
1123                                                 break;
1124                                         } else {
1125                                                 // This is a commit that was already dead but parts of it are still in the block chain (not flushed out yet).
1126                                                 // Delete it and move on
1127                                                 commit.setDead();
1128                                                 commitForClientTable.remove(commit.getSequenceNumber());
1129                                                 continue;
1130                                         }
1131                                 }
1132
1133                                 // Get the last commit seen from this arbitrator
1134                                 long lastCommitSeenSequenceNumber = -1;
1135                                 if (lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId()) != null) {
1136                                         lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId());
1137                                 }
1138
1139
1140
1141
1142
1143
1144                                 // We have already seen this commit before so need to do the full processing on this commit
1145                                 if (commit.getSequenceNumber() <= lastCommitSeenSequenceNumber) {
1146
1147                                         // Update the last transaction that was updated if we can
1148                                         if (commit.getTransactionSequenceNumber() != -1) {
1149                                                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId());
1150
1151                                                 // Update the last transaction sequence number that the arbitrator arbitrated on
1152                                                 if ((lastTransactionNumber == null) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) {
1153                                                         lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber());
1154                                                 }
1155                                         }
1156
1157                                         continue;
1158                                 }
1159
1160
1161                                 // If we got here then this is a brand new commit and needs full processing
1162
1163                                 // Get what commits should be edited, these are the commits that have live values for their keys
1164                                 Set<Commit> commitsToEdit = new HashSet<Commit>();
1165                                 for (KeyValue kv : commit.getKeyValueUpdateSet()) {
1166                                         commitsToEdit.add(liveCommitsByKeyTable.get(kv.getKey()));
1167                                 }
1168                                 commitsToEdit.remove(null); // remove null since it could be in this set
1169
1170                                 // Update each previous commit that needs to be updated
1171                                 for (Commit previousCommit : commitsToEdit) {
1172
1173                                         // Only bother with live commits (TODO: Maybe remove this check)
1174                                         if (previousCommit.isLive()) {
1175
1176                                                 // Update which keys in the old commits are still live
1177                                                 for (KeyValue kv : commit.getKeyValueUpdateSet()) {
1178                                                         previousCommit.invalidateKey(kv.getKey());
1179                                                 }
1180
1181                                                 // if the commit is now dead then remove it
1182                                                 if (!previousCommit.isLive()) {
1183                                                         commitForClientTable.remove(previousCommit);
1184                                                 }
1185                                         }
1186                                 }
1187
1188                                 // Update the last seen sequence number from this arbitrator
1189                                 lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber());
1190
1191                                 // Update the last transaction that was updated if we can
1192                                 if (commit.getTransactionSequenceNumber() != -1) {
1193                                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId());
1194
1195                                         // Update the last transaction sequence number that the arbitrator arbitrated on
1196                                         if ((lastTransactionNumber == null) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) {
1197                                                 lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber());
1198                                         }
1199                                 }
1200
1201                                 // We processed a new commit that we havent seen before
1202                                 didProcessANewCommit = true;
1203
1204                                 // Update the committed table of keys and which commit is using which key
1205                                 for (KeyValue kv : commit.getKeyValueUpdateSet()) {
1206                                         committedKeyValueTable.put(kv.getKey(), kv);
1207                                         liveCommitsByKeyTable.put(kv.getKey(), commit);
1208                                 }
1209                         }
1210                 }
1211
1212                 return didProcessANewCommit;
1213         }
1214
1215         /**
1216          * Create the speculative table from transactions that are still live and have come from the cloud
1217          */
1218         private boolean updateSpeculativeTable(boolean didProcessNewCommits) {
1219                 if (liveTransactionBySequenceNumberTable.keySet().size() == 0) {
1220                         // There is nothing to speculate on
1221                         return false;
1222                 }
1223
1224                 // Create a list of the transaction sequence numbers and sort them from oldest to newest
1225                 List<Long> transactionSequenceNumbersSorted = new ArrayList<Long>(liveTransactionBySequenceNumberTable.keySet());
1226                 Collections.sort(transactionSequenceNumbersSorted);
1227
1228                 boolean hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted.get(0) != oldestTransactionSequenceNumberSpeculatedOn;
1229
1230
1231                 if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
1232                         // If there is a gap in the transaction sequence numbers then there was a commit or an abort of a transaction
1233                         // OR there was a new commit (Could be from offline commit) so a redo the speculation from scratch
1234
1235                         // Start from scratch
1236                         speculatedKeyValueTable.clear();
1237                         lastTransactionSequenceNumberSpeculatedOn = -1;
1238                         oldestTransactionSequenceNumberSpeculatedOn = -1;
1239
1240                 }
1241
1242                 // Remember the front of the transaction list
1243                 oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted.get(0);
1244
1245                 // Find where to start arbitration from
1246                 int startIndex = transactionSequenceNumbersSorted.indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1;
1247
1248                 if (startIndex >= transactionSequenceNumbersSorted.size()) {
1249                         // Make sure we are not out of bounds
1250                         return false; // did not speculate
1251                 }
1252
1253                 Set<Long> incompleteTransactionArbitrator = new HashSet<Long>();
1254                 boolean didSkip = true;
1255
1256                 for (int i = startIndex; i < transactionSequenceNumbersSorted.size(); i++) {
1257                         long transactionSequenceNumber = transactionSequenceNumbersSorted.get(i);
1258                         Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber);
1259
1260                         if (!transaction.isComplete()) {
1261                                 // If there is an incomplete transaction then there is nothing we can do
1262                                 // add this transactions arbitrator to the list of arbitrators we should ignore
1263                                 incompleteTransactionArbitrator.add(transaction.getArbitrator());
1264                                 didSkip = true;
1265                                 continue;
1266                         }
1267
1268                         if (incompleteTransactionArbitrator.contains(transaction.getArbitrator())) {
1269                                 continue;
1270                         }
1271
1272                         lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
1273
1274                         if (transaction.evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, null)) {
1275                                 // Guard evaluated to true so update the speculative table
1276                                 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
1277                                         speculatedKeyValueTable.put(kv.getKey(), kv);
1278                                 }
1279                         }
1280                 }
1281
1282                 if (didSkip) {
1283                         // Since there was a skip we need to redo the speculation next time around
1284                         lastTransactionSequenceNumberSpeculatedOn = -1;
1285                         oldestTransactionSequenceNumberSpeculatedOn = -1;
1286                 }
1287
1288                 // We did some speculation
1289                 return true;
1290         }
1291
1292         /**
1293          * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer
1294          */
1295         private void updatePendingTransactionSpeculativeTable(boolean didProcessNewCommitsOrSpeculate) {
1296                 if (pendingTransactionQueue.size() == 0) {
1297                         // There is nothing to speculate on
1298                         return;
1299                 }
1300
1301
1302                 if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue.get(0))) {
1303                         // need to reset on the pending speculation
1304                         lastPendingTransactionSpeculatedOn = null;
1305                         firstPendingTransaction = pendingTransactionQueue.get(0);
1306                         pendingTransactionSpeculatedKeyValueTable.clear();
1307                 }
1308
1309                 // Find where to start arbitration from
1310                 int startIndex = pendingTransactionQueue.indexOf(firstPendingTransaction) + 1;
1311
1312                 if (startIndex >= pendingTransactionQueue.size()) {
1313                         // Make sure we are not out of bounds
1314                         return;
1315                 }
1316
1317                 for (int i = startIndex; i < pendingTransactionQueue.size(); i++) {
1318                         Transaction transaction = pendingTransactionQueue.get(i);
1319
1320                         lastPendingTransactionSpeculatedOn = transaction;
1321
1322                         if (transaction.evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
1323                                 // Guard evaluated to true so update the speculative table
1324                                 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
1325                                         pendingTransactionSpeculatedKeyValueTable.put(kv.getKey(), kv);
1326                                 }
1327                         }
1328                 }
1329         }
1330
1331         /**
1332          * Set dead and remove from the live transaction tables the transactions that are dead
1333          */
1334         private void updateLiveTransactionsAndStatus() {
1335
1336                 // Go through each of the transactions
1337                 for (Iterator<Map.Entry<Long, Transaction>> iter = liveTransactionBySequenceNumberTable.entrySet().iterator(); iter.hasNext();) {
1338                         Transaction transaction = iter.next().getValue();
1339
1340                         // Check if the transaction is dead
1341                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(transaction.getArbitrator());
1342                         if ((lastTransactionNumber != null) && (lastTransactionNumber >= transaction.getSequenceNumber())) {
1343
1344                                 // Set dead the transaction
1345                                 transaction.setDead();
1346
1347                                 // Remove the transaction from the live table
1348                                 iter.remove();
1349                                 liveTransactionByTransactionIdTable.remove(transaction.getId());
1350                         }
1351                 }
1352
1353                 // Go through each of the transactions
1354                 for (Iterator<Map.Entry<Long, TransactionStatus>> iter = outstandingTransactionStatus.entrySet().iterator(); iter.hasNext();) {
1355                         TransactionStatus status = iter.next().getValue();
1356
1357                         // Check if the transaction is dead
1358                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(status.getTransactionArbitrator());
1359                         if ((lastTransactionNumber != null) && (lastTransactionNumber >= status.getTransactionSequenceNumber())) {
1360
1361                                 // Set committed
1362                                 status.setStatus(TransactionStatus.StatusCommitted);
1363
1364                                 // Remove
1365                                 iter.remove();
1366                         }
1367                 }
1368         }
1369
1370
1371         /**
1372          * Process this slot, entry by entry.  Also update the latest message sent by slot
1373          */
1374         private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptUpdatesToLocal, HashSet<Long> machineSet) {
1375
1376                 // Update the last message seen
1377                 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
1378
1379                 // Process each entry in the slot
1380                 for (Entry entry : slot.getEntries()) {
1381                         switch (entry.getType()) {
1382
1383                         case Entry.TypeCommitPart:
1384                                 processEntry((CommitPart)entry);
1385                                 break;
1386
1387                         case Entry.TypeAbort:
1388                                 processEntry((Abort)entry);
1389                                 break;
1390
1391                         case Entry.TypeTransactionPart:
1392                                 processEntry((TransactionPart)entry);
1393                                 break;
1394
1395                         case Entry.TypeNewKey:
1396                                 processEntry((NewKey)entry);
1397                                 break;
1398
1399                         case Entry.TypeLastMessage:
1400                                 processEntry((LastMessage)entry, machineSet);
1401                                 break;
1402
1403                         case Entry.TypeRejectedMessage:
1404                                 processEntry((RejectedMessage)entry, indexer);
1405                                 break;
1406
1407                         case Entry.TypeTableStatus:
1408                                 processEntry((TableStatus)entry);
1409                                 break;
1410
1411                         default:
1412                                 throw new Error("Unrecognized type: " + entry.getType());
1413                         }
1414                 }
1415         }
1416
1417         /**
1418          * Update the last message that was sent for a machine Id
1419          */
1420         private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
1421                 // Update what the last message received by a machine was
1422                 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
1423         }
1424
1425         /**
1426          * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message)
1427          */
1428         private void processEntry(NewKey entry) {
1429
1430                 // Update the arbitrator table with the new key information
1431                 arbitratorTable.put(entry.getKey(), entry.getMachineID());
1432
1433                 // Update what the latest live new key is
1434                 NewKey oldNewKey = liveNewKeyTable.put(entry.getKey(), entry);
1435                 if (oldNewKey != null) {
1436                         // Delete the old new key messages
1437                         oldNewKey.setDead();
1438                 }
1439         }
1440
1441         /**
1442          * Process new table status entries and set dead the old ones as new ones come in.
1443          * keeps track of the largest and smallest table status seen in this current round
1444          * of updating the local copy of the block chain
1445          */
1446         private void processEntry(TableStatus entry) {
1447                 int newNumSlots = entry.getMaxSlots();
1448
1449                 if (liveTableStatus != null) {
1450                         // We have a larger table status so the old table status is no longer alive
1451                         liveTableStatus.setDead();
1452                 }
1453
1454                 // Make this new table status the latest alive table status
1455                 liveTableStatus = entry;
1456
1457                 if ((smallestTableStatusSeen == -1) || (newNumSlots < smallestTableStatusSeen)) {
1458                         smallestTableStatusSeen = newNumSlots;
1459                 }
1460
1461                 if ((largestTableStatusSeen == -1) || (newNumSlots > largestTableStatusSeen)) {
1462                         largestTableStatusSeen = newNumSlots;
1463                 }
1464         }
1465
1466         /**
1467          * Check old messages to see if there is a block chain violation. Also
1468          */
1469         private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
1470                 long oldSeqNum = entry.getOldSeqNum();
1471                 long newSeqNum = entry.getNewSeqNum();
1472                 boolean isequal = entry.getEqual();
1473                 long machineId = entry.getMachineID();
1474
1475
1476                 // Check if we have messages that were supposed to be rejected in our local block chain
1477                 for (long seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
1478
1479                         // Get the slot
1480                         Slot slot = indexer.getSlot(seqNum);
1481
1482                         if (slot != null) {
1483                                 // If we have this slot make sure that it was not supposed to be a rejected slot
1484
1485                                 long slotMachineId = slot.getMachineID();
1486                                 if (isequal != (slotMachineId == machineId)) {
1487                                         throw new Error("Server Error: Trying to insert rejected message for slot " + seqNum);
1488                                 }
1489                         }
1490                 }
1491
1492
1493                 // Create a list of clients to watch until they see this rejected message entry.
1494                 HashSet<Long> deviceWatchSet = new HashSet<Long>();
1495                 for (Map.Entry<Long, Pair<Long, Liveness>> lastMessageEntry : lastMessageTable.entrySet()) {
1496
1497                         // Machine ID for the last message entry
1498                         long lastMessageEntryMachineId = lastMessageEntry.getKey();
1499
1500                         // We've seen it, don't need to continue to watch.  Our next
1501                         // message will implicitly acknowledge it.
1502                         if (lastMessageEntryMachineId == localMachineId) {
1503                                 continue;
1504                         }
1505
1506                         Pair<Long, Liveness> lastMessageValue = lastMessageEntry.getValue();
1507                         long entrySequenceNumber = lastMessageValue.getFirst();
1508
1509                         if (entrySequenceNumber < newSeqNum) {
1510
1511                                 // Add this rejected message to the set of messages that this machine ID did not see yet
1512                                 addWatchList(lastMessageEntryMachineId, entry);
1513
1514                                 // This client did not see this rejected message yet so add it to the watch set to monitor
1515                                 deviceWatchSet.add(lastMessageEntryMachineId);
1516                         }
1517                 }
1518
1519                 if (deviceWatchSet.isEmpty()) {
1520                         // This rejected message has been seen by all the clients so
1521                         entry.setDead();
1522                 } else {
1523                         // We need to watch this rejected message
1524                         entry.setWatchSet(deviceWatchSet);
1525                 }
1526         }
1527
1528         /**
1529          * Check if this abort is live, if not then save it so we can kill it later.
1530          * update the last transaction number that was arbitrated on.
1531          */
1532         private void processEntry(Abort entry) {
1533
1534                 // Abort has not been seen by the client it is for yet so we need to keep track of it
1535                 Abort previouslySeenAbort = liveAbortTable.put(entry.getAbortId(), entry);
1536                 if (previouslySeenAbort != null) {
1537                         previouslySeenAbort.setDead(); // Delete old version of the abort since we got a rescued newer version
1538                 }
1539
1540                 if ((entry.getSequenceNumber() != -1) && (lastMessageTable.get(entry.getTransactionMachineId()).getFirst() >= entry.getSequenceNumber())) {
1541
1542                         // The machine already saw this so it is dead
1543                         entry.setDead();
1544                         liveAbortTable.remove(entry);
1545                         return;
1546                 }
1547
1548
1549                 // update the transaction status
1550                 TransactionStatus status = outstandingTransactionStatus.remove(entry.getTransactionSequenceNumber());
1551                 if (status != null) {
1552                         status.setStatus(TransactionStatus.StatusAborted);
1553                 }
1554
1555
1556                 // Set dead a transaction if we can
1557                 Transaction transactionToSetDead = liveTransactionByTransactionIdTable.remove(new Pair<Long, Long>(entry.getTransactionMachineId(), entry.getTransactionClientLocalSequenceNumber()));
1558                 if (transactionToSetDead != null) {
1559                         liveTransactionBySequenceNumberTable.remove(transactionToSetDead.getSequenceNumber());
1560                 }
1561
1562                 // Update the last transaction sequence number that the arbitrator arbitrated on
1563                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getTransactionArbitrator());
1564                 if ((lastTransactionNumber == null) || (lastTransactionNumber < entry.getTransactionSequenceNumber())) {
1565
1566                         // Is a valid one
1567                         if (entry.getTransactionSequenceNumber() != -1) {
1568                                 lastArbitratedTransactionNumberByArbitratorTable.put(entry.getTransactionArbitrator(), entry.getTransactionSequenceNumber());
1569                         }
1570                 }
1571         }
1572
1573         /**
1574          * Set dead the transaction part if that transaction is dead and keep track of all new parts
1575          */
1576         private void processEntry(TransactionPart entry) {
1577                 // Check if we have already seen this transaction and set it dead OR if it is not alive
1578                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getArbitratorId());
1579                 if ((lastTransactionNumber != null) && (lastTransactionNumber >= entry.getSequenceNumber())) {
1580                         // This transaction is dead, it was already committed or aborted
1581                         entry.setDead();
1582                         return;
1583                 }
1584
1585                 // This part is still alive
1586                 Map<Pair<Long, Integer>, TransactionPart> transactionPart = newTransactionParts.get(entry.getMachineId());
1587
1588                 if (transactionPart == null) {
1589                         // Dont have a table for this machine Id yet so make one
1590                         transactionPart = new HashMap<Pair<Long, Integer>, TransactionPart>();
1591                         newTransactionParts.put(entry.getMachineId(), transactionPart);
1592                 }
1593
1594                 // Update the part and set dead ones we have already seen (got a rescued version)
1595                 TransactionPart previouslySeenPart = transactionPart.put(entry.getPartId(), entry);
1596                 if (previouslySeenPart != null) {
1597                         previouslySeenPart.setDead();
1598                 }
1599         }
1600
1601         /**
1602          * Process new commit entries and save them for future use.  Delete duplicates
1603          */
1604         private void processEntry(CommitPart entry) {
1605                 Map<Pair<Long, Integer>, CommitPart> commitPart = newCommitParts.get(entry.getMachineId());
1606
1607                 if (commitPart == null) {
1608                         // Dont have a table for this machine Id yet so make one
1609                         commitPart = new HashMap<Pair<Long, Integer>, CommitPart>();
1610                         newCommitParts.put(entry.getMachineId(), commitPart);
1611                 }
1612
1613                 // Update the part and set dead ones we have already seen (got a rescued version)
1614                 CommitPart previouslySeenPart = commitPart.put(entry.getPartId(), entry);
1615                 if (previouslySeenPart != null) {
1616                         previouslySeenPart.setDead();
1617                 }
1618         }
1619
1620         /**
1621          * Update the last message seen table.  Update and set dead the appropriate RejectedMessages as clients see them.
1622          * Updates the live aborts, removes those that are dead and sets them dead.
1623          * Check that the last message seen is correct and that there is no mismatch of our own last message or that
1624          * other clients have not had a rollback on the last message.
1625          */
1626         private void updateLastMessage(long machineId, long seqNum, Liveness liveness, boolean acceptUpdatesToLocal, HashSet<Long> machineSet) {
1627
1628                 // We have seen this machine ID
1629                 machineSet.remove(machineId);
1630
1631                 // Get the set of rejected messages that this machine Id is has not seen yet
1632                 HashSet<RejectedMessage> watchset = rejectedMessageWatchListTable.get(machineId);
1633
1634                 // If there is a rejected message that this machine Id has not seen yet
1635                 if (watchset != null) {
1636
1637                         // Go through each rejected message that this machine Id has not seen yet
1638                         for (Iterator<RejectedMessage> rmit = watchset.iterator(); rmit.hasNext(); ) {
1639
1640                                 RejectedMessage rm = rmit.next();
1641
1642                                 // If this machine Id has seen this rejected message...
1643                                 if (rm.getNewSeqNum() <= seqNum) {
1644
1645                                         // Remove it from our watchlist
1646                                         rmit.remove();
1647
1648                                         // Decrement machines that need to see this notification
1649                                         rm.removeWatcher(machineId);
1650                                 }
1651                         }
1652                 }
1653
1654                 // Set dead the abort
1655                 for (Iterator<Map.Entry<Pair<Long, Long>, Abort>> i = liveAbortTable.entrySet().iterator(); i.hasNext();) {
1656                         Abort abort = i.next().getValue();
1657
1658                         if ((abort.getTransactionMachineId() == machineId) && (abort.getSequenceNumber() <= seqNum)) {
1659                                 abort.setDead();
1660                                 i.remove();
1661                         }
1662                 }
1663
1664
1665
1666                 if (machineId == localMachineId) {
1667                         // Our own messages are immediately dead.
1668                         if (liveness instanceof LastMessage) {
1669                                 ((LastMessage)liveness).setDead();
1670                         } else if (liveness instanceof Slot) {
1671                                 ((Slot)liveness).setDead();
1672                         } else {
1673                                 throw new Error("Unrecognized type");
1674                         }
1675                 }
1676
1677                 // Get the old last message for this device
1678                 Pair<Long, Liveness> lastMessageEntry = lastMessageTable.put(machineId, new Pair<Long, Liveness>(seqNum, liveness));
1679                 if (lastMessageEntry == null) {
1680                         // If no last message then there is nothing else to process
1681                         return;
1682                 }
1683
1684                 long lastMessageSeqNum = lastMessageEntry.getFirst();
1685                 Liveness lastEntry = lastMessageEntry.getSecond();
1686
1687                 // If it is not our machine Id since we already set ours to dead
1688                 if (machineId != localMachineId) {
1689                         if (lastEntry instanceof LastMessage) {
1690                                 ((LastMessage)lastEntry).setDead();
1691                         } else if (lastEntry instanceof Slot) {
1692                                 ((Slot)lastEntry).setDead();
1693                         } else {
1694                                 throw new Error("Unrecognized type");
1695                         }
1696                 }
1697
1698                 // Make sure the server is not playing any games
1699                 if (machineId == localMachineId) {
1700
1701                         // We were not making any updates and we had a machine mismatch
1702                         if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
1703                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed: " +  seqNum + " got: " + lastMessageSeqNum);
1704                         }
1705                 } else {
1706                         if (lastMessageSeqNum > seqNum) {
1707                                 throw new Error("Server Error: Rollback on remote machine sequence number");
1708                         }
1709                 }
1710         }
1711
1712         /**
1713          * Add a rejected message entry to the watch set to keep track of which clients have seen that
1714          * rejected message entry and which have not.
1715          */
1716         private void addWatchList(long machineId, RejectedMessage entry) {
1717                 HashSet<RejectedMessage> entries = rejectedMessageWatchListTable.get(machineId);
1718                 if (entries == null) {
1719                         // There is no set for this machine ID yet so create one
1720                         entries = new HashSet<RejectedMessage>();
1721                         rejectedMessageWatchListTable.put(machineId, entries);
1722                 }
1723                 entries.add(entry);
1724         }
1725
1726         /**
1727          * Check if the HMAC chain is not violated
1728          */
1729         private void checkHMACChain(SlotIndexer indexer, Slot[] newSlots) {
1730                 for (int i = 0; i < newSlots.length; i++) {
1731                         Slot currSlot = newSlots[i];
1732                         Slot prevSlot = indexer.getSlot(currSlot.getSequenceNumber() - 1);
1733                         if (prevSlot != null &&
1734                                 !Arrays.equals(prevSlot.getHMAC(), currSlot.getPrevHMAC()))
1735                                 throw new Error("Server Error: Invalid HMAC Chain" + currSlot + " " + prevSlot);
1736                 }
1737         }
1738 }