more edits
[iotcloud.git] / version2 / src / C / Table.h
1
2
3 /**
4  * IoTTable data structure.  Provides client interface.
5  * @author Brian Demsky
6  * @version 1.0
7  */
8
9 final public class Table {
10
11         /* Constants */
12         static final int FREE_SLOTS = 2; // Number of slots that should be kept free // 10
13         static final int SKIP_THRESHOLD = 10;
14         static final double RESIZE_MULTIPLE = 1.2;
15         static final double RESIZE_THRESHOLD = 0.75;
16         static final int REJECTED_THRESHOLD = 5;
17
18         /* Helper Objects */
19         private SlotBuffer buffer = NULL;
20         private CloudComm cloud = NULL;
21         private Random random = NULL;
22         private TableStatus liveTableStatus = NULL;
23         private PendingTransaction pendingTransactionBuilder = NULL; // Pending Transaction used in building a Pending Transaction
24         private Transaction lastPendingTransactionSpeculatedOn = NULL; // Last transaction that was speculated on from the pending transaction
25         private Transaction firstPendingTransaction = NULL; // first transaction in the pending transaction list
26
27         /* Variables */
28         private int numberOfSlots = 0;  // Number of slots stored in buffer
29         private int bufferResizeThreshold = 0; // Threshold on the number of live slots before a resize is needed
30         private int64_t liveSlotCount = 0; // Number of currently live slots
31         private int64_t oldestLiveSlotSequenceNumver = 0;       // Smallest sequence number of the slot with a live entry
32         private int64_t localMachineId = 0; // Machine ID of this client device
33         private int64_t sequenceNumber = 0; // Largest sequence number a client has received
34         private int64_t localSequenceNumber = 0;
35
36         // private int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server
37         // private int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server
38         private int64_t localTransactionSequenceNumber = 0; // Local sequence number counter for transactions
39         private int64_t lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on
40         private int64_t oldestTransactionSequenceNumberSpeculatedOn = -1; // the oldest transaction that was speculated on
41         private int64_t localArbitrationSequenceNumber = 0;
42         private bool hadPartialSendToServer = false;
43         private bool attemptedToSendToServer = false;
44         private int64_t expectedsize;
45         private bool didFindTableStatus = false;
46         private int64_t currMaxSize = 0;
47
48         private Slot lastSlotAttemptedToSend = NULL;
49         private bool lastIsNewKey = false;
50         private int lastNewSize = 0;
51         private Map<Transaction, List<Integer>> lastTransactionPartsSent = NULL;
52         private List<Entry> lastPendingSendArbitrationEntriesToDelete = NULL;
53         private NewKey lastNewKey = NULL;
54
55
56         /* Data Structures  */
57         private Map<IoTString, KeyValue> committedKeyValueTable = NULL; // Table of committed key value pairs
58         private Map<IoTString, KeyValue> speculatedKeyValueTable = NULL; // Table of speculated key value pairs, if there is a speculative value
59         private Map<IoTString, KeyValue> pendingTransactionSpeculatedKeyValueTable = NULL; // Table of speculated key value pairs, if there is a speculative value from the pending transactions
60         private Map<IoTString, NewKey> liveNewKeyTable = NULL; // Table of live new keys
61         private HashMap<Long, Pair<Long, Liveness>> lastMessageTable = NULL; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
62         private HashMap<Long, HashSet<RejectedMessage>> rejectedMessageWatchListTable = NULL; // Table of machine Ids and the set of rejected messages they have not seen yet
63         private Map<IoTString, Long> arbitratorTable = NULL; // Table of keys and their arbitrators
64         private Map<Pair<Long, Long>, Abort> liveAbortTable = NULL; // Table live abort messages
65         private Map<Long, Map<Pair<Long, Integer>, TransactionPart>> newTransactionParts = NULL; // transaction parts that are seen in this latest round of slots from the server
66         private Map<Long, Map<Pair<Long, Integer>, CommitPart>> newCommitParts = NULL; // commit parts that are seen in this latest round of slots from the server
67         private Map<Long, Long> lastArbitratedTransactionNumberByArbitratorTable = NULL; // Last transaction sequence number that an arbitrator arbitrated on
68         private Map<Long, Transaction> liveTransactionBySequenceNumberTable = NULL; // live transaction grouped by the sequence number
69         private Map<Pair<Long, Long>, Transaction> liveTransactionByTransactionIdTable = NULL; // live transaction grouped by the transaction ID
70         private Map<Long, Map<Long, Commit>> liveCommitsTable = NULL;
71         private Map<IoTString, Commit> liveCommitsByKeyTable = NULL;
72         private Map<Long, Long> lastCommitSeenSequenceNumberByArbitratorTable = NULL;
73         private Vector<Long> rejectedSlotList = NULL; // List of rejected slots that have yet to be sent to the server
74         private List<Transaction> pendingTransactionQueue = NULL;
75         private List<ArbitrationRound> pendingSendArbitrationRounds = NULL;
76         private List<Entry> pendingSendArbitrationEntriesToDelete = NULL;
77         private Map<Transaction, List<Integer>> transactionPartsSent = NULL;
78         private Map<Long, TransactionStatus> outstandingTransactionStatus = NULL;
79         private Map<Long, Abort> liveAbortsGeneratedByLocal = NULL;
80         private Set<Pair<Long, Long>> offlineTransactionsCommittedAndAtServer = NULL;
81         private Map<Long, Pair<String, Integer>> localCommunicationTable = NULL;
82         private Map<Long, Long> lastTransactionSeenFromMachineFromServer = NULL;
83         private Map<Long, Long> lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = NULL;
84
85
86         public Table(String baseurl, String password, int64_t _localMachineId, int listeningPort) {
87                 localMachineId = _localMachineId;
88                 cloud = new CloudComm(this, baseurl, password, listeningPort);
89
90                 init();
91         }
92
93         public Table(CloudComm _cloud, int64_t _localMachineId) {
94                 localMachineId = _localMachineId;
95                 cloud = _cloud;
96
97                 init();
98         }
99
100         /**
101          * Init all the stuff needed for for table usage
102          */
103         private void init() {
104
105                 // Init helper objects
106                 random = new Random();
107                 buffer = new SlotBuffer();
108
109                 // Set Variables
110                 oldestLiveSlotSequenceNumver = 1;
111
112                 // init data structs
113                 committedKeyValueTable = new HashMap<IoTString, KeyValue>();
114                 speculatedKeyValueTable = new HashMap<IoTString, KeyValue>();
115                 pendingTransactionSpeculatedKeyValueTable = new HashMap<IoTString, KeyValue>();
116                 liveNewKeyTable = new HashMap<IoTString, NewKey>();
117                 lastMessageTable = new HashMap<Long, Pair<Long, Liveness>>();
118                 rejectedMessageWatchListTable = new HashMap<Long, HashSet<RejectedMessage>>();
119                 arbitratorTable = new HashMap<IoTString, Long>();
120                 liveAbortTable = new HashMap<Pair<Long, Long>, Abort>();
121                 newTransactionParts = new HashMap<Long, Map<Pair<Long, Integer>, TransactionPart>>();
122                 newCommitParts = new HashMap<Long, Map<Pair<Long, Integer>, CommitPart>>();
123                 lastArbitratedTransactionNumberByArbitratorTable = new HashMap<Long, Long>();
124                 liveTransactionBySequenceNumberTable = new HashMap<Long, Transaction>();
125                 liveTransactionByTransactionIdTable = new HashMap<Pair<Long, Long>, Transaction>();
126                 liveCommitsTable = new HashMap<Long, Map<Long, Commit>>();
127                 liveCommitsByKeyTable = new HashMap<IoTString, Commit>();
128                 lastCommitSeenSequenceNumberByArbitratorTable = new HashMap<Long, Long>();
129                 rejectedSlotList = new Vector<Long>();
130                 pendingTransactionQueue = new ArrayList<Transaction>();
131                 pendingSendArbitrationEntriesToDelete = new ArrayList<Entry>();
132                 transactionPartsSent = new HashMap<Transaction, List<Integer>>();
133                 outstandingTransactionStatus = new HashMap<Long, TransactionStatus>();
134                 liveAbortsGeneratedByLocal = new HashMap<Long, Abort>();
135                 offlineTransactionsCommittedAndAtServer = new HashSet<Pair<Long, Long>>();
136                 localCommunicationTable = new HashMap<Long, Pair<String, Integer>>();
137                 lastTransactionSeenFromMachineFromServer = new HashMap<Long, Long>();
138                 pendingSendArbitrationRounds = new ArrayList<ArbitrationRound>();
139                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new HashMap<Long, Long>();
140
141
142                 // Other init stuff
143                 numberOfSlots = buffer.capacity();
144                 setResizeThreshold();
145         }
146
147         // TODO: delete method
148         public synchronized void printSlots() {
149                 int64_t o = buffer.getOldestSeqNum();
150                 int64_t n = buffer.getNewestSeqNum();
151
152                 int[] types = new int[10];
153
154                 int num = 0;
155
156                 int livec = 0;
157                 int deadc = 0;
158
159                 int casdasd = 0;
160
161                 int liveslo = 0;
162
163                 for (int64_t i = o; i < (n + 1); i++) {
164                         Slot s = buffer.getSlot(i);
165
166
167                         if (s.isLive()) {
168                                 liveslo++;
169                         }
170
171                         Vector<Entry> entries = s.getEntries();
172
173                         for (Entry e : entries) {
174                                 if (e.isLive()) {
175                                         int type = e.getType();
176
177
178                                         if (type == 6) {
179                                                 RejectedMessage rej = (RejectedMessage)e;
180                                                 casdasd++;
181
182                                                 System.out.println(rej.getMachineID());
183                                         }
184
185
186                                         types[type] = types[type] + 1;
187                                         num++;
188                                         livec++;
189                                 } else {
190                                         deadc++;
191                                 }
192                         }
193                 }
194
195                 for (int i = 0; i < 10; i++) {
196                         System.out.println(i + "    " + types[i]);
197                 }
198                 System.out.println("Live count:   " + livec);
199                 System.out.println("Live Slot count:   " + liveslo);
200
201                 System.out.println("Dead count:   " + deadc);
202                 System.out.println("Old:   " + o);
203                 System.out.println("New:   " + n);
204                 System.out.println("Size:   " + buffer.size());
205                 // System.out.println("Commits:   " + liveCommitsTable.size());
206                 System.out.println("pendingTrans:   " + pendingTransactionQueue.size());
207                 System.out.println("Trans Status Out:   " + outstandingTransactionStatus.size());
208
209                 for (Long k : lastArbitratedTransactionNumberByArbitratorTable.keySet()) {
210                         System.out.println(k + ": " + lastArbitratedTransactionNumberByArbitratorTable.get(k));
211                 }
212
213
214                 for (Long a : liveCommitsTable.keySet()) {
215                         for (Long b : liveCommitsTable.get(a).keySet()) {
216                                 for (KeyValue kv : liveCommitsTable.get(a).get(b).getKeyValueUpdateSet()) {
217                                         System.out.print(kv + " ");
218                                 }
219                                 System.out.print("|| ");
220                         }
221                         System.out.println();
222                 }
223
224         }
225
226         /**
227          * Initialize the table by inserting a table status as the first entry into the table status
228          * also initialize the crypto stuff.
229          */
230         public synchronized void initTable() throws ServerException {
231                 cloud.initSecurity();
232
233                 // Create the first insertion into the block chain which is the table status
234                 Slot s = new Slot(this, 1, localMachineId, localSequenceNumber);
235                 localSequenceNumber++;
236                 TableStatus status = new TableStatus(s, numberOfSlots);
237                 s.addEntry(status);
238                 Slot[] array = cloud.putSlot(s, numberOfSlots);
239
240                 if (array == NULL) {
241                         array = new Slot[] {s};
242                         // update local block chain
243                         validateAndUpdate(array, true);
244                 } else if (array.length == 1) {
245                         // in case we did push the slot BUT we failed to init it
246                         validateAndUpdate(array, true);
247                 } else {
248                         throw new Error("Error on initialization");
249                 }
250         }
251
252         /**
253          * Rebuild the table from scratch by pulling the latest block chain from the server.
254          */
255         public synchronized void rebuild() throws ServerException {
256                 // Just pull the latest slots from the server
257                 Slot[] newslots = cloud.getSlots(sequenceNumber + 1);
258                 validateAndUpdate(newslots, true);
259                 sendToServer(NULL);
260                 updateLiveTransactionsAndStatus();
261
262         }
263
264         // public String toString() {
265         //      String retString = " Committed Table: \n";
266         //      retString += "---------------------------\n";
267         //      retString += commitedTable.toString();
268
269         //      retString += "\n\n";
270
271         //      retString += " Speculative Table: \n";
272         //      retString += "---------------------------\n";
273         //      retString += speculativeTable.toString();
274
275         //      return retString;
276         // }
277
278         public synchronized void addLocalCommunication(int64_t arbitrator, String hostName, int portNumber) {
279                 localCommunicationTable.put(arbitrator, new Pair<String, Integer>(hostName, portNumber));
280         }
281
282         public synchronized Long getArbitrator(IoTString key) {
283                 return arbitratorTable.get(key);
284         }
285
286         public synchronized void close() {
287                 cloud.close();
288         }
289
290         public synchronized IoTString getCommitted(IoTString key)  {
291                 KeyValue kv = committedKeyValueTable.get(key);
292
293                 if (kv != NULL) {
294                         return kv.getValue();
295                 } else {
296                         return NULL;
297                 }
298         }
299
300         public synchronized IoTString getSpeculative(IoTString key) {
301                 KeyValue kv = pendingTransactionSpeculatedKeyValueTable.get(key);
302
303                 if (kv == NULL) {
304                         kv = speculatedKeyValueTable.get(key);
305                 }
306
307                 if (kv == NULL) {
308                         kv = committedKeyValueTable.get(key);
309                 }
310
311                 if (kv != NULL) {
312                         return kv.getValue();
313                 } else {
314                         return NULL;
315                 }
316         }
317
318         public synchronized IoTString getCommittedAtomic(IoTString key) {
319                 KeyValue kv = committedKeyValueTable.get(key);
320
321                 if (arbitratorTable.get(key) == NULL) {
322                         throw new Error("Key not Found.");
323                 }
324
325                 // Make sure new key value pair matches the current arbitrator
326                 if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
327                         // TODO: Maybe not throw en error
328                         throw new Error("Not all Key Values Match Arbitrator.");
329                 }
330
331                 if (kv != NULL) {
332                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, kv.getValue()));
333                         return kv.getValue();
334                 } else {
335                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, NULL));
336                         return NULL;
337                 }
338         }
339
340         public synchronized IoTString getSpeculativeAtomic(IoTString key) {
341                 if (arbitratorTable.get(key) == NULL) {
342                         throw new Error("Key not Found.");
343                 }
344
345                 // Make sure new key value pair matches the current arbitrator
346                 if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
347                         // TODO: Maybe not throw en error
348                         throw new Error("Not all Key Values Match Arbitrator.");
349                 }
350
351                 KeyValue kv = pendingTransactionSpeculatedKeyValueTable.get(key);
352
353                 if (kv == NULL) {
354                         kv = speculatedKeyValueTable.get(key);
355                 }
356
357                 if (kv == NULL) {
358                         kv = committedKeyValueTable.get(key);
359                 }
360
361                 if (kv != NULL) {
362                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, kv.getValue()));
363                         return kv.getValue();
364                 } else {
365                         pendingTransactionBuilder.addKVGuard(new KeyValue(key, NULL));
366                         return NULL;
367                 }
368         }
369
370         public synchronized bool update()  {
371                 try {
372                         Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
373                         validateAndUpdate(newSlots, false);
374                         sendToServer(NULL);
375
376
377                         updateLiveTransactionsAndStatus();
378
379                         return true;
380                 } catch (Exception e) {
381                         // e.printStackTrace();
382
383                         for (Long m : localCommunicationTable.keySet()) {
384                                 updateFromLocal(m);
385                         }
386                 }
387
388                 return false;
389         }
390
391         public synchronized bool createNewKey(IoTString keyName, int64_t machineId) throws ServerException {
392                 while (true) {
393                         if (arbitratorTable.get(keyName) != NULL) {
394                                 // There is already an arbitrator
395                                 return false;
396                         }
397
398                         NewKey newKey = new NewKey(NULL, keyName, machineId);
399
400                         if (sendToServer(newKey)) {
401                                 // If successfully inserted
402                                 return true;
403                         }
404                 }
405         }
406
407         public synchronized void startTransaction() {
408                 // Create a new transaction, invalidates any old pending transactions.
409                 pendingTransactionBuilder = new PendingTransaction(localMachineId);
410         }
411
412         public synchronized void addKV(IoTString key, IoTString value) {
413
414                 // Make sure it is a valid key
415                 if (arbitratorTable.get(key) == NULL) {
416                         throw new Error("Key not Found.");
417                 }
418
419                 // Make sure new key value pair matches the current arbitrator
420                 if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
421                         // TODO: Maybe not throw en error
422                         throw new Error("Not all Key Values Match Arbitrator.");
423                 }
424
425                 // Add the key value to this transaction
426                 KeyValue kv = new KeyValue(key, value);
427                 pendingTransactionBuilder.addKV(kv);
428         }
429
430         public synchronized TransactionStatus commitTransaction() {
431
432                 if (pendingTransactionBuilder.getKVUpdates().size() == 0) {
433                         // transaction with no updates will have no effect on the system
434                         return new TransactionStatus(TransactionStatus.StatusNoEffect, -1);
435                 }
436
437                 // Set the local transaction sequence number and increment
438                 pendingTransactionBuilder.setClientLocalSequenceNumber(localTransactionSequenceNumber);
439                 localTransactionSequenceNumber++;
440
441                 // Create the transaction status
442                 TransactionStatus transactionStatus = new TransactionStatus(TransactionStatus.StatusPending, pendingTransactionBuilder.getArbitrator());
443
444                 // Create the new transaction
445                 Transaction newTransaction = pendingTransactionBuilder.createTransaction();
446                 newTransaction.setTransactionStatus(transactionStatus);
447
448                 if (pendingTransactionBuilder.getArbitrator() != localMachineId) {
449                         // Add it to the queue and invalidate the builder for safety
450                         pendingTransactionQueue.add(newTransaction);
451                 } else {
452                         arbitrateOnLocalTransaction(newTransaction);
453                         updateLiveStateFromLocal();
454                 }
455
456                 pendingTransactionBuilder = new PendingTransaction(localMachineId);
457
458                 try {
459                         sendToServer(NULL);
460                 } catch (ServerException e) {
461
462                         Set<Long> arbitratorTriedAndFailed = new HashSet<Long>();
463                         for (Iterator<Transaction> iter = pendingTransactionQueue.iterator(); iter.hasNext(); ) {
464                                 Transaction transaction = iter.next();
465
466                                 if (arbitratorTriedAndFailed.contains(transaction.getArbitrator())) {
467                                         // Already contacted this client so ignore all attempts to contact this client
468                                         // to preserve ordering for arbitrator
469                                         continue;
470                                 }
471
472                                 Pair<Boolean, Boolean> sendReturn = sendTransactionToLocal(transaction);
473
474                                 if (sendReturn.getFirst()) {
475                                         // Failed to contact over local
476                                         arbitratorTriedAndFailed.add(transaction.getArbitrator());
477                                 } else {
478                                         // Successful contact or should not contact
479
480                                         if (sendReturn.getSecond()) {
481                                                 // did arbitrate
482                                                 iter.remove();
483                                         }
484                                 }
485                         }
486                 }
487
488                 updateLiveStateFromLocal();
489
490                 return transactionStatus;
491         }
492
493         /**
494          * Get the machine ID for this client
495          */
496         public int64_t getMachineId() {
497                 return localMachineId;
498         }
499
500         /**
501          * Decrement the number of live slots that we currently have
502          */
503         public void decrementLiveCount() {
504                 liveSlotCount--;
505         }
506
507         /**
508          * Recalculate the new resize threshold
509          */
510         private void setResizeThreshold() {
511                 int resizeLower = (int) (RESIZE_THRESHOLD * numberOfSlots);
512                 bufferResizeThreshold = resizeLower - 1 + random.nextInt(numberOfSlots - resizeLower);
513         }
514
515         public int64_t getLocalSequenceNumber() {
516                 return localSequenceNumber;
517         }
518
519
520         bool lastInsertedNewKey = false;
521
522         private bool sendToServer(NewKey newKey) throws ServerException {
523
524                 bool fromRetry = false;
525
526                 try {
527                         if (hadPartialSendToServer) {
528                                 Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
529                                 if (newSlots.length == 0) {
530                                         fromRetry = true;
531                                         ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
532
533                                         if (sendSlotsReturn.getFirst()) {
534                                                 if (newKey != NULL) {
535                                                         if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
536                                                                 newKey = NULL;
537                                                         }
538                                                 }
539
540                                                 for (Transaction transaction : lastTransactionPartsSent.keySet()) {
541                                                         transaction.resetServerFailure();
542
543                                                         // Update which transactions parts still need to be sent
544                                                         transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
545
546                                                         // Add the transaction status to the outstanding list
547                                                         outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
548
549                                                         // Update the transaction status
550                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
551
552                                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
553                                                         if (transaction.didSendAllParts()) {
554                                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
555                                                                 pendingTransactionQueue.remove(transaction);
556                                                         }
557                                                 }
558                                         } else {
559
560                                                 newSlots = sendSlotsReturn.getThird();
561
562                                                 bool isInserted = false;
563                                                 for (Slot s : newSlots) {
564                                                         if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
565                                                                 isInserted = true;
566                                                                 break;
567                                                         }
568                                                 }
569
570                                                 for (Slot s : newSlots) {
571                                                         if (isInserted) {
572                                                                 break;
573                                                         }
574
575                                                         // Process each entry in the slot
576                                                         for (Entry entry : s.getEntries()) {
577
578                                                                 if (entry.getType() == Entry.TypeLastMessage) {
579                                                                         LastMessage lastMessage = (LastMessage)entry;
580                                                                         if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) {
581                                                                                 isInserted = true;
582                                                                                 break;
583                                                                         }
584                                                                 }
585                                                         }
586                                                 }
587
588                                                 if (isInserted) {
589                                                         if (newKey != NULL) {
590                                                                 if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
591                                                                         newKey = NULL;
592                                                                 }
593                                                         }
594
595                                                         for (Transaction transaction : lastTransactionPartsSent.keySet()) {
596                                                                 transaction.resetServerFailure();
597
598                                                                 // Update which transactions parts still need to be sent
599                                                                 transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
600
601                                                                 // Add the transaction status to the outstanding list
602                                                                 outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
603
604                                                                 // Update the transaction status
605                                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
606
607                                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
608                                                                 if (transaction.didSendAllParts()) {
609                                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
610                                                                         pendingTransactionQueue.remove(transaction);
611                                                                 } else {
612                                                                         transaction.resetServerFailure();
613                                                                         // Set the transaction sequence number back to nothing
614                                                                         if (!transaction.didSendAPartToServer()) {
615                                                                                 transaction.setSequenceNumber(-1);
616                                                                         }
617                                                                 }
618                                                         }
619                                                 }
620                                         }
621
622                                         for (Transaction transaction : lastTransactionPartsSent.keySet()) {
623                                                 transaction.resetServerFailure();
624                                                 // Set the transaction sequence number back to nothing
625                                                 if (!transaction.didSendAPartToServer()) {
626                                                         transaction.setSequenceNumber(-1);
627                                                 }
628                                         }
629
630                                         if (sendSlotsReturn.getThird().length != 0) {
631                                                 // insert into the local block chain
632                                                 validateAndUpdate(sendSlotsReturn.getThird(), true);
633                                         }
634                                         // continue;
635                                 } else {
636                                         bool isInserted = false;
637                                         for (Slot s : newSlots) {
638                                                 if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
639                                                         isInserted = true;
640                                                         break;
641                                                 }
642                                         }
643
644                                         for (Slot s : newSlots) {
645                                                 if (isInserted) {
646                                                         break;
647                                                 }
648
649                                                 // Process each entry in the slot
650                                                 for (Entry entry : s.getEntries()) {
651
652                                                         if (entry.getType() == Entry.TypeLastMessage) {
653                                                                 LastMessage lastMessage = (LastMessage)entry;
654                                                                 if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) {
655                                                                         isInserted = true;
656                                                                         break;
657                                                                 }
658                                                         }
659                                                 }
660                                         }
661
662                                         if (isInserted) {
663                                                 if (newKey != NULL) {
664                                                         if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
665                                                                 newKey = NULL;
666                                                         }
667                                                 }
668
669                                                 for (Transaction transaction : lastTransactionPartsSent.keySet()) {
670                                                         transaction.resetServerFailure();
671
672                                                         // Update which transactions parts still need to be sent
673                                                         transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
674
675                                                         // Add the transaction status to the outstanding list
676                                                         outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
677
678                                                         // Update the transaction status
679                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
680
681                                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
682                                                         if (transaction.didSendAllParts()) {
683                                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
684                                                                 pendingTransactionQueue.remove(transaction);
685                                                         } else {
686                                                                 transaction.resetServerFailure();
687                                                                 // Set the transaction sequence number back to nothing
688                                                                 if (!transaction.didSendAPartToServer()) {
689                                                                         transaction.setSequenceNumber(-1);
690                                                                 }
691                                                         }
692                                                 }
693                                         } else {
694                                                 for (Transaction transaction : lastTransactionPartsSent.keySet()) {
695                                                         transaction.resetServerFailure();
696                                                         // Set the transaction sequence number back to nothing
697                                                         if (!transaction.didSendAPartToServer()) {
698                                                                 transaction.setSequenceNumber(-1);
699                                                         }
700                                                 }
701                                         }
702
703                                         // insert into the local block chain
704                                         validateAndUpdate(newSlots, true);
705                                 }
706                         }
707                 } catch (ServerException e) {
708                         throw e;
709                 }
710
711
712
713                 try {
714                         // While we have stuff that needs inserting into the block chain
715                         while ((pendingTransactionQueue.size() > 0) || (pendingSendArbitrationRounds.size() > 0) || (newKey != NULL)) {
716
717                                 fromRetry = false;
718
719                                 if (hadPartialSendToServer) {
720                                         throw new Error("Should Be error free");
721                                 }
722
723
724
725                                 // If there is a new key with same name then end
726                                 if ((newKey != NULL) && (arbitratorTable.get(newKey.getKey()) != NULL)) {
727                                         return false;
728                                 }
729
730                                 // Create the slot
731                                 Slot slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer.getSlot(sequenceNumber).getHMAC(), localSequenceNumber);
732                                 localSequenceNumber++;
733
734                                 // Try to fill the slot with data
735                                 ThreeTuple<Boolean, Integer, Boolean> fillSlotsReturn = fillSlot(slot, false, newKey);
736                                 bool needsResize = fillSlotsReturn.getFirst();
737                                 int newSize = fillSlotsReturn.getSecond();
738                                 Boolean insertedNewKey = fillSlotsReturn.getThird();
739
740                                 if (needsResize) {
741                                         // Reset which transaction to send
742                                         for (Transaction transaction : transactionPartsSent.keySet()) {
743                                                 transaction.resetNextPartToSend();
744
745                                                 // Set the transaction sequence number back to nothing
746                                                 if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
747                                                         transaction.setSequenceNumber(-1);
748                                                 }
749                                         }
750
751                                         // Clear the sent data since we are trying again
752                                         pendingSendArbitrationEntriesToDelete.clear();
753                                         transactionPartsSent.clear();
754
755                                         // We needed a resize so try again
756                                         fillSlot(slot, true, newKey);
757                                 }
758
759                                 lastSlotAttemptedToSend = slot;
760                                 lastIsNewKey = (newKey != NULL);
761                                 lastInsertedNewKey = insertedNewKey;
762                                 lastNewSize = newSize;
763                                 lastNewKey = newKey;
764                                 lastTransactionPartsSent = new HashMap<Transaction, List<Integer>>(transactionPartsSent);
765                                 lastPendingSendArbitrationEntriesToDelete = new ArrayList<Entry>(pendingSendArbitrationEntriesToDelete);
766
767
768                                 ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
769
770                                 if (sendSlotsReturn.getFirst()) {
771
772                                         // Did insert into the block chain
773
774                                         if (insertedNewKey) {
775                                                 // This slot was what was inserted not a previous slot
776
777                                                 // New Key was successfully inserted into the block chain so dont want to insert it again
778                                                 newKey = NULL;
779                                         }
780
781                                         // Remove the aborts and commit parts that were sent from the pending to send queue
782                                         for (Iterator<ArbitrationRound> iter = pendingSendArbitrationRounds.iterator(); iter.hasNext(); ) {
783                                                 ArbitrationRound round = iter.next();
784                                                 round.removeParts(pendingSendArbitrationEntriesToDelete);
785
786                                                 if (round.isDoneSending()) {
787                                                         // Sent all the parts
788                                                         iter.remove();
789                                                 }
790                                         }
791
792                                         for (Transaction transaction : transactionPartsSent.keySet()) {
793                                                 transaction.resetServerFailure();
794
795                                                 // Update which transactions parts still need to be sent
796                                                 transaction.removeSentParts(transactionPartsSent.get(transaction));
797
798                                                 // Add the transaction status to the outstanding list
799                                                 outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
800
801                                                 // Update the transaction status
802                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
803
804                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
805                                                 if (transaction.didSendAllParts()) {
806                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
807                                                         pendingTransactionQueue.remove(transaction);
808                                                 }
809                                         }
810                                 } else {
811
812                                         // if (!sendSlotsReturn.getSecond()) {
813                                         //      for (Transaction transaction : lastTransactionPartsSent.keySet()) {
814                                         //              transaction.resetServerFailure();
815                                         //      }
816                                         // } else {
817                                         //      for (Transaction transaction : lastTransactionPartsSent.keySet()) {
818                                         //              transaction.resetServerFailure();
819
820                                         //              // Update which transactions parts still need to be sent
821                                         //              transaction.removeSentParts(transactionPartsSent.get(transaction));
822
823                                         //              // Add the transaction status to the outstanding list
824                                         //              outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
825
826                                         //              // Update the transaction status
827                                         //              transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
828
829                                         //              // Check if all the transaction parts were successfully sent and if so then remove it from pending
830                                         //              if (transaction.didSendAllParts()) {
831                                         //                      transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
832                                         //                      pendingTransactionQueue.remove(transaction);
833
834                                         //                      for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
835                                         //                              System.out.println("Sent: " + kv + "  from: " + localMachineId + "   Slot:" + lastSlotAttemptedToSend.getSequenceNumber() + "  Claimed:" + transaction.getSequenceNumber());
836                                         //                      }
837                                         //              }
838                                         //      }
839                                         // }
840
841                                         // Reset which transaction to send
842                                         for (Transaction transaction : transactionPartsSent.keySet()) {
843                                                 transaction.resetNextPartToSend();
844                                                 // transaction.resetNextPartToSend();
845
846                                                 // Set the transaction sequence number back to nothing
847                                                 if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
848                                                         transaction.setSequenceNumber(-1);
849                                                 }
850                                         }
851                                 }
852
853                                 // Clear the sent data in preparation for next send
854                                 pendingSendArbitrationEntriesToDelete.clear();
855                                 transactionPartsSent.clear();
856
857                                 if (sendSlotsReturn.getThird().length != 0) {
858                                         // insert into the local block chain
859                                         validateAndUpdate(sendSlotsReturn.getThird(), true);
860                                 }
861                         }
862
863                 } catch (ServerException e) {
864
865                         if (e.getType() != ServerException.TypeInputTimeout) {
866                                 // e.printStackTrace();
867
868                                 // Nothing was able to be sent to the server so just clear these data structures
869                                 for (Transaction transaction : transactionPartsSent.keySet()) {
870                                         transaction.resetNextPartToSend();
871
872                                         // Set the transaction sequence number back to nothing
873                                         if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
874                                                 transaction.setSequenceNumber(-1);
875                                         }
876                                 }
877                         } else {
878                                 // There was a partial send to the server
879                                 hadPartialSendToServer = true;
880
881
882                                 // if (!fromRetry) {
883                                 //      lastTransactionPartsSent = new HashMap<Transaction, List<Integer>>(transactionPartsSent);
884                                 //      lastPendingSendArbitrationEntriesToDelete = new ArrayList<Entry>(pendingSendArbitrationEntriesToDelete);
885                                 // }
886
887                                 // Nothing was able to be sent to the server so just clear these data structures
888                                 for (Transaction transaction : transactionPartsSent.keySet()) {
889                                         transaction.resetNextPartToSend();
890                                         transaction.setServerFailure();
891                                 }
892                         }
893
894                         pendingSendArbitrationEntriesToDelete.clear();
895                         transactionPartsSent.clear();
896
897                         throw e;
898                 }
899
900                 return newKey == NULL;
901         }
902
903         private synchronized bool updateFromLocal(int64_t machineId) {
904                 Pair<String, Integer> localCommunicationInformation = localCommunicationTable.get(machineId);
905                 if (localCommunicationInformation == NULL) {
906                         // Cant talk to that device locally so do nothing
907                         return false;
908                 }
909
910                 // Get the size of the send data
911                 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
912
913                 Long lastArbitrationDataLocalSequenceNumber = (int64_t) - 1;
914                 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId) != NULL) {
915                         lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId);
916                 }
917
918                 char[] sendData = new char[sendDataSize];
919                 ByteBuffer bbEncode = ByteBuffer.wrap(sendData);
920
921                 // Encode the data
922                 bbEncode.putLong(lastArbitrationDataLocalSequenceNumber);
923                 bbEncode.putInt(0);
924
925                 // Send by local
926                 char[] returnData = cloud.sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
927                 localSequenceNumber++;
928
929                 if (returnData == NULL) {
930                         // Could not contact server
931                         return false;
932                 }
933
934                 // Decode the data
935                 ByteBuffer bbDecode = ByteBuffer.wrap(returnData);
936                 int numberOfEntries = bbDecode.getInt();
937
938                 for (int i = 0; i < numberOfEntries; i++) {
939                         char type = bbDecode.get();
940                         if (type == Entry.TypeAbort) {
941                                 Abort abort = (Abort)Abort.decode(NULL, bbDecode);
942                                 processEntry(abort);
943                         } else if (type == Entry.TypeCommitPart) {
944                                 CommitPart commitPart = (CommitPart)CommitPart.decode(NULL, bbDecode);
945                                 processEntry(commitPart);
946                         }
947                 }
948
949                 updateLiveStateFromLocal();
950
951                 return true;
952         }
953
954         private Pair<Boolean, Boolean> sendTransactionToLocal(Transaction transaction) {
955
956                 // Get the devices local communications
957                 Pair<String, Integer> localCommunicationInformation = localCommunicationTable.get(transaction.getArbitrator());
958
959                 if (localCommunicationInformation == NULL) {
960                         // Cant talk to that device locally so do nothing
961                         return new Pair<Boolean, Boolean>(true, false);
962                 }
963
964                 // Get the size of the send data
965                 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
966                 for (TransactionPart part : transaction.getParts().values()) {
967                         sendDataSize += part.getSize();
968                 }
969
970                 Long lastArbitrationDataLocalSequenceNumber = (int64_t) - 1;
971                 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator()) != NULL) {
972                         lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator());
973                 }
974
975                 // Make the send data size
976                 char[] sendData = new char[sendDataSize];
977                 ByteBuffer bbEncode = ByteBuffer.wrap(sendData);
978
979                 // Encode the data
980                 bbEncode.putLong(lastArbitrationDataLocalSequenceNumber);
981                 bbEncode.putInt(transaction.getParts().size());
982                 for (TransactionPart part : transaction.getParts().values()) {
983                         part.encode(bbEncode);
984                 }
985
986
987                 // Send by local
988                 char[] returnData = cloud.sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
989                 localSequenceNumber++;
990
991                 if (returnData == NULL) {
992                         // Could not contact server
993                         return new Pair<Boolean, Boolean>(true, false);
994                 }
995
996                 // Decode the data
997                 ByteBuffer bbDecode = ByteBuffer.wrap(returnData);
998                 bool didCommit = bbDecode.get() == 1;
999                 bool couldArbitrate = bbDecode.get() == 1;
1000                 int numberOfEntries = bbDecode.getInt();
1001                 bool foundAbort = false;
1002
1003                 for (int i = 0; i < numberOfEntries; i++) {
1004                         char type = bbDecode.get();
1005                         if (type == Entry.TypeAbort) {
1006                                 Abort abort = (Abort)Abort.decode(NULL, bbDecode);
1007
1008                                 if ((abort.getTransactionMachineId() == localMachineId) && (abort.getTransactionClientLocalSequenceNumber() == transaction.getClientLocalSequenceNumber())) {
1009                                         foundAbort = true;
1010                                 }
1011
1012                                 processEntry(abort);
1013                         } else if (type == Entry.TypeCommitPart) {
1014                                 CommitPart commitPart = (CommitPart)CommitPart.decode(NULL, bbDecode);
1015                                 processEntry(commitPart);
1016                         }
1017                 }
1018
1019                 updateLiveStateFromLocal();
1020
1021                 if (couldArbitrate) {
1022                         TransactionStatus status =  transaction.getTransactionStatus();
1023                         if (didCommit) {
1024                                 status.setStatus(TransactionStatus.StatusCommitted);
1025                         } else {
1026                                 status.setStatus(TransactionStatus.StatusAborted);
1027                         }
1028                 } else {
1029                         TransactionStatus status =  transaction.getTransactionStatus();
1030                         if (foundAbort) {
1031                                 status.setStatus(TransactionStatus.StatusAborted);
1032                         } else {
1033                                 status.setStatus(TransactionStatus.StatusCommitted);
1034                         }
1035                 }
1036
1037                 return new Pair<Boolean, Boolean>(false, true);
1038         }
1039
1040         public synchronized char[] acceptDataFromLocal(char[] data) {
1041
1042                 // Decode the data
1043                 ByteBuffer bbDecode = ByteBuffer.wrap(data);
1044                 int64_t lastArbitratedSequenceNumberSeen = bbDecode.getLong();
1045                 int numberOfParts = bbDecode.getInt();
1046
1047                 // If we did commit a transaction or not
1048                 bool didCommit = false;
1049                 bool couldArbitrate = false;
1050
1051                 if (numberOfParts != 0) {
1052
1053                         // decode the transaction
1054                         Transaction transaction = new Transaction();
1055                         for (int i = 0; i < numberOfParts; i++) {
1056                                 bbDecode.get();
1057                                 TransactionPart newPart = (TransactionPart)TransactionPart.decode(NULL, bbDecode);
1058                                 transaction.addPartDecode(newPart);
1059                         }
1060
1061                         // Arbitrate on transaction and pull relevant return data
1062                         Pair<Boolean, Boolean> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
1063                         couldArbitrate = localArbitrateReturn.getFirst();
1064                         didCommit = localArbitrateReturn.getSecond();
1065
1066                         updateLiveStateFromLocal();
1067
1068                         // Transaction was sent to the server so keep track of it to prevent double commit
1069                         if (transaction.getSequenceNumber() != -1) {
1070                                 offlineTransactionsCommittedAndAtServer.add(transaction.getId());
1071                         }
1072                 }
1073
1074                 // The data to send back
1075                 int returnDataSize = 0;
1076                 List<Entry> unseenArbitrations = new ArrayList<Entry>();
1077
1078                 // Get the aborts to send back
1079                 List<Long> abortLocalSequenceNumbers = new ArrayList<Long >(liveAbortsGeneratedByLocal.keySet());
1080                 Collections.sort(abortLocalSequenceNumbers);
1081                 for (Long localSequenceNumber : abortLocalSequenceNumbers) {
1082                         if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1083                                 continue;
1084                         }
1085
1086                         Abort abort = liveAbortsGeneratedByLocal.get(localSequenceNumber);
1087                         unseenArbitrations.add(abort);
1088                         returnDataSize += abort.getSize();
1089                 }
1090
1091                 // Get the commits to send back
1092                 Map<Long, Commit> commitForClientTable = liveCommitsTable.get(localMachineId);
1093                 if (commitForClientTable != NULL) {
1094                         List<Long> commitLocalSequenceNumbers = new ArrayList<Long>(commitForClientTable.keySet());
1095                         Collections.sort(commitLocalSequenceNumbers);
1096
1097                         for (Long localSequenceNumber : commitLocalSequenceNumbers) {
1098                                 Commit commit = commitForClientTable.get(localSequenceNumber);
1099
1100                                 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1101                                         continue;
1102                                 }
1103
1104                                 unseenArbitrations.addAll(commit.getParts().values());
1105
1106                                 for (CommitPart commitPart : commit.getParts().values()) {
1107                                         returnDataSize += commitPart.getSize();
1108                                 }
1109                         }
1110                 }
1111
1112                 // Number of arbitration entries to decode
1113                 returnDataSize += 2 * sizeof(int32_t);
1114
1115                 // Boolean of did commit or not
1116                 if (numberOfParts != 0) {
1117                         returnDataSize += sizeof(char);
1118                 }
1119
1120                 // Data to send Back
1121                 char[] returnData = new char[returnDataSize];
1122                 ByteBuffer bbEncode = ByteBuffer.wrap(returnData);
1123
1124                 if (numberOfParts != 0) {
1125                         if (didCommit) {
1126                                 bbEncode.put((char)1);
1127                         } else {
1128                                 bbEncode.put((char)0);
1129                         }
1130                         if (couldArbitrate) {
1131                                 bbEncode.put((char)1);
1132                         } else {
1133                                 bbEncode.put((char)0);
1134                         }
1135                 }
1136
1137                 bbEncode.putInt(unseenArbitrations.size());
1138                 for (Entry entry : unseenArbitrations) {
1139                         entry.encode(bbEncode);
1140                 }
1141
1142
1143                 localSequenceNumber++;
1144                 return returnData;
1145         }
1146
1147         private ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsToServer(Slot slot, int newSize, bool isNewKey)  throws ServerException {
1148
1149                 bool attemptedToSendToServerTmp = attemptedToSendToServer;
1150                 attemptedToSendToServer = true;
1151
1152                 bool inserted = false;
1153                 bool lastTryInserted = false;
1154
1155                 Slot[] array = cloud.putSlot(slot, newSize);
1156                 if (array == NULL) {
1157                         array = new Slot[] {slot};
1158                         rejectedSlotList.clear();
1159                         inserted = true;
1160                 }       else {
1161                         if (array.length == 0) {
1162                                 throw new Error("Server Error: Did not send any slots");
1163                         }
1164
1165                         // if (attemptedToSendToServerTmp) {
1166                         if (hadPartialSendToServer) {
1167
1168                                 bool isInserted = false;
1169                                 for (Slot s : array) {
1170                                         if ((s.getSequenceNumber() == slot.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
1171                                                 isInserted = true;
1172                                                 break;
1173                                         }
1174                                 }
1175
1176                                 for (Slot s : array) {
1177                                         if (isInserted) {
1178                                                 break;
1179                                         }
1180
1181                                         // Process each entry in the slot
1182                                         for (Entry entry : s.getEntries()) {
1183
1184                                                 if (entry.getType() == Entry.TypeLastMessage) {
1185                                                         LastMessage lastMessage = (LastMessage)entry;
1186
1187                                                         if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == slot.getSequenceNumber())) {
1188                                                                 isInserted = true;
1189                                                                 break;
1190                                                         }
1191                                                 }
1192                                         }
1193                                 }
1194
1195                                 if (!isInserted) {
1196                                         rejectedSlotList.add(slot.getSequenceNumber());
1197                                         lastTryInserted = false;
1198                                 } else {
1199                                         lastTryInserted = true;
1200                                 }
1201                         } else {
1202                                 rejectedSlotList.add(slot.getSequenceNumber());
1203                                 lastTryInserted = false;
1204                         }
1205                 }
1206
1207                 return new ThreeTuple<Boolean, Boolean, Slot[]>(inserted, lastTryInserted, array);
1208         }
1209
1210         /**
1211          * Returns false if a resize was needed
1212          */
1213         private ThreeTuple<Boolean, Integer, Boolean> fillSlot(Slot slot, bool resize, NewKey newKeyEntry) {
1214
1215
1216                 int newSize = 0;
1217                 if (liveSlotCount > bufferResizeThreshold) {
1218                         resize = true; //Resize is forced
1219
1220                 }
1221
1222                 if (resize) {
1223                         newSize = (int) (numberOfSlots * RESIZE_MULTIPLE);
1224                         TableStatus status = new TableStatus(slot, newSize);
1225                         slot.addEntry(status);
1226                 }
1227
1228                 // Fill with rejected slots first before doing anything else
1229                 doRejectedMessages(slot);
1230
1231                 // Do mandatory rescue of entries
1232                 ThreeTuple<Boolean, Boolean, Long> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1233
1234                 // Extract working variables
1235                 bool needsResize = mandatoryRescueReturn.getFirst();
1236                 bool seenLiveSlot = mandatoryRescueReturn.getSecond();
1237                 int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1238
1239                 if (needsResize && !resize) {
1240                         // We need to resize but we are not resizing so return false
1241                         return new ThreeTuple<Boolean, Integer, Boolean>(true, NULL, NULL);
1242                 }
1243
1244                 bool inserted = false;
1245                 if (newKeyEntry != NULL) {
1246                         newKeyEntry.setSlot(slot);
1247                         if (slot.hasSpace(newKeyEntry)) {
1248
1249                                 slot.addEntry(newKeyEntry);
1250                                 inserted = true;
1251                         }
1252                 }
1253
1254                 // Clear the transactions, aborts and commits that were sent previously
1255                 transactionPartsSent.clear();
1256                 pendingSendArbitrationEntriesToDelete.clear();
1257
1258                 for (ArbitrationRound round : pendingSendArbitrationRounds) {
1259                         bool isFull = false;
1260                         round.generateParts();
1261                         List<Entry> parts = round.getParts();
1262
1263                         // Insert pending arbitration data
1264                         for (Entry arbitrationData : parts) {
1265
1266                                 // If it is an abort then we need to set some information
1267                                 if (arbitrationData instanceof Abort) {
1268                                         ((Abort)arbitrationData).setSequenceNumber(slot.getSequenceNumber());
1269                                 }
1270
1271                                 if (!slot.hasSpace(arbitrationData)) {
1272                                         // No space so cant do anything else with these data entries
1273                                         isFull = true;
1274                                         break;
1275                                 }
1276
1277                                 // Add to this current slot and add it to entries to delete
1278                                 slot.addEntry(arbitrationData);
1279                                 pendingSendArbitrationEntriesToDelete.add(arbitrationData);
1280                         }
1281
1282                         if (isFull) {
1283                                 break;
1284                         }
1285                 }
1286
1287                 if (pendingTransactionQueue.size() > 0) {
1288
1289                         Transaction transaction = pendingTransactionQueue.get(0);
1290
1291                         // Set the transaction sequence number if it has yet to be inserted into the block chain
1292                         // if ((!transaction.didSendAPartToServer() && !transaction.getServerFailure()) || (transaction.getSequenceNumber() == -1)) {
1293                         //      transaction.setSequenceNumber(slot.getSequenceNumber());
1294                         // }
1295
1296                         if ((!transaction.didSendAPartToServer()) || (transaction.getSequenceNumber() == -1)) {
1297                                 transaction.setSequenceNumber(slot.getSequenceNumber());
1298                         }
1299
1300
1301                         while (true) {
1302                                 TransactionPart part = transaction.getNextPartToSend();
1303
1304                                 if (part == NULL) {
1305                                         // Ran out of parts to send for this transaction so move on
1306                                         break;
1307                                 }
1308
1309                                 if (slot.hasSpace(part)) {
1310                                         slot.addEntry(part);
1311                                         List<Integer> partsSent = transactionPartsSent.get(transaction);
1312                                         if (partsSent == NULL) {
1313                                                 partsSent = new ArrayList<Integer>();
1314                                                 transactionPartsSent.put(transaction, partsSent);
1315                                         }
1316                                         partsSent.add(part.getPartNumber());
1317                                         transactionPartsSent.put(transaction, partsSent);
1318                                 } else {
1319                                         break;
1320                                 }
1321                         }
1322                 }
1323
1324                 // Fill the remainder of the slot with rescue data
1325                 doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1326
1327                 return new ThreeTuple<Boolean, Integer, Boolean>(false, newSize, inserted);
1328         }
1329
1330         private void doRejectedMessages(Slot s) {
1331                 if (! rejectedSlotList.isEmpty()) {
1332                         /* TODO: We should avoid generating a rejected message entry if
1333                          * there is already a sufficient entry in the queue (e.g.,
1334                          * equalsto value of true and same sequence number).  */
1335
1336                         int64_t old_seqn = rejectedSlotList.firstElement();
1337                         if (rejectedSlotList.size() > REJECTED_THRESHOLD) {
1338                                 int64_t new_seqn = rejectedSlotList.lastElement();
1339                                 RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1340                                 s.addEntry(rm);
1341                         } else {
1342                                 int64_t prev_seqn = -1;
1343                                 int i = 0;
1344                                 /* Go through list of missing messages */
1345                                 for (; i < rejectedSlotList.size(); i++) {
1346                                         int64_t curr_seqn = rejectedSlotList.get(i);
1347                                         Slot s_msg = buffer.getSlot(curr_seqn);
1348                                         if (s_msg != NULL)
1349                                                 break;
1350                                         prev_seqn = curr_seqn;
1351                                 }
1352                                 /* Generate rejected message entry for missing messages */
1353                                 if (prev_seqn != -1) {
1354                                         RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1355                                         s.addEntry(rm);
1356                                 }
1357                                 /* Generate rejected message entries for present messages */
1358                                 for (; i < rejectedSlotList.size(); i++) {
1359                                         int64_t curr_seqn = rejectedSlotList.get(i);
1360                                         Slot s_msg = buffer.getSlot(curr_seqn);
1361                                         int64_t machineid = s_msg.getMachineID();
1362                                         RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1363                                         s.addEntry(rm);
1364                                 }
1365                         }
1366                 }
1367         }
1368
1369         private ThreeTuple<Boolean, Boolean, Long> doMandatoryResuce(Slot slot, bool resize) {
1370                 int64_t newestSequenceNumber = buffer.getNewestSeqNum();
1371                 int64_t oldestSequenceNumber = buffer.getOldestSeqNum();
1372                 if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1373                         oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1374                 }
1375
1376                 int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1377                 bool seenLiveSlot = false;
1378                 int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots; // smallest seq number in the buffer if it is full
1379                 int64_t threshold = firstIfFull + FREE_SLOTS;   // we want the buffer to be clear of live entries up to this point
1380
1381
1382                 // Mandatory Rescue
1383                 for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1384                         Slot previousSlot = buffer.getSlot(currentSequenceNumber);
1385                         // Push slot number forward
1386                         if (! seenLiveSlot) {
1387                                 oldestLiveSlotSequenceNumver = currentSequenceNumber;
1388                         }
1389
1390                         if (!previousSlot.isLive()) {
1391                                 continue;
1392                         }
1393
1394                         // We have seen a live slot
1395                         seenLiveSlot = true;
1396
1397                         // Get all the live entries for a slot
1398                         Vector<Entry> liveEntries = previousSlot.getLiveEntries(resize);
1399
1400                         // Iterate over all the live entries and try to rescue them
1401                         for (Entry liveEntry : liveEntries) {
1402                                 if (slot.hasSpace(liveEntry)) {
1403
1404                                         // Enough space to rescue the entry
1405                                         slot.addEntry(liveEntry);
1406                                 } else if (currentSequenceNumber == firstIfFull) {
1407                                         //if there's no space but the entry is about to fall off the queue
1408                                         System.out.println("B"); //?
1409                                         return new ThreeTuple<Boolean, Boolean, Long>(true, seenLiveSlot, currentSequenceNumber);
1410
1411                                 }
1412                         }
1413                 }
1414
1415                 // Did not resize
1416                 return new ThreeTuple<Boolean, Boolean, Long>(false, seenLiveSlot, currentSequenceNumber);
1417         }
1418
1419         private void  doOptionalRescue(Slot s, bool seenliveslot, int64_t seqn, bool resize) {
1420                 /* now go through live entries from least to greatest sequence number until
1421                  * either all live slots added, or the slot doesn't have enough room
1422                  * for SKIP_THRESHOLD consecutive entries*/
1423                 int skipcount = 0;
1424                 int64_t newestseqnum = buffer.getNewestSeqNum();
1425                 search:
1426                 for (; seqn <= newestseqnum; seqn++) {
1427                         Slot prevslot = buffer.getSlot(seqn);
1428                         //Push slot number forward
1429                         if (!seenliveslot)
1430                                 oldestLiveSlotSequenceNumver = seqn;
1431
1432                         if (!prevslot.isLive())
1433                                 continue;
1434                         seenliveslot = true;
1435                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
1436                         for (Entry liveentry : liveentries) {
1437                                 if (s.hasSpace(liveentry))
1438                                         s.addEntry(liveentry);
1439                                 else {
1440                                         skipcount++;
1441                                         if (skipcount > SKIP_THRESHOLD)
1442                                                 break search;
1443                                 }
1444                         }
1445                 }
1446         }
1447
1448         /**
1449          * Checks for malicious activity and updates the local copy of the block chain.
1450          */
1451         private void validateAndUpdate(Slot[] newSlots, bool acceptUpdatesToLocal) {
1452
1453                 // The cloud communication layer has checked slot HMACs already before decoding
1454                 if (newSlots.length == 0) {
1455                         return;
1456                 }
1457
1458                 // Make sure all slots are newer than the last largest slot this client has seen
1459                 int64_t firstSeqNum = newSlots[0].getSequenceNumber();
1460                 if (firstSeqNum <= sequenceNumber) {
1461                         throw new Error("Server Error: Sent older slots!");
1462                 }
1463
1464                 // Create an object that can access both new slots and slots in our local chain
1465                 // without committing slots to our local chain
1466                 SlotIndexer indexer = new SlotIndexer(newSlots, buffer);
1467
1468                 // Check that the HMAC chain is not broken
1469                 checkHMACChain(indexer, newSlots);
1470
1471                 // Set to keep track of messages from clients
1472                 HashSet<Long> machineSet = new HashSet<Long>(lastMessageTable.keySet());
1473
1474                 // Process each slots data
1475                 for (Slot slot : newSlots) {
1476                         processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1477
1478                         updateExpectedSize();
1479                 }
1480
1481                 // If there is a gap, check to see if the server sent us everything.
1482                 if (firstSeqNum != (sequenceNumber + 1)) {
1483
1484                         // Check the size of the slots that were sent down by the server.
1485                         // Can only check the size if there was a gap
1486                         checkNumSlots(newSlots.length);
1487
1488                         // Since there was a gap every machine must have pushed a slot or must have
1489                         // a last message message.  If not then the server is hiding slots
1490                         if (!machineSet.isEmpty()) {
1491                                 throw new Error("Missing record for machines: " + machineSet);
1492                         }
1493                 }
1494
1495                 // Update the size of our local block chain.
1496                 commitNewMaxSize();
1497
1498                 // Commit new to slots to the local block chain.
1499                 for (Slot slot : newSlots) {
1500
1501                         // Insert this slot into our local block chain copy.
1502                         buffer.putSlot(slot);
1503
1504                         // Keep track of how many slots are currently live (have live data in them).
1505                         liveSlotCount++;
1506                 }
1507
1508                 // Get the sequence number of the latest slot in the system
1509                 sequenceNumber = newSlots[newSlots.length - 1].getSequenceNumber();
1510
1511                 updateLiveStateFromServer();
1512
1513                 // No Need to remember after we pulled from the server
1514                 offlineTransactionsCommittedAndAtServer.clear();
1515
1516                 // This is invalidated now
1517                 hadPartialSendToServer = false;
1518         }
1519
1520         private void updateLiveStateFromServer() {
1521                 // Process the new transaction parts
1522                 processNewTransactionParts();
1523
1524                 // Do arbitration on new transactions that were received
1525                 arbitrateFromServer();
1526
1527                 // Update all the committed keys
1528                 bool didCommitOrSpeculate = updateCommittedTable();
1529
1530                 // Delete the transactions that are now dead
1531                 updateLiveTransactionsAndStatus();
1532
1533                 // Do speculations
1534                 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1535                 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1536         }
1537
1538         private void updateLiveStateFromLocal() {
1539                 // Update all the committed keys
1540                 bool didCommitOrSpeculate = updateCommittedTable();
1541
1542                 // Delete the transactions that are now dead
1543                 updateLiveTransactionsAndStatus();
1544
1545                 // Do speculations
1546                 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1547                 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1548         }
1549
1550         private void initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1551                 // if (didFindTableStatus) {
1552                 // return;
1553                 // }
1554                 int64_t prevslots = firstSequenceNumber;
1555
1556
1557                 if (didFindTableStatus) {
1558                         // expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : expectedsize;
1559                         // System.out.println("Here2: " + expectedsize + "    " + numberOfSlots + "   " + prevslots);
1560
1561                 } else {
1562                         expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1563                         // System.out.println("Here: " + expectedsize);
1564                 }
1565
1566                 // System.out.println(numberOfSlots);
1567
1568                 didFindTableStatus = true;
1569                 currMaxSize = numberOfSlots;
1570         }
1571
1572         private void updateExpectedSize() {
1573                 expectedsize++;
1574
1575                 if (expectedsize > currMaxSize) {
1576                         expectedsize = currMaxSize;
1577                 }
1578         }
1579
1580
1581         /**
1582          * Check the size of the block chain to make sure there are enough slots sent back by the server.
1583          * This is only called when we have a gap between the slots that we have locally and the slots
1584          * sent by the server therefore in the slots sent by the server there will be at least 1 Table
1585          * status message
1586          */
1587         private void checkNumSlots(int numberOfSlots) {
1588                 if (numberOfSlots != expectedsize) {
1589                         throw new Error("Server Error: Server did not send all slots.  Expected: " + expectedsize + " Received:" + numberOfSlots);
1590                 }
1591         }
1592
1593         private void updateCurrMaxSize(int newmaxsize) {
1594                 currMaxSize = newmaxsize;
1595         }
1596
1597
1598         /**
1599          * Update the size of of the local buffer if it is needed.
1600          */
1601         private void commitNewMaxSize() {
1602                 didFindTableStatus = false;
1603
1604                 // Resize the local slot buffer
1605                 if (numberOfSlots != currMaxSize) {
1606                         buffer.resize((int)currMaxSize);
1607                 }
1608
1609                 // Change the number of local slots to the new size
1610                 numberOfSlots = (int)currMaxSize;
1611
1612
1613                 // Recalculate the resize threshold since the size of the local buffer has changed
1614                 setResizeThreshold();
1615         }
1616
1617         /**
1618          * Process the new transaction parts from this latest round of slots received from the server
1619          */
1620         private void processNewTransactionParts() {
1621
1622                 if (newTransactionParts.size() == 0) {
1623                         // Nothing new to process
1624                         return;
1625                 }
1626
1627                 // Iterate through all the machine Ids that we received new parts for
1628                 for (Long machineId : newTransactionParts.keySet()) {
1629                         Map<Pair<Long, Integer>, TransactionPart> parts = newTransactionParts.get(machineId);
1630
1631                         // Iterate through all the parts for that machine Id
1632                         for (Pair<Long, Integer> partId : parts.keySet()) {
1633                                 TransactionPart part = parts.get(partId);
1634
1635                                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(part.getArbitratorId());
1636                                 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= part.getSequenceNumber())) {
1637                                         // Set dead the transaction part
1638                                         part.setDead();
1639                                         continue;
1640                                 }
1641
1642                                 // Get the transaction object for that sequence number
1643                                 Transaction transaction = liveTransactionBySequenceNumberTable.get(part.getSequenceNumber());
1644
1645                                 if (transaction == NULL) {
1646                                         // This is a new transaction that we dont have so make a new one
1647                                         transaction = new Transaction();
1648
1649                                         // Insert this new transaction into the live tables
1650                                         liveTransactionBySequenceNumberTable.put(part.getSequenceNumber(), transaction);
1651                                         liveTransactionByTransactionIdTable.put(part.getTransactionId(), transaction);
1652                                 }
1653
1654                                 // Add that part to the transaction
1655                                 transaction.addPartDecode(part);
1656                         }
1657                 }
1658
1659                 // Clear all the new transaction parts in preparation for the next time the server sends slots
1660                 newTransactionParts.clear();
1661         }
1662
1663
1664         private int64_t lastSeqNumArbOn = 0;
1665
1666         private void arbitrateFromServer() {
1667
1668                 if (liveTransactionBySequenceNumberTable.size() == 0) {
1669                         // Nothing to arbitrate on so move on
1670                         return;
1671                 }
1672
1673                 // Get the transaction sequence numbers and sort from oldest to newest
1674                 List<Long> transactionSequenceNumbers = new ArrayList<Long>(liveTransactionBySequenceNumberTable.keySet());
1675                 Collections.sort(transactionSequenceNumbers);
1676
1677                 // Collection of key value pairs that are
1678                 Map<IoTString, KeyValue> speculativeTableTmp = new HashMap<IoTString, KeyValue>();
1679
1680                 // The last transaction arbitrated on
1681                 int64_t lastTransactionCommitted = -1;
1682                 Set<Abort> generatedAborts = new HashSet<Abort>();
1683
1684                 for (Long transactionSequenceNumber : transactionSequenceNumbers) {
1685                         Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber);
1686
1687
1688
1689                         // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
1690                         if (transaction.getArbitrator() != localMachineId) {
1691                                 continue;
1692                         }
1693
1694                         if (transactionSequenceNumber < lastSeqNumArbOn) {
1695                                 continue;
1696                         }
1697
1698                         if (offlineTransactionsCommittedAndAtServer.contains(transaction.getId())) {
1699                                 // We have seen this already locally so dont commit again
1700                                 continue;
1701                         }
1702
1703
1704                         if (!transaction.isComplete()) {
1705                                 // Will arbitrate in incorrect order if we continue so just break
1706                                 // Most likely this
1707                                 break;
1708                         }
1709
1710
1711                         // update the largest transaction seen by arbitrator from server
1712                         if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) == NULL) {
1713                                 lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber());
1714                         } else {
1715                                 Long lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId());
1716                                 if (transaction.getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1717                                         lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber());
1718                                 }
1719                         }
1720
1721                         if (transaction.evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1722                                 // Guard evaluated as true
1723
1724                                 // Update the local changes so we can make the commit
1725                                 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
1726                                         speculativeTableTmp.put(kv.getKey(), kv);
1727                                 }
1728
1729                                 // Update what the last transaction committed was for use in batch commit
1730                                 lastTransactionCommitted = transactionSequenceNumber;
1731                         } else {
1732                                 // Guard evaluated was false so create abort
1733
1734                                 // Create the abort
1735                                 Abort newAbort = new Abort(NULL,
1736                                                            transaction.getClientLocalSequenceNumber(),
1737                                                            transaction.getSequenceNumber(),
1738                                                            transaction.getMachineId(),
1739                                                            transaction.getArbitrator(),
1740                                                            localArbitrationSequenceNumber);
1741                                 localArbitrationSequenceNumber++;
1742
1743                                 generatedAborts.add(newAbort);
1744
1745                                 // Insert the abort so we can process
1746                                 processEntry(newAbort);
1747                         }
1748
1749                         lastSeqNumArbOn = transactionSequenceNumber;
1750
1751                         // liveTransactionBySequenceNumberTable.remove(transactionSequenceNumber);
1752                 }
1753
1754                 Commit newCommit = NULL;
1755
1756                 // If there is something to commit
1757                 if (speculativeTableTmp.size() != 0) {
1758
1759                         // Create the commit and increment the commit sequence number
1760                         newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1761                         localArbitrationSequenceNumber++;
1762
1763                         // Add all the new keys to the commit
1764                         for (KeyValue kv : speculativeTableTmp.values()) {
1765                                 newCommit.addKV(kv);
1766                         }
1767
1768                         // create the commit parts
1769                         newCommit.createCommitParts();
1770
1771                         // Append all the commit parts to the end of the pending queue waiting for sending to the server
1772
1773                         // Insert the commit so we can process it
1774                         for (CommitPart commitPart : newCommit.getParts().values()) {
1775                                 processEntry(commitPart);
1776                         }
1777                 }
1778
1779                 if ((newCommit != NULL) || (generatedAborts.size() > 0)) {
1780                         ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1781                         pendingSendArbitrationRounds.add(arbitrationRound);
1782
1783                         if (compactArbitrationData()) {
1784                                 ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1785                                 if (newArbitrationRound.getCommit() != NULL) {
1786                                         for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
1787                                                 processEntry(commitPart);
1788                                         }
1789                                 }
1790                         }
1791                 }
1792         }
1793
1794         private Pair<Boolean, Boolean> arbitrateOnLocalTransaction(Transaction transaction) {
1795
1796                 // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
1797                 if (transaction.getArbitrator() != localMachineId) {
1798                         return new Pair<Boolean, Boolean>(false, false);
1799                 }
1800
1801                 if (!transaction.isComplete()) {
1802                         // Will arbitrate in incorrect order if we continue so just break
1803                         // Most likely this
1804                         return new Pair<Boolean, Boolean>(false, false);
1805                 }
1806
1807                 if (transaction.getMachineId() != localMachineId) {
1808                         // dont do this check for local transactions
1809                         if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) != NULL) {
1810                                 if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) > transaction.getClientLocalSequenceNumber()) {
1811                                         // We've have already seen this from the server
1812                                         return new Pair<Boolean, Boolean>(false, false);
1813                                 }
1814                         }
1815                 }
1816
1817                 if (transaction.evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1818                         // Guard evaluated as true
1819
1820                         // Create the commit and increment the commit sequence number
1821                         Commit newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1822                         localArbitrationSequenceNumber++;
1823
1824                         // Update the local changes so we can make the commit
1825                         for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
1826                                 newCommit.addKV(kv);
1827                         }
1828
1829                         // create the commit parts
1830                         newCommit.createCommitParts();
1831
1832                         // Append all the commit parts to the end of the pending queue waiting for sending to the server
1833                         ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, new HashSet<Abort>());
1834                         pendingSendArbitrationRounds.add(arbitrationRound);
1835
1836                         if (compactArbitrationData()) {
1837                                 ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1838                                 for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
1839                                         processEntry(commitPart);
1840                                 }
1841                         } else {
1842                                 // Insert the commit so we can process it
1843                                 for (CommitPart commitPart : newCommit.getParts().values()) {
1844                                         processEntry(commitPart);
1845                                 }
1846                         }
1847
1848                         if (transaction.getMachineId() == localMachineId) {
1849                                 TransactionStatus status = transaction.getTransactionStatus();
1850                                 if (status != NULL) {
1851                                         status.setStatus(TransactionStatus.StatusCommitted);
1852                                 }
1853                         }
1854
1855                         updateLiveStateFromLocal();
1856                         return new Pair<Boolean, Boolean>(true, true);
1857                 } else {
1858
1859                         if (transaction.getMachineId() == localMachineId) {
1860                                 // For locally created messages update the status
1861
1862                                 // Guard evaluated was false so create abort
1863                                 TransactionStatus status = transaction.getTransactionStatus();
1864                                 if (status != NULL) {
1865                                         status.setStatus(TransactionStatus.StatusAborted);
1866                                 }
1867                         } else {
1868                                 Set addAbortSet = new HashSet<Abort>();
1869
1870
1871                                 // Create the abort
1872                                 Abort newAbort = new Abort(NULL,
1873                                                            transaction.getClientLocalSequenceNumber(),
1874                                                            -1,
1875                                                            transaction.getMachineId(),
1876                                                            transaction.getArbitrator(),
1877                                                            localArbitrationSequenceNumber);
1878                                 localArbitrationSequenceNumber++;
1879
1880                                 addAbortSet.add(newAbort);
1881
1882
1883                                 // Append all the commit parts to the end of the pending queue waiting for sending to the server
1884                                 ArbitrationRound arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1885                                 pendingSendArbitrationRounds.add(arbitrationRound);
1886
1887                                 if (compactArbitrationData()) {
1888                                         ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1889                                         for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
1890                                                 processEntry(commitPart);
1891                                         }
1892                                 }
1893                         }
1894
1895                         updateLiveStateFromLocal();
1896                         return new Pair<Boolean, Boolean>(true, false);
1897                 }
1898         }
1899
1900         /**
1901          * Compacts the arbitration data my merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates
1902          */
1903         private bool compactArbitrationData() {
1904
1905                 if (pendingSendArbitrationRounds.size() < 2) {
1906                         // Nothing to compact so do nothing
1907                         return false;
1908                 }
1909
1910                 ArbitrationRound lastRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1911                 if (lastRound.didSendPart()) {
1912                         return false;
1913                 }
1914
1915                 bool hadCommit = (lastRound.getCommit() == NULL);
1916                 bool gotNewCommit = false;
1917
1918                 int numberToDelete = 1;
1919                 while (numberToDelete < pendingSendArbitrationRounds.size()) {
1920                         ArbitrationRound round = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - numberToDelete - 1);
1921
1922                         if (round.isFull() || round.didSendPart()) {
1923                                 // Stop since there is a part that cannot be compacted and we need to compact in order
1924                                 break;
1925                         }
1926
1927                         if (round.getCommit() == NULL) {
1928
1929                                 // Try compacting aborts only
1930                                 int newSize = round.getCurrentSize() + lastRound.getAbortsCount();
1931                                 if (newSize > ArbitrationRound.MAX_PARTS) {
1932                                         // Cant compact since it would be too large
1933                                         break;
1934                                 }
1935                                 lastRound.addAborts(round.getAborts());
1936                         } else {
1937
1938                                 // Create a new larger commit
1939                                 Commit newCommit = Commit.merge(lastRound.getCommit(), round.getCommit(), localArbitrationSequenceNumber);
1940                                 localArbitrationSequenceNumber++;
1941
1942                                 // Create the commit parts so that we can count them
1943                                 newCommit.createCommitParts();
1944
1945                                 // Calculate the new size of the parts
1946                                 int newSize = newCommit.getNumberOfParts();
1947                                 newSize += lastRound.getAbortsCount();
1948                                 newSize += round.getAbortsCount();
1949
1950                                 if (newSize > ArbitrationRound.MAX_PARTS) {
1951                                         // Cant compact since it would be too large
1952                                         break;
1953                                 }
1954
1955                                 // Set the new compacted part
1956                                 lastRound.setCommit(newCommit);
1957                                 lastRound.addAborts(round.getAborts());
1958                                 gotNewCommit = true;
1959                         }
1960
1961                         numberToDelete++;
1962                 }
1963
1964                 if (numberToDelete != 1) {
1965                         // If there is a compaction
1966
1967                         // Delete the previous pieces that are now in the new compacted piece
1968                         if (numberToDelete == pendingSendArbitrationRounds.size()) {
1969                                 pendingSendArbitrationRounds.clear();
1970                         } else {
1971                                 for (int i = 0; i < numberToDelete; i++) {
1972                                         pendingSendArbitrationRounds.remove(pendingSendArbitrationRounds.size() - 1);
1973                                 }
1974                         }
1975
1976                         // Add the new compacted into the pending to send list
1977                         pendingSendArbitrationRounds.add(lastRound);
1978
1979                         // Should reinsert into the commit processor
1980                         if (hadCommit && gotNewCommit) {
1981                                 return true;
1982                         }
1983                 }
1984
1985                 return false;
1986         }
1987         // private bool compactArbitrationData() {
1988         //      return false;
1989         // }
1990
1991         /**
1992          * Update all the commits and the committed tables, sets dead the dead transactions
1993          */
1994         private bool updateCommittedTable() {
1995
1996                 if (newCommitParts.size() == 0) {
1997                         // Nothing new to process
1998                         return false;
1999                 }
2000
2001                 // Iterate through all the machine Ids that we received new parts for
2002                 for (Long machineId : newCommitParts.keySet()) {
2003                         Map<Pair<Long, Integer>, CommitPart> parts = newCommitParts.get(machineId);
2004
2005                         // Iterate through all the parts for that machine Id
2006                         for (Pair<Long, Integer> partId : parts.keySet()) {
2007                                 CommitPart part = parts.get(partId);
2008
2009                                 // Get the transaction object for that sequence number
2010                                 Map<Long, Commit> commitForClientTable = liveCommitsTable.get(part.getMachineId());
2011
2012                                 if (commitForClientTable == NULL) {
2013                                         // This is the first commit from this device
2014                                         commitForClientTable = new HashMap<Long, Commit>();
2015                                         liveCommitsTable.put(part.getMachineId(), commitForClientTable);
2016                                 }
2017
2018                                 Commit commit = commitForClientTable.get(part.getSequenceNumber());
2019
2020                                 if (commit == NULL) {
2021                                         // This is a new commit that we dont have so make a new one
2022                                         commit = new Commit();
2023
2024                                         // Insert this new commit into the live tables
2025                                         commitForClientTable.put(part.getSequenceNumber(), commit);
2026                                 }
2027
2028                                 // Add that part to the commit
2029                                 commit.addPartDecode(part);
2030                         }
2031                 }
2032
2033                 // Clear all the new commits parts in preparation for the next time the server sends slots
2034                 newCommitParts.clear();
2035
2036                 // If we process a new commit keep track of it for future use
2037                 bool didProcessANewCommit = false;
2038
2039                 // Process the commits one by one
2040                 for (Long arbitratorId : liveCommitsTable.keySet()) {
2041
2042                         // Get all the commits for a specific arbitrator
2043                         Map<Long, Commit> commitForClientTable = liveCommitsTable.get(arbitratorId);
2044
2045                         // Sort the commits in order
2046                         List<Long> commitSequenceNumbers = new ArrayList<Long>(commitForClientTable.keySet());
2047                         Collections.sort(commitSequenceNumbers);
2048
2049                         // Get the last commit seen from this arbitrator
2050                         int64_t lastCommitSeenSequenceNumber = -1;
2051                         if (lastCommitSeenSequenceNumberByArbitratorTable.get(arbitratorId) != NULL) {
2052                                 lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable.get(arbitratorId);
2053                         }
2054
2055                         // Go through each new commit one by one
2056                         for (int i = 0; i < commitSequenceNumbers.size(); i++) {
2057                                 Long commitSequenceNumber = commitSequenceNumbers.get(i);
2058                                 Commit commit = commitForClientTable.get(commitSequenceNumber);
2059
2060                                 // Special processing if a commit is not complete
2061                                 if (!commit.isComplete()) {
2062                                         if (i == (commitSequenceNumbers.size() - 1)) {
2063                                                 // If there is an incomplete commit and this commit is the latest one seen then this commit cannot be processed and there are no other commits
2064                                                 break;
2065                                         } else {
2066                                                 // This is a commit that was already dead but parts of it are still in the block chain (not flushed out yet).
2067                                                 // Delete it and move on
2068                                                 commit.setDead();
2069                                                 commitForClientTable.remove(commit.getSequenceNumber());
2070                                                 continue;
2071                                         }
2072                                 }
2073
2074                                 // Update the last transaction that was updated if we can
2075                                 if (commit.getTransactionSequenceNumber() != -1) {
2076                                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId());
2077
2078                                         // Update the last transaction sequence number that the arbitrator arbitrated on
2079                                         if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) {
2080                                                 lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber());
2081                                         }
2082                                 }
2083
2084                                 // Update the last arbitration data that we have seen so far
2085                                 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId()) != NULL) {
2086
2087                                         int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId());
2088                                         if (commit.getSequenceNumber() > lastArbitrationSequenceNumber) {
2089                                                 // Is larger
2090                                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(commit.getMachineId(), commit.getSequenceNumber());
2091                                         }
2092                                 } else {
2093                                         // Never seen any data from this arbitrator so record the first one
2094                                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(commit.getMachineId(), commit.getSequenceNumber());
2095                                 }
2096
2097                                 // We have already seen this commit before so need to do the full processing on this commit
2098                                 if (commit.getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2099
2100                                         // Update the last transaction that was updated if we can
2101                                         if (commit.getTransactionSequenceNumber() != -1) {
2102                                                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId());
2103
2104                                                 // Update the last transaction sequence number that the arbitrator arbitrated on
2105                                                 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) {
2106                                                         lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber());
2107                                                 }
2108                                         }
2109
2110                                         continue;
2111                                 }
2112
2113                                 // If we got here then this is a brand new commit and needs full processing
2114
2115                                 // Get what commits should be edited, these are the commits that have live values for their keys
2116                                 Set<Commit> commitsToEdit = new HashSet<Commit>();
2117                                 for (KeyValue kv : commit.getKeyValueUpdateSet()) {
2118                                         commitsToEdit.add(liveCommitsByKeyTable.get(kv.getKey()));
2119                                 }
2120                                 commitsToEdit.remove(NULL); // remove NULL since it could be in this set
2121
2122                                 // Update each previous commit that needs to be updated
2123                                 for (Commit previousCommit : commitsToEdit) {
2124
2125                                         // Only bother with live commits (TODO: Maybe remove this check)
2126                                         if (previousCommit.isLive()) {
2127
2128                                                 // Update which keys in the old commits are still live
2129                                                 for (KeyValue kv : commit.getKeyValueUpdateSet()) {
2130                                                         previousCommit.invalidateKey(kv.getKey());
2131                                                 }
2132
2133                                                 // if the commit is now dead then remove it
2134                                                 if (!previousCommit.isLive()) {
2135                                                         commitForClientTable.remove(previousCommit);
2136                                                 }
2137                                         }
2138                                 }
2139
2140                                 // Update the last seen sequence number from this arbitrator
2141                                 if (lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId()) != NULL) {
2142                                         if (commit.getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId())) {
2143                                                 lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber());
2144                                         }
2145                                 } else {
2146                                         lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber());
2147                                 }
2148
2149                                 // We processed a new commit that we havent seen before
2150                                 didProcessANewCommit = true;
2151
2152                                 // Update the committed table of keys and which commit is using which key
2153                                 for (KeyValue kv : commit.getKeyValueUpdateSet()) {
2154                                         committedKeyValueTable.put(kv.getKey(), kv);
2155                                         liveCommitsByKeyTable.put(kv.getKey(), commit);
2156                                 }
2157                         }
2158                 }
2159
2160                 return didProcessANewCommit;
2161         }
2162
2163         /**
2164          * Create the speculative table from transactions that are still live and have come from the cloud
2165          */
2166         private bool updateSpeculativeTable(bool didProcessNewCommits) {
2167                 if (liveTransactionBySequenceNumberTable.keySet().size() == 0) {
2168                         // There is nothing to speculate on
2169                         return false;
2170                 }
2171
2172                 // Create a list of the transaction sequence numbers and sort them from oldest to newest
2173                 List<Long> transactionSequenceNumbersSorted = new ArrayList<Long>(liveTransactionBySequenceNumberTable.keySet());
2174                 Collections.sort(transactionSequenceNumbersSorted);
2175
2176                 bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted.get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2177
2178
2179                 if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2180                         // If there is a gap in the transaction sequence numbers then there was a commit or an abort of a transaction
2181                         // OR there was a new commit (Could be from offline commit) so a redo the speculation from scratch
2182
2183                         // Start from scratch
2184                         speculatedKeyValueTable.clear();
2185                         lastTransactionSequenceNumberSpeculatedOn = -1;
2186                         oldestTransactionSequenceNumberSpeculatedOn = -1;
2187
2188                 }
2189
2190                 // Remember the front of the transaction list
2191                 oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted.get(0);
2192
2193                 // Find where to start arbitration from
2194                 int startIndex = transactionSequenceNumbersSorted.indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1;
2195
2196                 if (startIndex >= transactionSequenceNumbersSorted.size()) {
2197                         // Make sure we are not out of bounds
2198                         return false; // did not speculate
2199                 }
2200
2201                 Set<Long> incompleteTransactionArbitrator = new HashSet<Long>();
2202                 bool didSkip = true;
2203
2204                 for (int i = startIndex; i < transactionSequenceNumbersSorted.size(); i++) {
2205                         int64_t transactionSequenceNumber = transactionSequenceNumbersSorted.get(i);
2206                         Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber);
2207
2208                         if (!transaction.isComplete()) {
2209                                 // If there is an incomplete transaction then there is nothing we can do
2210                                 // add this transactions arbitrator to the list of arbitrators we should ignore
2211                                 incompleteTransactionArbitrator.add(transaction.getArbitrator());
2212                                 didSkip = true;
2213                                 continue;
2214                         }
2215
2216                         if (incompleteTransactionArbitrator.contains(transaction.getArbitrator())) {
2217                                 continue;
2218                         }
2219
2220                         lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2221
2222                         if (transaction.evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2223                                 // Guard evaluated to true so update the speculative table
2224                                 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
2225                                         speculatedKeyValueTable.put(kv.getKey(), kv);
2226                                 }
2227                         }
2228                 }
2229
2230                 if (didSkip) {
2231                         // Since there was a skip we need to redo the speculation next time around
2232                         lastTransactionSequenceNumberSpeculatedOn = -1;
2233                         oldestTransactionSequenceNumberSpeculatedOn = -1;
2234                 }
2235
2236                 // We did some speculation
2237                 return true;
2238         }
2239
2240         /**
2241          * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer
2242          */
2243         private void updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2244                 if (pendingTransactionQueue.size() == 0) {
2245                         // There is nothing to speculate on
2246                         return;
2247                 }
2248
2249
2250                 if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue.get(0))) {
2251                         // need to reset on the pending speculation
2252                         lastPendingTransactionSpeculatedOn = NULL;
2253                         firstPendingTransaction = pendingTransactionQueue.get(0);
2254                         pendingTransactionSpeculatedKeyValueTable.clear();
2255                 }
2256
2257                 // Find where to start arbitration from
2258                 int startIndex = pendingTransactionQueue.indexOf(firstPendingTransaction) + 1;
2259
2260                 if (startIndex >= pendingTransactionQueue.size()) {
2261                         // Make sure we are not out of bounds
2262                         return;
2263                 }
2264
2265                 for (int i = startIndex; i < pendingTransactionQueue.size(); i++) {
2266                         Transaction transaction = pendingTransactionQueue.get(i);
2267
2268                         lastPendingTransactionSpeculatedOn = transaction;
2269
2270                         if (transaction.evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2271                                 // Guard evaluated to true so update the speculative table
2272                                 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
2273                                         pendingTransactionSpeculatedKeyValueTable.put(kv.getKey(), kv);
2274                                 }
2275                         }
2276                 }
2277         }
2278
2279         /**
2280          * Set dead and remove from the live transaction tables the transactions that are dead
2281          */
2282         private void updateLiveTransactionsAndStatus() {
2283
2284                 // Go through each of the transactions
2285                 for (Iterator<Map.Entry<Long, Transaction>> iter = liveTransactionBySequenceNumberTable.entrySet().iterator(); iter.hasNext();) {
2286                         Transaction transaction = iter.next().getValue();
2287
2288                         // Check if the transaction is dead
2289                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(transaction.getArbitrator());
2290                         if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= transaction.getSequenceNumber())) {
2291
2292                                 // Set dead the transaction
2293                                 transaction.setDead();
2294
2295                                 // Remove the transaction from the live table
2296                                 iter.remove();
2297                                 liveTransactionByTransactionIdTable.remove(transaction.getId());
2298                         }
2299                 }
2300
2301                 // Go through each of the transactions
2302                 for (Iterator<Map.Entry<Long, TransactionStatus>> iter = outstandingTransactionStatus.entrySet().iterator(); iter.hasNext();) {
2303                         TransactionStatus status = iter.next().getValue();
2304
2305                         // Check if the transaction is dead
2306                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(status.getTransactionArbitrator());
2307                         if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= status.getTransactionSequenceNumber())) {
2308
2309                                 // Set committed
2310                                 status.setStatus(TransactionStatus.StatusCommitted);
2311
2312                                 // Remove
2313                                 iter.remove();
2314                         }
2315                 }
2316         }
2317
2318         /**
2319          * Process this slot, entry by entry.  Also update the latest message sent by slot
2320          */
2321         private void processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLocal, HashSet<Long> machineSet) {
2322
2323                 // Update the last message seen
2324                 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2325
2326                 // Process each entry in the slot
2327                 for (Entry entry : slot.getEntries()) {
2328                         switch (entry.getType()) {
2329
2330                         case Entry.TypeCommitPart:
2331                                 processEntry((CommitPart)entry);
2332                                 break;
2333
2334                         case Entry.TypeAbort:
2335                                 processEntry((Abort)entry);
2336                                 break;
2337
2338                         case Entry.TypeTransactionPart:
2339                                 processEntry((TransactionPart)entry);
2340                                 break;
2341
2342                         case Entry.TypeNewKey:
2343                                 processEntry((NewKey)entry);
2344                                 break;
2345
2346                         case Entry.TypeLastMessage:
2347                                 processEntry((LastMessage)entry, machineSet);
2348                                 break;
2349
2350                         case Entry.TypeRejectedMessage:
2351                                 processEntry((RejectedMessage)entry, indexer);
2352                                 break;
2353
2354                         case Entry.TypeTableStatus:
2355                                 processEntry((TableStatus)entry, slot.getSequenceNumber());
2356                                 break;
2357
2358                         default:
2359                                 throw new Error("Unrecognized type: " + entry.getType());
2360                         }
2361                 }
2362         }
2363
2364         /**
2365          * Update the last message that was sent for a machine Id
2366          */
2367         private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
2368                 // Update what the last message received by a machine was
2369                 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
2370         }
2371
2372         /**
2373          * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message)
2374          */
2375         private void processEntry(NewKey entry) {
2376
2377                 // Update the arbitrator table with the new key information
2378                 arbitratorTable.put(entry.getKey(), entry.getMachineID());
2379
2380                 // Update what the latest live new key is
2381                 NewKey oldNewKey = liveNewKeyTable.put(entry.getKey(), entry);
2382                 if (oldNewKey != NULL) {
2383                         // Delete the old new key messages
2384                         oldNewKey.setDead();
2385                 }
2386         }
2387
2388         /**
2389          * Process new table status entries and set dead the old ones as new ones come in.
2390          * keeps track of the largest and smallest table status seen in this current round
2391          * of updating the local copy of the block chain
2392          */
2393         private void processEntry(TableStatus entry, int64_t seq) {
2394                 int newNumSlots = entry.getMaxSlots();
2395                 updateCurrMaxSize(newNumSlots);
2396
2397                 initExpectedSize(seq, newNumSlots);
2398
2399                 if (liveTableStatus != NULL) {
2400                         // We have a larger table status so the old table status is no int64_ter alive
2401                         liveTableStatus.setDead();
2402                 }
2403
2404                 // Make this new table status the latest alive table status
2405                 liveTableStatus = entry;
2406         }
2407
2408         /**
2409          * Check old messages to see if there is a block chain violation. Also
2410          */
2411         private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
2412                 int64_t oldSeqNum = entry.getOldSeqNum();
2413                 int64_t newSeqNum = entry.getNewSeqNum();
2414                 bool isequal = entry.getEqual();
2415                 int64_t machineId = entry.getMachineID();
2416                 int64_t seq = entry.getSequenceNumber();
2417
2418
2419                 // Check if we have messages that were supposed to be rejected in our local block chain
2420                 for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2421
2422                         // Get the slot
2423                         Slot slot = indexer.getSlot(seqNum);
2424
2425                         if (slot != NULL) {
2426                                 // If we have this slot make sure that it was not supposed to be a rejected slot
2427
2428                                 int64_t slotMachineId = slot.getMachineID();
2429                                 if (isequal != (slotMachineId == machineId)) {
2430                                         throw new Error("Server Error: Trying to insert rejected message for slot " + seqNum);
2431                                 }
2432                         }
2433                 }
2434
2435
2436                 // Create a list of clients to watch until they see this rejected message entry.
2437                 HashSet<Long> deviceWatchSet = new HashSet<Long>();
2438                 for (Map.Entry<Long, Pair<Long, Liveness>> lastMessageEntry : lastMessageTable.entrySet()) {
2439
2440                         // Machine ID for the last message entry
2441                         int64_t lastMessageEntryMachineId = lastMessageEntry.getKey();
2442
2443                         // We've seen it, don't need to continue to watch.  Our next
2444                         // message will implicitly acknowledge it.
2445                         if (lastMessageEntryMachineId == localMachineId) {
2446                                 continue;
2447                         }
2448
2449                         Pair<Long, Liveness> lastMessageValue = lastMessageEntry.getValue();
2450                         int64_t entrySequenceNumber = lastMessageValue.getFirst();
2451
2452                         if (entrySequenceNumber < seq) {
2453
2454                                 // Add this rejected message to the set of messages that this machine ID did not see yet
2455                                 addWatchList(lastMessageEntryMachineId, entry);
2456
2457                                 // This client did not see this rejected message yet so add it to the watch set to monitor
2458                                 deviceWatchSet.add(lastMessageEntryMachineId);
2459                         }
2460                 }
2461
2462                 if (deviceWatchSet.isEmpty()) {
2463                         // This rejected message has been seen by all the clients so
2464                         entry.setDead();
2465                 } else {
2466                         // We need to watch this rejected message
2467                         entry.setWatchSet(deviceWatchSet);
2468                 }
2469         }
2470
2471         /**
2472          * Check if this abort is live, if not then save it so we can kill it later.
2473          * update the last transaction number that was arbitrated on.
2474          */
2475         private void processEntry(Abort entry) {
2476
2477
2478                 if (entry.getTransactionSequenceNumber() != -1) {
2479                         // update the transaction status if it was sent to the server
2480                         TransactionStatus status = outstandingTransactionStatus.remove(entry.getTransactionSequenceNumber());
2481                         if (status != NULL) {
2482                                 status.setStatus(TransactionStatus.StatusAborted);
2483                         }
2484                 }
2485
2486                 // Abort has not been seen by the client it is for yet so we need to keep track of it
2487                 Abort previouslySeenAbort = liveAbortTable.put(entry.getAbortId(), entry);
2488                 if (previouslySeenAbort != NULL) {
2489                         previouslySeenAbort.setDead(); // Delete old version of the abort since we got a rescued newer version
2490                 }
2491
2492                 if (entry.getTransactionArbitrator() == localMachineId) {
2493                         liveAbortsGeneratedByLocal.put(entry.getArbitratorLocalSequenceNumber(), entry);
2494                 }
2495
2496                 if ((entry.getSequenceNumber() != -1) && (lastMessageTable.get(entry.getTransactionMachineId()).getFirst() >= entry.getSequenceNumber())) {
2497
2498                         // The machine already saw this so it is dead
2499                         entry.setDead();
2500                         liveAbortTable.remove(entry.getAbortId());
2501
2502                         if (entry.getTransactionArbitrator() == localMachineId) {
2503                                 liveAbortsGeneratedByLocal.remove(entry.getArbitratorLocalSequenceNumber());
2504                         }
2505
2506                         return;
2507                 }
2508
2509
2510
2511
2512                 // Update the last arbitration data that we have seen so far
2513                 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator()) != NULL) {
2514
2515                         int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator());
2516                         if (entry.getSequenceNumber() > lastArbitrationSequenceNumber) {
2517                                 // Is larger
2518                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber());
2519                         }
2520                 } else {
2521                         // Never seen any data from this arbitrator so record the first one
2522                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber());
2523                 }
2524
2525
2526                 // Set dead a transaction if we can
2527                 Transaction transactionToSetDead = liveTransactionByTransactionIdTable.remove(new Pair<Long, Long>(entry.getTransactionMachineId(), entry.getTransactionClientLocalSequenceNumber()));
2528                 if (transactionToSetDead != NULL) {
2529                         liveTransactionBySequenceNumberTable.remove(transactionToSetDead.getSequenceNumber());
2530                 }
2531
2532                 // Update the last transaction sequence number that the arbitrator arbitrated on
2533                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getTransactionArbitrator());
2534                 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry.getTransactionSequenceNumber())) {
2535
2536                         // Is a valid one
2537                         if (entry.getTransactionSequenceNumber() != -1) {
2538                                 lastArbitratedTransactionNumberByArbitratorTable.put(entry.getTransactionArbitrator(), entry.getTransactionSequenceNumber());
2539                         }
2540                 }
2541         }
2542
2543         /**
2544          * Set dead the transaction part if that transaction is dead and keep track of all new parts
2545          */
2546         private void processEntry(TransactionPart entry) {
2547                 // Check if we have already seen this transaction and set it dead OR if it is not alive
2548                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getArbitratorId());
2549                 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= entry.getSequenceNumber())) {
2550                         // This transaction is dead, it was already committed or aborted
2551                         entry.setDead();
2552                         return;
2553                 }
2554
2555                 // This part is still alive
2556                 Map<Pair<Long, Integer>, TransactionPart> transactionPart = newTransactionParts.get(entry.getMachineId());
2557
2558                 if (transactionPart == NULL) {
2559                         // Dont have a table for this machine Id yet so make one
2560                         transactionPart = new HashMap<Pair<Long, Integer>, TransactionPart>();
2561                         newTransactionParts.put(entry.getMachineId(), transactionPart);
2562                 }
2563
2564                 // Update the part and set dead ones we have already seen (got a rescued version)
2565                 TransactionPart previouslySeenPart = transactionPart.put(entry.getPartId(), entry);
2566                 if (previouslySeenPart != NULL) {
2567                         previouslySeenPart.setDead();
2568                 }
2569         }
2570
2571         /**
2572          * Process new commit entries and save them for future use.  Delete duplicates
2573          */
2574         private void processEntry(CommitPart entry) {
2575
2576
2577                 // Update the last transaction that was updated if we can
2578                 if (entry.getTransactionSequenceNumber() != -1) {
2579                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getMachineId());
2580
2581                         // Update the last transaction sequence number that the arbitrator arbitrated on
2582                         if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry.getTransactionSequenceNumber())) {
2583                                 lastArbitratedTransactionNumberByArbitratorTable.put(entry.getMachineId(), entry.getTransactionSequenceNumber());
2584                         }
2585                 }
2586
2587
2588
2589
2590                 Map<Pair<Long, Integer>, CommitPart> commitPart = newCommitParts.get(entry.getMachineId());
2591
2592                 if (commitPart == NULL) {
2593                         // Don't have a table for this machine Id yet so make one
2594                         commitPart = new HashMap<Pair<Long, Integer>, CommitPart>();
2595                         newCommitParts.put(entry.getMachineId(), commitPart);
2596                 }
2597
2598                 // Update the part and set dead ones we have already seen (got a rescued version)
2599                 CommitPart previouslySeenPart = commitPart.put(entry.getPartId(), entry);
2600                 if (previouslySeenPart != NULL) {
2601                         previouslySeenPart.setDead();
2602                 }
2603         }
2604
2605         /**
2606          * Update the last message seen table.  Update and set dead the appropriate RejectedMessages as clients see them.
2607          * Updates the live aborts, removes those that are dead and sets them dead.
2608          * Check that the last message seen is correct and that there is no mismatch of our own last message or that
2609          * other clients have not had a rollback on the last message.
2610          */
2611         private void updateLastMessage(int64_t machineId, int64_t seqNum, Liveness liveness, bool acceptUpdatesToLocal, HashSet<Long> machineSet) {
2612
2613                 // We have seen this machine ID
2614                 machineSet.remove(machineId);
2615
2616                 // Get the set of rejected messages that this machine Id is has not seen yet
2617                 HashSet<RejectedMessage> watchset = rejectedMessageWatchListTable.get(machineId);
2618
2619                 // If there is a rejected message that this machine Id has not seen yet
2620                 if (watchset != NULL) {
2621
2622                         // Go through each rejected message that this machine Id has not seen yet
2623                         for (Iterator<RejectedMessage> rmit = watchset.iterator(); rmit.hasNext(); ) {
2624
2625                                 RejectedMessage rm = rmit.next();
2626
2627                                 // If this machine Id has seen this rejected message...
2628                                 if (rm.getSequenceNumber() <= seqNum) {
2629
2630                                         // Remove it from our watchlist
2631                                         rmit.remove();
2632
2633                                         // Decrement machines that need to see this notification
2634                                         rm.removeWatcher(machineId);
2635                                 }
2636                         }
2637                 }
2638
2639                 // Set dead the abort
2640                 for (Iterator<Map.Entry<Pair<Long, Long>, Abort>> i = liveAbortTable.entrySet().iterator(); i.hasNext();) {
2641                         Abort abort = i.next().getValue();
2642
2643                         if ((abort.getTransactionMachineId() == machineId) && (abort.getSequenceNumber() <= seqNum)) {
2644                                 abort.setDead();
2645                                 i.remove();
2646
2647                                 if (abort.getTransactionArbitrator() == localMachineId) {
2648                                         liveAbortsGeneratedByLocal.remove(abort.getArbitratorLocalSequenceNumber());
2649                                 }
2650                         }
2651                 }
2652
2653
2654
2655                 if (machineId == localMachineId) {
2656                         // Our own messages are immediately dead.
2657                         if (liveness instanceof LastMessage) {
2658                                 ((LastMessage)liveness).setDead();
2659                         } else if (liveness instanceof Slot) {
2660                                 ((Slot)liveness).setDead();
2661                         } else {
2662                                 throw new Error("Unrecognized type");
2663                         }
2664                 }
2665
2666                 // Get the old last message for this device
2667                 Pair<Long, Liveness> lastMessageEntry = lastMessageTable.put(machineId, new Pair<Long, Liveness>(seqNum, liveness));
2668                 if (lastMessageEntry == NULL) {
2669                         // If no last message then there is nothing else to process
2670                         return;
2671                 }
2672
2673                 int64_t lastMessageSeqNum = lastMessageEntry.getFirst();
2674                 Liveness lastEntry = lastMessageEntry.getSecond();
2675
2676                 // If it is not our machine Id since we already set ours to dead
2677                 if (machineId != localMachineId) {
2678                         if (lastEntry instanceof LastMessage) {
2679                                 ((LastMessage)lastEntry).setDead();
2680                         } else if (lastEntry instanceof Slot) {
2681                                 ((Slot)lastEntry).setDead();
2682                         } else {
2683                                 throw new Error("Unrecognized type");
2684                         }
2685                 }
2686
2687                 // Make sure the server is not playing any games
2688                 if (machineId == localMachineId) {
2689
2690                         if (hadPartialSendToServer) {
2691                                 // We were not making any updates and we had a machine mismatch
2692                                 if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2693                                         throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: " +  lastMessageSeqNum  + " got: " + seqNum);
2694                                 }
2695
2696                         } else {
2697                                 // We were not making any updates and we had a machine mismatch
2698                                 if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2699                                         throw new Error("Server Error: Mismatch on local machine sequence number, needed: " +  lastMessageSeqNum + " got: " + seqNum);
2700                                 }
2701                         }
2702                 } else {
2703                         if (lastMessageSeqNum > seqNum) {
2704                                 throw new Error("Server Error: Rollback on remote machine sequence number");
2705                         }
2706                 }
2707         }
2708
2709         /**
2710          * Add a rejected message entry to the watch set to keep track of which clients have seen that
2711          * rejected message entry and which have not.
2712          */
2713         private void addWatchList(int64_t machineId, RejectedMessage entry) {
2714                 HashSet<RejectedMessage> entries = rejectedMessageWatchListTable.get(machineId);
2715                 if (entries == NULL) {
2716                         // There is no set for this machine ID yet so create one
2717                         entries = new HashSet<RejectedMessage>();
2718                         rejectedMessageWatchListTable.put(machineId, entries);
2719                 }
2720                 entries.add(entry);
2721         }
2722
2723         /**
2724          * Check if the HMAC chain is not violated
2725          */
2726         private void checkHMACChain(SlotIndexer indexer, Slot[] newSlots) {
2727                 for (int i = 0; i < newSlots.length; i++) {
2728                         Slot currSlot = newSlots[i];
2729                         Slot prevSlot = indexer.getSlot(currSlot.getSequenceNumber() - 1);
2730                         if (prevSlot != NULL &&
2731                                 !Arrays.equals(prevSlot.getHMAC(), currSlot.getPrevHMAC()))
2732                                 throw new Error("Server Error: Invalid HMAC Chain" + currSlot + " " + prevSlot);
2733                 }
2734         }
2735 }