edits
[iotcloud.git] / version2 / src / C / Table.cc
1 #include "Table.h"
2
3 Table::Table(String baseurl, String password, int64_t _localMachineId, int listeningPort) {
4         localMachineId = _localMachineId;
5         cloud = new CloudComm(this, baseurl, password, listeningPort);
6
7         init();
8 }
9
10 Table::Table(CloudComm _cloud, int64_t _localMachineId) {
11         localMachineId = _localMachineId;
12         cloud = _cloud;
13
14         init();
15 }
16
17 /**
18  * Init all the stuff needed for for table usage
19  */
20 void Table::init() {
21
22         // Init helper objects
23         random = new Random();
24         buffer = new SlotBuffer();
25
26         // Set Variables
27         oldestLiveSlotSequenceNumver = 1;
28
29         // init data structs
30         committedKeyValueTable = new Hashtable<IoTString, KeyValue>();
31         speculatedKeyValueTable = new Hashtable<IoTString, KeyValue>();
32         pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString, KeyValue>();
33         liveNewKeyTable = new Hashtable<IoTString, NewKey>();
34         lastMessageTable = new Hashtable<int64_t Pair<int64_t Liveness> >();
35         rejectedMessageWatchVectorTable = new Hashtable<int64_t HashSet<RejectedMessage> >();
36         arbitratorTable = new Hashtable<IoTString, Long>();
37         liveAbortTable = new Hashtable<Pair<int64_t, int64_t>, Abort>();
38         newTransactionParts = new Hashtable<int64_t Hashtable<Pair<int64_t int32_t>, TransactionPart> >();
39         newCommitParts = new Hashtable<int64_t Hashtable<Pair<int64_t int32_t>, CommitPart> >();
40         lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
41         liveTransactionBySequenceNumberTable = new Hashtable<int64_t Transaction>();
42         liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t>, Transaction>();
43         liveCommitsTable = new Hashtable<int64_t Hashtable<int64_t Commit> >();
44         liveCommitsByKeyTable = new Hashtable<IoTString, Commit>();
45         lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
46         rejectedSlotVector = new Vector<Long>();
47         pendingTransactionQueue = new Vector<Transaction>();
48         pendingSendArbitrationEntriesToDelete = new Vector<Entry>();
49         transactionPartsSent = new Hashtable<Transaction, Vector<int32_t> >();
50         outstandingTransactionStatus = new Hashtable<int64_t TransactionStatus>();
51         liveAbortsGeneratedByLocal = new Hashtable<int64_t Abort>();
52         offlineTransactionsCommittedAndAtServer = new HashSet<Pair<int64_t, int64_t> >();
53         localCommunicationTable = new Hashtable<int64_t Pair<String, int32_t> >();
54         lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
55         pendingSendArbitrationRounds = new Vector<ArbitrationRound>();
56         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
57
58
59         // Other init stuff
60         numberOfSlots = buffer.capacity();
61         setResizeThreshold();
62 }
63
64 // TODO: delete method
65 synchronized void Table::printSlots() {
66         int64_t o = buffer.getOldestSeqNum();
67         int64_t n = buffer.getNewestSeqNum();
68
69         int[] types = new int[10];
70
71         int num = 0;
72
73         int livec = 0;
74         int deadc = 0;
75
76         int casdasd = 0;
77
78         int liveslo = 0;
79
80         for (int64_t i = o; i < (n + 1); i++) {
81                 Slot s = buffer.getSlot(i);
82
83
84                 if (s.isLive()) {
85                         liveslo++;
86                 }
87
88                 Vector<Entry> entries = s.getEntries();
89
90                 for (Entry e : entries) {
91                         if (e.isLive()) {
92                                 int type = e.getType();
93
94
95                                 if (type == 6) {
96                                         RejectedMessage rej = (RejectedMessage)e;
97                                         casdasd++;
98
99                                         System.out.println(rej.getMachineID());
100                                 }
101
102
103                                 types[type] = types[type] + 1;
104                                 num++;
105                                 livec++;
106                         } else {
107                                 deadc++;
108                         }
109                 }
110         }
111
112         for (int i = 0; i < 10; i++) {
113                 System.out.println(i + "    " + types[i]);
114         }
115         System.out.println("Live count:   " + livec);
116         System.out.println("Live Slot count:   " + liveslo);
117
118         System.out.println("Dead count:   " + deadc);
119         System.out.println("Old:   " + o);
120         System.out.println("New:   " + n);
121         System.out.println("Size:   " + buffer.size());
122         // System.out.println("Commits:   " + liveCommitsTable.size());
123         System.out.println("pendingTrans:   " + pendingTransactionQueue.size());
124         System.out.println("Trans Status Out:   " + outstandingTransactionStatus.size());
125
126         for (Long k : lastArbitratedTransactionNumberByArbitratorTable.keySet()) {
127                 System.out.println(k + ": " + lastArbitratedTransactionNumberByArbitratorTable.get(k));
128         }
129
130
131         for (Long a : liveCommitsTable.keySet()) {
132                 for (Long b : liveCommitsTable.get(a).keySet()) {
133                         for (KeyValue kv : liveCommitsTable.get(a).get(b).getKeyValueUpdateSet()) {
134                                 System.out.print(kv + " ");
135                         }
136                         System.out.print("|| ");
137                 }
138                 System.out.println();
139         }
140
141 }
142
143 /**
144  * Initialize the table by inserting a table status as the first entry into the table status
145  * also initialize the crypto stuff.
146  */
147 synchronized void Table::initTable() {
148         cloud.initSecurity();
149
150         // Create the first insertion into the block chain which is the table status
151         Slot s = new Slot(this, 1, localMachineId, localSequenceNumber);
152         localSequenceNumber++;
153         TableStatus status = new TableStatus(s, numberOfSlots);
154         s.addEntry(status);
155         Slot[] array = cloud.putSlot(s, numberOfSlots);
156
157         if (array == NULL) {
158                 array = new Slot[] {s};
159                 // update local block chain
160                 validateAndUpdate(array, true);
161         } else if (array.length == 1) {
162                 // in case we did push the slot BUT we failed to init it
163                 validateAndUpdate(array, true);
164         } else {
165                 throw new Error("Error on initialization");
166         }
167 }
168
169 /**
170  * Rebuild the table from scratch by pulling the latest block chain from the server.
171  */
172 synchronized void Table::rebuild() {
173         // Just pull the latest slots from the server
174         Slot[] newslots = cloud.getSlots(sequenceNumber + 1);
175         validateAndUpdate(newslots, true);
176         sendToServer(NULL);
177         updateLiveTransactionsAndStatus();
178
179 }
180
181 // String toString() {
182 //  String retString = " Committed Table: \n";
183 //  retString += "---------------------------\n";
184 //  retString += commitedTable.toString();
185
186 //  retString += "\n\n";
187
188 //  retString += " Speculative Table: \n";
189 //  retString += "---------------------------\n";
190 //  retString += speculativeTable.toString();
191
192 //  return retString;
193 // }
194
195 synchronized void Table::addLocalCommunication(int64_t arbitrator, String hostName, int portNumber) {
196         localCommunicationTable.put(arbitrator, new Pair<String, int32_t>(hostName, portNumber));
197 }
198
199 synchronized Long Table::getArbitrator(IoTString key) {
200         return arbitratorTable.get(key);
201 }
202
203 synchronized void Table::close() {
204         cloud.close();
205 }
206
207 synchronized IoTString Table::getCommitted(IoTString key)  {
208         KeyValue kv = committedKeyValueTable.get(key);
209
210         if (kv != NULL) {
211                 return kv.getValue();
212         } else {
213                 return NULL;
214         }
215 }
216
217 synchronized IoTString Table::getSpeculative(IoTString key) {
218         KeyValue kv = pendingTransactionSpeculatedKeyValueTable.get(key);
219
220         if (kv == NULL) {
221                 kv = speculatedKeyValueTable.get(key);
222         }
223
224         if (kv == NULL) {
225                 kv = committedKeyValueTable.get(key);
226         }
227
228         if (kv != NULL) {
229                 return kv.getValue();
230         } else {
231                 return NULL;
232         }
233 }
234
235 synchronized IoTString Table::getCommittedAtomic(IoTString key) {
236         KeyValue kv = committedKeyValueTable.get(key);
237
238         if (arbitratorTable.get(key) == NULL) {
239                 throw new Error("Key not Found.");
240         }
241
242         // Make sure new key value pair matches the current arbitrator
243         if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
244                 // TODO: Maybe not throw en error
245                 throw new Error("Not all Key Values Match Arbitrator.");
246         }
247
248         if (kv != NULL) {
249                 pendingTransactionBuilder.addKVGuard(new KeyValue(key, kv.getValue()));
250                 return kv.getValue();
251         } else {
252                 pendingTransactionBuilder.addKVGuard(new KeyValue(key, NULL));
253                 return NULL;
254         }
255 }
256
257 synchronized IoTString Table::getSpeculativeAtomic(IoTString key) {
258         if (arbitratorTable.get(key) == NULL) {
259                 throw new Error("Key not Found.");
260         }
261
262         // Make sure new key value pair matches the current arbitrator
263         if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
264                 // TODO: Maybe not throw en error
265                 throw new Error("Not all Key Values Match Arbitrator.");
266         }
267
268         KeyValue kv = pendingTransactionSpeculatedKeyValueTable.get(key);
269
270         if (kv == NULL) {
271                 kv = speculatedKeyValueTable.get(key);
272         }
273
274         if (kv == NULL) {
275                 kv = committedKeyValueTable.get(key);
276         }
277
278         if (kv != NULL) {
279                 pendingTransactionBuilder.addKVGuard(new KeyValue(key, kv.getValue()));
280                 return kv.getValue();
281         } else {
282                 pendingTransactionBuilder.addKVGuard(new KeyValue(key, NULL));
283                 return NULL;
284         }
285 }
286
287 synchronized bool Table::update()  {
288         try {
289                 Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
290                 validateAndUpdate(newSlots, false);
291                 sendToServer(NULL);
292
293
294                 updateLiveTransactionsAndStatus();
295
296                 return true;
297         } catch (Exception e) {
298                 // e.printStackTrace();
299
300                 for (Long m : localCommunicationTable.keySet()) {
301                         updateFromLocal(m);
302                 }
303         }
304
305         return false;
306 }
307
308 synchronized bool Table::createNewKey(IoTString keyName, int64_t machineId) {
309         while (true) {
310                 if (arbitratorTable.get(keyName) != NULL) {
311                         // There is already an arbitrator
312                         return false;
313                 }
314
315                 NewKey newKey = new NewKey(NULL, keyName, machineId);
316
317                 if (sendToServer(newKey)) {
318                         // If successfully inserted
319                         return true;
320                 }
321         }
322 }
323
324 synchronized void Table::startTransaction() {
325         // Create a new transaction, invalidates any old pending transactions.
326         pendingTransactionBuilder = new PendingTransaction(localMachineId);
327 }
328
329 synchronized void Table::addKV(IoTString key, IoTString value) {
330
331         // Make sure it is a valid key
332         if (arbitratorTable.get(key) == NULL) {
333                 throw new Error("Key not Found.");
334         }
335
336         // Make sure new key value pair matches the current arbitrator
337         if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
338                 // TODO: Maybe not throw en error
339                 throw new Error("Not all Key Values Match Arbitrator.");
340         }
341
342         // Add the key value to this transaction
343         KeyValue kv = new KeyValue(key, value);
344         pendingTransactionBuilder.addKV(kv);
345 }
346
347 synchronized TransactionStatus Table::commitTransaction() {
348
349         if (pendingTransactionBuilder.getKVUpdates().size() == 0) {
350                 // transaction with no updates will have no effect on the system
351                 return new TransactionStatus(TransactionStatus.StatusNoEffect, -1);
352         }
353
354         // Set the local transaction sequence number and increment
355         pendingTransactionBuilder.setClientLocalSequenceNumber(localTransactionSequenceNumber);
356         localTransactionSequenceNumber++;
357
358         // Create the transaction status
359         TransactionStatus transactionStatus = new TransactionStatus(TransactionStatus.StatusPending, pendingTransactionBuilder.getArbitrator());
360
361         // Create the new transaction
362         Transaction newTransaction = pendingTransactionBuilder.createTransaction();
363         newTransaction.setTransactionStatus(transactionStatus);
364
365         if (pendingTransactionBuilder.getArbitrator() != localMachineId) {
366                 // Add it to the queue and invalidate the builder for safety
367                 pendingTransactionQueue.add(newTransaction);
368         } else {
369                 arbitrateOnLocalTransaction(newTransaction);
370                 updateLiveStateFromLocal();
371         }
372
373         pendingTransactionBuilder = new PendingTransaction(localMachineId);
374
375         try {
376                 sendToServer(NULL);
377         } catch (ServerException e) {
378
379                 Set<Long> arbitratorTriedAndFailed = new HashSet<Long>();
380                 for (Iterator<Transaction> iter = pendingTransactionQueue.iterator(); iter.hasNext(); ) {
381                         Transaction transaction = iter.next();
382
383                         if (arbitratorTriedAndFailed.contains(transaction.getArbitrator())) {
384                                 // Already contacted this client so ignore all attempts to contact this client
385                                 // to preserve ordering for arbitrator
386                                 continue;
387                         }
388
389                         Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
390
391                         if (sendReturn.getFirst()) {
392                                 // Failed to contact over local
393                                 arbitratorTriedAndFailed.add(transaction.getArbitrator());
394                         } else {
395                                 // Successful contact or should not contact
396
397                                 if (sendReturn.getSecond()) {
398                                         // did arbitrate
399                                         iter.remove();
400                                 }
401                         }
402                 }
403         }
404
405         updateLiveStateFromLocal();
406
407         return transactionStatus;
408 }
409
410 /**
411  * Get the machine ID for this client
412  */
413 int64_t Table::getMachineId() {
414         return localMachineId;
415 }
416
417 /**
418  * Decrement the number of live slots that we currently have
419  */
420 void Table::decrementLiveCount() {
421         liveSlotCount--;
422 }
423
424 /**
425  * Recalculate the new resize threshold
426  */
427 void Table::setResizeThreshold() {
428         int resizeLower = (int) (RESIZE_THRESHOLD * numberOfSlots);
429         bufferResizeThreshold = resizeLower - 1 + random.nextInt(numberOfSlots - resizeLower);
430 }
431
432 int64_t Table::getLocalSequenceNumber() {
433         return localSequenceNumber;
434 }
435
436
// Remembers, for the last slot sendToServer attempted to send, the "inserted
// new key" flag reported by fillSlot; sendToServer reads it again when it
// retries after a partial send.
// NOTE(review): declared at file scope here, so it is not per-Table state -
// confirm whether it should be a member.
bool lastInsertedNewKey = false;
438
439 bool Table::sendToServer(NewKey newKey) {
440
441         bool fromRetry = false;
442
443         try {
444                 if (hadPartialSendToServer) {
445                         Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
446                         if (newSlots.length == 0) {
447                                 fromRetry = true;
448                                 ThreeTuple<bool, bool, Slot[]> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
449
450                                 if (sendSlotsReturn.getFirst()) {
451                                         if (newKey != NULL) {
452                                                 if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
453                                                         newKey = NULL;
454                                                 }
455                                         }
456
457                                         for (Transaction transaction : lastTransactionPartsSent.keySet()) {
458                                                 transaction.resetServerFailure();
459
460                                                 // Update which transactions parts still need to be sent
461                                                 transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
462
463                                                 // Add the transaction status to the outstanding list
464                                                 outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
465
466                                                 // Update the transaction status
467                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
468
469                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
470                                                 if (transaction.didSendAllParts()) {
471                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
472                                                         pendingTransactionQueue.remove(transaction);
473                                                 }
474                                         }
475                                 } else {
476
477                                         newSlots = sendSlotsReturn.getThird();
478
479                                         bool isInserted = false;
480                                         for (Slot s : newSlots) {
481                                                 if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
482                                                         isInserted = true;
483                                                         break;
484                                                 }
485                                         }
486
487                                         for (Slot s : newSlots) {
488                                                 if (isInserted) {
489                                                         break;
490                                                 }
491
492                                                 // Process each entry in the slot
493                                                 for (Entry entry : s.getEntries()) {
494
495                                                         if (entry.getType() == Entry.TypeLastMessage) {
496                                                                 LastMessage lastMessage = (LastMessage)entry;
497                                                                 if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) {
498                                                                         isInserted = true;
499                                                                         break;
500                                                                 }
501                                                         }
502                                                 }
503                                         }
504
505                                         if (isInserted) {
506                                                 if (newKey != NULL) {
507                                                         if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
508                                                                 newKey = NULL;
509                                                         }
510                                                 }
511
512                                                 for (Transaction transaction : lastTransactionPartsSent.keySet()) {
513                                                         transaction.resetServerFailure();
514
515                                                         // Update which transactions parts still need to be sent
516                                                         transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
517
518                                                         // Add the transaction status to the outstanding list
519                                                         outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
520
521                                                         // Update the transaction status
522                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
523
524                                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
525                                                         if (transaction.didSendAllParts()) {
526                                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
527                                                                 pendingTransactionQueue.remove(transaction);
528                                                         } else {
529                                                                 transaction.resetServerFailure();
530                                                                 // Set the transaction sequence number back to nothing
531                                                                 if (!transaction.didSendAPartToServer()) {
532                                                                         transaction.setSequenceNumber(-1);
533                                                                 }
534                                                         }
535                                                 }
536                                         }
537                                 }
538
539                                 for (Transaction transaction : lastTransactionPartsSent.keySet()) {
540                                         transaction.resetServerFailure();
541                                         // Set the transaction sequence number back to nothing
542                                         if (!transaction.didSendAPartToServer()) {
543                                                 transaction.setSequenceNumber(-1);
544                                         }
545                                 }
546
547                                 if (sendSlotsReturn.getThird().length != 0) {
548                                         // insert into the local block chain
549                                         validateAndUpdate(sendSlotsReturn.getThird(), true);
550                                 }
551                                 // continue;
552                         } else {
553                                 bool isInserted = false;
554                                 for (Slot s : newSlots) {
555                                         if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
556                                                 isInserted = true;
557                                                 break;
558                                         }
559                                 }
560
561                                 for (Slot s : newSlots) {
562                                         if (isInserted) {
563                                                 break;
564                                         }
565
566                                         // Process each entry in the slot
567                                         for (Entry entry : s.getEntries()) {
568
569                                                 if (entry.getType() == Entry.TypeLastMessage) {
570                                                         LastMessage lastMessage = (LastMessage)entry;
571                                                         if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) {
572                                                                 isInserted = true;
573                                                                 break;
574                                                         }
575                                                 }
576                                         }
577                                 }
578
579                                 if (isInserted) {
580                                         if (newKey != NULL) {
581                                                 if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
582                                                         newKey = NULL;
583                                                 }
584                                         }
585
586                                         for (Transaction transaction : lastTransactionPartsSent.keySet()) {
587                                                 transaction.resetServerFailure();
588
589                                                 // Update which transactions parts still need to be sent
590                                                 transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
591
592                                                 // Add the transaction status to the outstanding list
593                                                 outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
594
595                                                 // Update the transaction status
596                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
597
598                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
599                                                 if (transaction.didSendAllParts()) {
600                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
601                                                         pendingTransactionQueue.remove(transaction);
602                                                 } else {
603                                                         transaction.resetServerFailure();
604                                                         // Set the transaction sequence number back to nothing
605                                                         if (!transaction.didSendAPartToServer()) {
606                                                                 transaction.setSequenceNumber(-1);
607                                                         }
608                                                 }
609                                         }
610                                 } else {
611                                         for (Transaction transaction : lastTransactionPartsSent.keySet()) {
612                                                 transaction.resetServerFailure();
613                                                 // Set the transaction sequence number back to nothing
614                                                 if (!transaction.didSendAPartToServer()) {
615                                                         transaction.setSequenceNumber(-1);
616                                                 }
617                                         }
618                                 }
619
620                                 // insert into the local block chain
621                                 validateAndUpdate(newSlots, true);
622                         }
623                 }
624         } catch (ServerException e) {
625                 throw e;
626         }
627
628
629
630         try {
631                 // While we have stuff that needs inserting into the block chain
632                 while ((pendingTransactionQueue.size() > 0) || (pendingSendArbitrationRounds.size() > 0) || (newKey != NULL)) {
633
634                         fromRetry = false;
635
636                         if (hadPartialSendToServer) {
637                                 throw new Error("Should Be error free");
638                         }
639
640
641
642                         // If there is a new key with same name then end
643                         if ((newKey != NULL) && (arbitratorTable.get(newKey.getKey()) != NULL)) {
644                                 return false;
645                         }
646
647                         // Create the slot
648                         Slot slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer.getSlot(sequenceNumber).getHMAC(), localSequenceNumber);
649                         localSequenceNumber++;
650
651                         // Try to fill the slot with data
652                         ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
653                         bool needsResize = fillSlotsReturn.getFirst();
654                         int newSize = fillSlotsReturn.getSecond();
655                         bool insertedNewKey = fillSlotsReturn.getThird();
656
657                         if (needsResize) {
658                                 // Reset which transaction to send
659                                 for (Transaction transaction : transactionPartsSent.keySet()) {
660                                         transaction.resetNextPartToSend();
661
662                                         // Set the transaction sequence number back to nothing
663                                         if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
664                                                 transaction.setSequenceNumber(-1);
665                                         }
666                                 }
667
668                                 // Clear the sent data since we are trying again
669                                 pendingSendArbitrationEntriesToDelete.clear();
670                                 transactionPartsSent.clear();
671
672                                 // We needed a resize so try again
673                                 fillSlot(slot, true, newKey);
674                         }
675
676                         lastSlotAttemptedToSend = slot;
677                         lastIsNewKey = (newKey != NULL);
678                         lastInsertedNewKey = insertedNewKey;
679                         lastNewSize = newSize;
680                         lastNewKey = newKey;
681                         lastTransactionPartsSent = new Hashtable<Transaction, Vector<int32_t> >(transactionPartsSent);
682                         lastPendingSendArbitrationEntriesToDelete = new Vector<Entry>(pendingSendArbitrationEntriesToDelete);
683
684
685                         ThreeTuple<bool, bool, Slot[]> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
686
687                         if (sendSlotsReturn.getFirst()) {
688
689                                 // Did insert into the block chain
690
691                                 if (insertedNewKey) {
692                                         // This slot was what was inserted not a previous slot
693
694                                         // New Key was successfully inserted into the block chain so dont want to insert it again
695                                         newKey = NULL;
696                                 }
697
698                                 // Remove the aborts and commit parts that were sent from the pending to send queue
699                                 for (Iterator<ArbitrationRound> iter = pendingSendArbitrationRounds.iterator(); iter.hasNext(); ) {
700                                         ArbitrationRound round = iter.next();
701                                         round.removeParts(pendingSendArbitrationEntriesToDelete);
702
703                                         if (round.isDoneSending()) {
704                                                 // Sent all the parts
705                                                 iter.remove();
706                                         }
707                                 }
708
709                                 for (Transaction transaction : transactionPartsSent.keySet()) {
710                                         transaction.resetServerFailure();
711
712                                         // Update which transactions parts still need to be sent
713                                         transaction.removeSentParts(transactionPartsSent.get(transaction));
714
715                                         // Add the transaction status to the outstanding list
716                                         outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
717
718                                         // Update the transaction status
719                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
720
721                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
722                                         if (transaction.didSendAllParts()) {
723                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
724                                                 pendingTransactionQueue.remove(transaction);
725                                         }
726                                 }
727                         } else {
728
729                                 // if (!sendSlotsReturn.getSecond()) {
730                                 //  for (Transaction transaction : lastTransactionPartsSent.keySet()) {
731                                 //    transaction.resetServerFailure();
732                                 //  }
733                                 // } else {
734                                 //  for (Transaction transaction : lastTransactionPartsSent.keySet()) {
735                                 //    transaction.resetServerFailure();
736
737                                 //    // Update which transactions parts still need to be sent
738                                 //    transaction.removeSentParts(transactionPartsSent.get(transaction));
739
740                                 //    // Add the transaction status to the outstanding list
741                                 //    outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
742
743                                 //    // Update the transaction status
744                                 //    transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
745
746                                 //    // Check if all the transaction parts were successfully sent and if so then remove it from pending
747                                 //    if (transaction.didSendAllParts()) {
748                                 //      transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
749                                 //      pendingTransactionQueue.remove(transaction);
750
751                                 //      for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
752                                 //        System.out.println("Sent: " + kv + "  from: " + localMachineId + "   Slot:" + lastSlotAttemptedToSend.getSequenceNumber() + "  Claimed:" + transaction.getSequenceNumber());
753                                 //      }
754                                 //    }
755                                 //  }
756                                 // }
757
758                                 // Reset which transaction to send
759                                 for (Transaction transaction : transactionPartsSent.keySet()) {
760                                         transaction.resetNextPartToSend();
761                                         // transaction.resetNextPartToSend();
762
763                                         // Set the transaction sequence number back to nothing
764                                         if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
765                                                 transaction.setSequenceNumber(-1);
766                                         }
767                                 }
768                         }
769
770                         // Clear the sent data in preparation for next send
771                         pendingSendArbitrationEntriesToDelete.clear();
772                         transactionPartsSent.clear();
773
774                         if (sendSlotsReturn.getThird().length != 0) {
775                                 // insert into the local block chain
776                                 validateAndUpdate(sendSlotsReturn.getThird(), true);
777                         }
778                 }
779
780         } catch (ServerException e) {
781
782                 if (e.getType() != ServerException.TypeInputTimeout) {
783                         // e.printStackTrace();
784
785                         // Nothing was able to be sent to the server so just clear these data structures
786                         for (Transaction transaction : transactionPartsSent.keySet()) {
787                                 transaction.resetNextPartToSend();
788
789                                 // Set the transaction sequence number back to nothing
790                                 if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
791                                         transaction.setSequenceNumber(-1);
792                                 }
793                         }
794                 } else {
795                         // There was a partial send to the server
796                         hadPartialSendToServer = true;
797
798
799                         // if (!fromRetry) {
800                         //  lastTransactionPartsSent = new Hashtable<Transaction, Vector<int32_t>>(transactionPartsSent);
801                         //  lastPendingSendArbitrationEntriesToDelete = new Vector<Entry>(pendingSendArbitrationEntriesToDelete);
802                         // }
803
804                         // Nothing was able to be sent to the server so just clear these data structures
805                         for (Transaction transaction : transactionPartsSent.keySet()) {
806                                 transaction.resetNextPartToSend();
807                                 transaction.setServerFailure();
808                         }
809                 }
810
811                 pendingSendArbitrationEntriesToDelete.clear();
812                 transactionPartsSent.clear();
813
814                 throw e;
815         }
816
817         return newKey == NULL;
818 }
819
/**
 * Ask another device, over the local channel, for arbitration entries and
 * apply whatever it sends back to this table.
 *
 * Request layout (as encoded below): the last arbitration local sequence
 * number seen from `machineId` (-1 when none is recorded), followed by an
 * int 0.
 * Reply layout (as decoded below): an int entry count, then that many
 * entries, each preceded by a one-byte type tag.
 *
 * @param machineId key into localCommunicationTable identifying the peer
 * @return false when localCommunicationTable has nothing for machineId or
 *         when cloud.sendLocalData() returns NULL; true otherwise
 */
820 synchronized bool Table::updateFromLocal(int64_t machineId) {
821         Pair<String, int32_t> localCommunicationInformation = localCommunicationTable.get(machineId);
822         if (localCommunicationInformation == NULL) {
823                 // Cant talk to that device locally so do nothing
824                 return false;
825         }
826
827         // Get the size of the send data
            // One int32_t for the putInt(0) below plus one int64_t for the putLong().
828         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
829
            // -1 is sent when no sequence number has been recorded for this machine.
830         Long lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
831         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId) != NULL) {
832                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId);
833         }
834
835         Array<char> *sendData = new char[sendDataSize];
836         ByteBuffer bbEncode = ByteBuffer.wrap(sendData);
837
838         // Encode the data
839         bbEncode.putLong(lastArbitrationDataLocalSequenceNumber);
840         bbEncode.putInt(0);
841
842         // Send by local
843         Array<char> *returnData = cloud.sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
            // The counter advances whether or not a reply came back (the NULL check is below).
844         localSequenceNumber++;
845
846         if (returnData == NULL) {
847                 // Could not contact server
848                 return false;
849         }
850
851         // Decode the data
852         ByteBuffer bbDecode = ByteBuffer.wrap(returnData);
853         int numberOfEntries = bbDecode.getInt();
854
            // Only Abort and CommitPart tags are handled. There is no else branch, so any
            // other tag byte is consumed without skipping the payload that follows it.
            // NOTE(review): confirm the peer can only ever send these two entry types.
855         for (int i = 0; i < numberOfEntries; i++) {
856                 char type = bbDecode.get();
857                 if (type == Entry.TypeAbort) {
858                         Abort abort = (Abort)Abort.decode(NULL, bbDecode);
859                         processEntry(abort);
860                 } else if (type == Entry.TypeCommitPart) {
861                         CommitPart commitPart = (CommitPart)CommitPart.decode(NULL, bbDecode);
862                         processEntry(commitPart);
863                 }
864         }
865
866         updateLiveStateFromLocal();
867
868         return true;
869 }
870
/**
 * Send every part of `transaction` to its arbitrator over the local channel
 * and record the outcome on the transaction's status object.
 *
 * Request layout (as encoded below): the last arbitration local sequence
 * number seen from the arbitrator (-1 when none is recorded), the number of
 * parts, then each part in the iteration order of getParts().values().
 * Reply layout (as decoded below): a didCommit byte and a couldArbitrate
 * byte (the value 1 means true), an int entry count, then that many entries,
 * each preceded by a one-byte type tag.
 *
 * @param transaction transaction to send; its TransactionStatus is set to
 *        StatusCommitted or StatusAborted once a reply has been decoded
 * @return Pair(true, false) when there is no local communication info for
 *         the arbitrator or cloud.sendLocalData() returns NULL;
 *         Pair(false, true) after a reply has been processed.
 *         NOTE(review): the meaning of the two flags is not spelled out in
 *         this function -- confirm against the callers.
 */
871 Pair<bool, bool> Table::sendTransactionToLocal(Transaction transaction) {
872
873         // Get the devices local communications
874         Pair<String, int32_t> localCommunicationInformation = localCommunicationTable.get(transaction.getArbitrator());
875
876         if (localCommunicationInformation == NULL) {
877                 // Cant talk to that device locally so do nothing
878                 return new Pair<bool, bool>(true, false);
879         }
880
881         // Get the size of the send data
            // Fixed header (part count + sequence number) plus the size of every part.
882         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
883         for (TransactionPart part : transaction.getParts().values()) {
884                 sendDataSize += part.getSize();
885         }
886
            // -1 is sent when no sequence number has been recorded for this arbitrator.
887         Long lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
888         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator()) != NULL) {
889                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator());
890         }
891
892         // Make the send data size
893         Array<char> *sendData = new char[sendDataSize];
894         ByteBuffer bbEncode = ByteBuffer.wrap(sendData);
895
896         // Encode the data
897         bbEncode.putLong(lastArbitrationDataLocalSequenceNumber);
898         bbEncode.putInt(transaction.getParts().size());
899         for (TransactionPart part : transaction.getParts().values()) {
900                 part.encode(bbEncode);
901         }
902
903
904         // Send by local
905         Array<char> *returnData = cloud.sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
            // The counter advances whether or not a reply came back (the NULL check is below).
906         localSequenceNumber++;
907
908         if (returnData == NULL) {
909                 // Could not contact server
910                 return new Pair<bool, bool>(true, false);
911         }
912
913         // Decode the data
914         ByteBuffer bbDecode = ByteBuffer.wrap(returnData);
915         bool didCommit = bbDecode.get() == 1;
916         bool couldArbitrate = bbDecode.get() == 1;
917         int numberOfEntries = bbDecode.getInt();
            // Set when the reply contains an Abort that names this exact transaction.
918         bool foundAbort = false;
919
            // Only Abort and CommitPart tags are handled; any other tag byte is consumed
            // without skipping its payload. NOTE(review): confirm no other types can appear.
920         for (int i = 0; i < numberOfEntries; i++) {
921                 char type = bbDecode.get();
922                 if (type == Entry.TypeAbort) {
923                         Abort abort = (Abort)Abort.decode(NULL, bbDecode);
924
                            // Match on both our machine id and the transaction's client-local sequence number.
925                         if ((abort.getTransactionMachineId() == localMachineId) && (abort.getTransactionClientLocalSequenceNumber() == transaction.getClientLocalSequenceNumber())) {
926                                 foundAbort = true;
927                         }
928
929                         processEntry(abort);
930                 } else if (type == Entry.TypeCommitPart) {
931                         CommitPart commitPart = (CommitPart)CommitPart.decode(NULL, bbDecode);
932                         processEntry(commitPart);
933                 }
934         }
935
936         updateLiveStateFromLocal();
937
            // When the peer reports it could arbitrate, its didCommit flag decides the status.
            // Otherwise the status falls back to whether an Abort for this transaction was
            // seen in the reply; the absence of one is treated as committed.
938         if (couldArbitrate) {
939                 TransactionStatus status =  transaction.getTransactionStatus();
940                 if (didCommit) {
941                         status.setStatus(TransactionStatus.StatusCommitted);
942                 } else {
943                         status.setStatus(TransactionStatus.StatusAborted);
944                 }
945         } else {
946                 TransactionStatus status =  transaction.getTransactionStatus();
947                 if (foundAbort) {
948                         status.setStatus(TransactionStatus.StatusAborted);
949                 } else {
950                         status.setStatus(TransactionStatus.StatusCommitted);
951                 }
952         }
953
954         return new Pair<bool, bool>(false, true);
955 }
956
/**
 * Handle a request that arrived over the local channel: optionally arbitrate
 * on a transaction carried in the request, then reply with the arbitration
 * entries (aborts and commit parts) the sender has not seen yet.
 *
 * Request layout (as decoded below): a long holding the last arbitrated
 * sequence number the sender has seen, an int part count, then that many
 * transaction parts, each preceded by one byte that is read and discarded.
 * Reply layout (as encoded below): when the part count is non-zero, a
 * didCommit byte and a couldArbitrate byte (1 for true, 0 for false); then
 * an int entry count followed by the encoded entries.
 *
 * @param data the raw request bytes
 * @return a newly allocated buffer holding the reply
 */
957 synchronized Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
958
959         // Decode the data
960         ByteBuffer bbDecode = ByteBuffer.wrap(data);
961         int64_t lastArbitratedSequenceNumberSeen = bbDecode.getLong();
962         int numberOfParts = bbDecode.getInt();
963
964         // If we did commit a transaction or not
965         bool didCommit = false;
966         bool couldArbitrate = false;
967
968         if (numberOfParts != 0) {
969
970                 // decode the transaction
971                 Transaction transaction = new Transaction();
972                 for (int i = 0; i < numberOfParts; i++) {
                            // One byte is read and dropped before each part.
                            // NOTE(review): presumably the entry type tag written by encode() -- confirm.
973                         bbDecode.get();
974                         TransactionPart newPart = (TransactionPart)TransactionPart.decode(NULL, bbDecode);
975                         transaction.addPartDecode(newPart);
976                 }
977
978                 // Arbitrate on transaction and pull relevant return data
                    // The pair is read as (couldArbitrate, didCommit), in that order.
979                 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
980                 couldArbitrate = localArbitrateReturn.getFirst();
981                 didCommit = localArbitrateReturn.getSecond();
982
983                 updateLiveStateFromLocal();
984
985                 // Transaction was sent to the server so keep track of it to prevent double commit
986                 if (transaction.getSequenceNumber() != -1) {
987                         offlineTransactionsCommittedAndAtServer.add(transaction.getId());
988                 }
989         }
990
991         // The data to send back
992         int returnDataSize = 0;
993         Vector<Entry> unseenArbitrations = new Vector<Entry>();
994
995         // Get the aborts to send back
            // Keys are sorted, and only those greater than the sequence number the sender
            // reported are included. The loop variable below shadows the name
            // localSequenceNumber that is incremented at the end of this function.
996         Vector<Long> abortLocalSequenceNumbers = new Vector<Long >(liveAbortsGeneratedByLocal.keySet());
997         Collections.sort(abortLocalSequenceNumbers);
998         for (Long localSequenceNumber : abortLocalSequenceNumbers) {
999                 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1000                         continue;
1001                 }
1002
1003                 Abort abort = liveAbortsGeneratedByLocal.get(localSequenceNumber);
1004                 unseenArbitrations.add(abort);
1005                 returnDataSize += abort.getSize();
1006         }
1007
1008         // Get the commits to send back
             // Same filter as for aborts, applied to the commits stored under localMachineId.
1009         Hashtable<int64_t Commit> commitForClientTable = liveCommitsTable.get(localMachineId);
1010         if (commitForClientTable != NULL) {
1011                 Vector<Long> commitLocalSequenceNumbers = new Vector<Long>(commitForClientTable.keySet());
1012                 Collections.sort(commitLocalSequenceNumbers);
1013
1014                 for (Long localSequenceNumber : commitLocalSequenceNumbers) {
1015                         Commit commit = commitForClientTable.get(localSequenceNumber);
1016
1017                         if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1018                                 continue;
1019                         }
1020
1021                         unseenArbitrations.addAll(commit.getParts().values());
1022
1023                         for (CommitPart commitPart : commit.getParts().values()) {
1024                                 returnDataSize += commitPart.getSize();
1025                         }
1026                 }
1027         }
1028
1029         // Number of arbitration entries to decode
             // NOTE(review): two int32_t are reserved here but only one int (the entry count)
             // is written below, and one char is reserved for the flags although two flag
             // bytes are written when numberOfParts != 0. The reserved total still exceeds
             // what is written, so the tail of returnData is never filled in -- confirm the
             // receiver tolerates the extra bytes before tightening this.
1030         returnDataSize += 2 * sizeof(int32_t);
1031
1032         // bool of did commit or not
1033         if (numberOfParts != 0) {
1034                 returnDataSize += sizeof(char);
1035         }
1036
1037         // Data to send Back
1038         Array<char> *returnData = new char[returnDataSize];
1039         ByteBuffer bbEncode = ByteBuffer.wrap(returnData);
1040
             // The two flag bytes are only present when the request carried a transaction.
1041         if (numberOfParts != 0) {
1042                 if (didCommit) {
1043                         bbEncode.put((char)1);
1044                 } else {
1045                         bbEncode.put((char)0);
1046                 }
1047                 if (couldArbitrate) {
1048                         bbEncode.put((char)1);
1049                 } else {
1050                         bbEncode.put((char)0);
1051                 }
1052         }
1053
1054         bbEncode.putInt(unseenArbitrations.size());
1055         for (Entry entry : unseenArbitrations) {
1056                 entry.encode(bbEncode);
1057         }
1058
1059
1060         localSequenceNumber++;
1061         return returnData;
1062 }
1063
/**
 * Submit `slot` to the server through cloud.putSlot() and classify the result.
 *
 * @param slot     the slot to send
 * @param newSize  passed straight through to cloud.putSlot()
 * @param isNewKey not read anywhere in this function
 * @return (inserted, lastTryInserted, slots):
 *         - putSlot() returned NULL: inserted is true, slots is a one-element
 *           array holding `slot`, and rejectedSlotVector is cleared;
 *         - putSlot() returned a non-empty array: inserted is false and slots
 *           is that array. lastTryInserted is true only when
 *           hadPartialSendToServer is set and `slot` is found in the returned
 *           data; otherwise the slot's sequence number is appended to
 *           rejectedSlotVector.
 *         Throws Error when putSlot() returns a non-NULL array of length 0.
 */
1064 ThreeTuple<bool, bool, Slot[]> Table::sendSlotsToServer(Slot slot, int newSize, bool isNewKey) {
             // The saved value is only referenced by the commented-out condition further down.
1065         bool attemptedToSendToServerTmp = attemptedToSendToServer;
1066         attemptedToSendToServer = true;
1067
1068         bool inserted = false;
1069         bool lastTryInserted = false;
1070
1071         Slot[] array = cloud.putSlot(slot, newSize);
1072         if (array == NULL) {
                     // A NULL result is treated as success: the slot itself becomes the result array.
1073                 array = new Slot[] {slot};
1074                 rejectedSlotVector.clear();
1075                 inserted = true;
1076         } else {
1077                 if (array.length == 0) {
1078                         throw new Error("Server Error: Did not send any slots");
1079                 }
1080
1081                 // if (attemptedToSendToServerTmp) {
1082                 if (hadPartialSendToServer) {
1083
                             // First check: one of the returned slots has our sequence number and machine id.
1084                         bool isInserted = false;
1085                         for (Slot s : array) {
1086                                 if ((s.getSequenceNumber() == slot.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
1087                                         isInserted = true;
1088                                         break;
1089                                 }
1090                         }
1091
                             // Second check: a LastMessage entry in a returned slot names our machine id
                             // and this sequence number. The inner break only leaves the entry loop; the
                             // isInserted test at the top of the outer loop then ends the scan.
1092                         for (Slot s : array) {
1093                                 if (isInserted) {
1094                                         break;
1095                                 }
1096
1097                                 // Process each entry in the slot
1098                                 for (Entry entry : s.getEntries()) {
1099
1100                                         if (entry.getType() == Entry.TypeLastMessage) {
1101                                                 LastMessage lastMessage = (LastMessage)entry;
1102
1103                                                 if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == slot.getSequenceNumber())) {
1104                                                         isInserted = true;
1105                                                         break;
1106                                                 }
1107                                         }
1108                                 }
1109                         }
1110
1111                         if (!isInserted) {
1112                                 rejectedSlotVector.add(slot.getSequenceNumber());
1113                                 lastTryInserted = false;
1114                         } else {
1115                                 lastTryInserted = true;
1116                         }
1117                 } else {
                             // No partial send is outstanding, so a non-NULL reply counts as a rejection.
1118                         rejectedSlotVector.add(slot.getSequenceNumber());
1119                         lastTryInserted = false;
1120                 }
1121         }
1122
1123         return new ThreeTuple<bool, bool, Slot[]>(inserted, lastTryInserted, array);
1124 }
1125
1126 /**
1127  * Populate `slot` for sending. Entries are added in this order: a
 * TableStatus (only when resizing), rejected-message entries, mandatory
 * rescue of old live entries, the new key (if one is given and it fits),
 * pending arbitration entries, parts of the first pending transaction, and
 * finally optional rescue data.
 *
 * On the normal path transactionPartsSent and
 * pendingSendArbitrationEntriesToDelete are cleared and then refilled with
 * what was placed in this slot.
 *
 * @param slot        slot to fill
 * @param resize      request a resize; forced to true when
 *                    liveSlotCount > bufferResizeThreshold
 * @param newKeyEntry new key to insert, or NULL for none
 * @return (needsResize, newSize, insertedNewKey). The first element is true
 *         only on the early return taken when the mandatory rescue reports
 *         that a resize is needed while `resize` is false; the other two
 *         elements are then NULL. Otherwise the first element is false,
 *         newSize is numberOfSlots * RESIZE_MULTIPLE when resizing (0 when
 *         not), and the third element tells whether newKeyEntry was added.
 *         NOTE(review): the declared return type is a pointer -- confirm that
 *         callers bind the result accordingly.
1128  */
1129 ThreeTuple<bool, int32_t, bool> *Table::fillSlot(Slot slot, bool resize, NewKey newKeyEntry) {
1130
1131
1132         int newSize = 0;
1133         if (liveSlotCount > bufferResizeThreshold) {
1134                 resize = true;  //Resize is forced
1135
1136         }
1137
1138         if (resize) {
1139                 newSize = (int) (numberOfSlots * RESIZE_MULTIPLE);
1140                 TableStatus status = new TableStatus(slot, newSize);
1141                 slot.addEntry(status);
1142         }
1143
1144         // Fill with rejected slots first before doing anything else
1145         doRejectedMessages(slot);
1146
1147         // Do mandatory rescue of entries
1148         ThreeTuple<bool, bool, Long> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1149
1150         // Extract working variables
1151         bool needsResize = mandatoryRescueReturn.getFirst();
1152         bool seenLiveSlot = mandatoryRescueReturn.getSecond();
1153         int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1154
1155         if (needsResize && !resize) {
1156                 // We need to resize but we are not resizing so return false
                     // The tuple's first element is true here (resize needed). The NULLs are passed
                     // for the int32_t and bool elements.
                     // NOTE(review): entries already added to `slot` above stay in it -- confirm the
                     // caller's retry with resize == true expects that.
1157                 return new ThreeTuple<bool, int32_t, bool>(true, NULL, NULL);
1158         }
1159
1160         bool inserted = false;
1161         if (newKeyEntry != NULL) {
1162                 newKeyEntry.setSlot(slot);
1163                 if (slot.hasSpace(newKeyEntry)) {
1164
1165                         slot.addEntry(newKeyEntry);
1166                         inserted = true;
1167                 }
1168         }
1169
1170         // Clear the transactions, aborts and commits that were sent previously
1171         transactionPartsSent.clear();
1172         pendingSendArbitrationEntriesToDelete.clear();
1173
             // Add arbitration entries round by round, stopping completely at the first entry
             // that does not fit.
1174         for (ArbitrationRound round : pendingSendArbitrationRounds) {
1175                 bool isFull = false;
1176                 round.generateParts();
1177                 Vector<Entry> parts = round.getParts();
1178
1179                 // Insert pending arbitration data
1180                 for (Entry arbitrationData : parts) {
1181
1182                         // If it is an abort then we need to set some information
1183                         if (arbitrationData instanceof Abort) {
1184                                 ((Abort)arbitrationData).setSequenceNumber(slot.getSequenceNumber());
1185                         }
1186
1187                         if (!slot.hasSpace(arbitrationData)) {
1188                                 // No space so cant do anything else with these data entries
1189                                 isFull = true;
1190                                 break;
1191                         }
1192
1193                         // Add to this current slot and add it to entries to delete
1194                         slot.addEntry(arbitrationData);
1195                         pendingSendArbitrationEntriesToDelete.add(arbitrationData);
1196                 }
1197
1198                 if (isFull) {
1199                         break;
1200                 }
1201         }
1202
             // Only the transaction at the head of the pending queue is considered per slot.
1203         if (pendingTransactionQueue.size() > 0) {
1204
1205                 Transaction transaction = pendingTransactionQueue.get(0);
1206
1207                 // Set the transaction sequence number if it has yet to be inserted into the block chain
1208                 // if ((!transaction.didSendAPartToServer() && !transaction.getServerFailure()) || (transaction.getSequenceNumber() == -1)) {
1209                 //  transaction.setSequenceNumber(slot.getSequenceNumber());
1210                 // }
1211
1212                 if ((!transaction.didSendAPartToServer()) || (transaction.getSequenceNumber() == -1)) {
1213                         transaction.setSequenceNumber(slot.getSequenceNumber());
1214                 }
1215
1216
                     // Keep adding parts until the transaction has none left or the next one does not fit.
1217                 while (true) {
1218                         TransactionPart part = transaction.getNextPartToSend();
1219
1220                         if (part == NULL) {
1221                                 // Ran out of parts to send for this transaction so move on
1222                                 break;
1223                         }
1224
1225                         if (slot.hasSpace(part)) {
1226                                 slot.addEntry(part);
                                     // Record the part number under this transaction, creating the list on first use.
1227                                 Vector<int32_t> partsSent = transactionPartsSent.get(transaction);
1228                                 if (partsSent == NULL) {
1229                                         partsSent = new Vector<int32_t>();
1230                                         transactionPartsSent.put(transaction, partsSent);
1231                                 }
1232                                 partsSent.add(part.getPartNumber());
1233                                 transactionPartsSent.put(transaction, partsSent);
1234                         } else {
1235                                 break;
1236                         }
1237                 }
1238         }
1239
1240         // Fill the remainder of the slot with rescue data
1241         doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1242
1243         return new ThreeTuple<bool, int32_t, bool>(false, newSize, inserted);
1244 }
1245
/**
 * Add RejectedMessage entries to `s` describing the sequence numbers held in
 * rejectedSlotVector. Does nothing when that vector is empty.
 *
 * - More than REJECTED_THRESHOLD entries: a single RejectedMessage spanning
 *   the first to the last sequence number, created with a final argument of
 *   false.
 * - Otherwise: one RejectedMessage (final argument false) covering the
 *   leading run of sequence numbers whose slot is not in `buffer`, followed
 *   by one RejectedMessage (final argument true) for each remaining sequence
 *   number, attributed to the machine id of the buffered slot.
 *
 * @param s the slot that receives the entries
 */
1246 void Table::doRejectedMessages(Slot s) {
1247         if (!rejectedSlotVector.isEmpty()) {
1248                 /* TODO: We should avoid generating a rejected message entry if
1249                  * there is already a sufficient entry in the queue (e.g.,
1250                  * equalsto value of true and same sequence number).  */
1251
1252                 int64_t old_seqn = rejectedSlotVector.firstElement();
1253                 if (rejectedSlotVector.size() > REJECTED_THRESHOLD) {
1254                         int64_t new_seqn = rejectedSlotVector.lastElement();
1255                         RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1256                         s.addEntry(rm);
1257                 } else {
                             // prev_seqn stays -1 when the very first rejected slot is present in the buffer.
1258                         int64_t prev_seqn = -1;
1259                         int i = 0;
1260                         /* Go through list of missing messages */
1261                         for (; i < rejectedSlotVector.size(); i++) {
1262                                 int64_t curr_seqn = rejectedSlotVector.get(i);
1263                                 Slot s_msg = buffer.getSlot(curr_seqn);
1264                                 if (s_msg != NULL)
1265                                         break;
1266                                 prev_seqn = curr_seqn;
1267                         }
1268                         /* Generate rejected message entry for missing messages */
1269                         if (prev_seqn != -1) {
1270                                 RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1271                                 s.addEntry(rm);
1272                         }
1273                         /* Generate rejected message entries for present messages */
                             // i continues from where the first loop stopped.
                             // NOTE(review): s_msg is not checked for NULL here, unlike in the loop above.
                             // Confirm a missing slot cannot follow a present one in rejectedSlotVector.
1274                         for (; i < rejectedSlotVector.size(); i++) {
1275                                 int64_t curr_seqn = rejectedSlotVector.get(i);
1276                                 Slot s_msg = buffer.getSlot(curr_seqn);
1277                                 int64_t machineid = s_msg.getMachineID();
1278                                 RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1279                                 s.addEntry(rm);
1280                         }
1281                 }
1282         }
1283 }
1284
/**
 * Copy live entries from the oldest buffered slots into `slot`, so that the
 * sequence-number range below `threshold` (computed in the body) is visited
 * and its live entries are re-added where they fit.
 *
 * Also advances oldestLiveSlotSequenceNumver: first up to the buffer's oldest
 * sequence number, then forward through the scan until the first live slot is
 * encountered.
 *
 * @param slot   slot that receives the rescued entries
 * @param resize forwarded to Slot::getLiveEntries()
 * @return (needsResize, seenLiveSlot, sequenceNumber). needsResize is true
 *         only when an entry of the slot at `firstIfFull` does not fit; the
 *         function then returns immediately with that sequence number.
 *         Otherwise needsResize is false and sequenceNumber is the value the
 *         scan stopped at.
 */
1285 ThreeTuple<bool, bool, Long> Table::doMandatoryResuce(Slot slot, bool resize) {
1286         int64_t newestSequenceNumber = buffer.getNewestSeqNum();
1287         int64_t oldestSequenceNumber = buffer.getOldestSeqNum();
             // Never start the scan below what the buffer still holds.
1288         if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1289                 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1290         }
1291
1292         int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1293         bool seenLiveSlot = false;
1294         int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots;         // smallest seq number in the buffer if it is full
1295         int64_t threshold = firstIfFull + FREE_SLOTS;           // we want the buffer to be clear of live entries up to this point
1296
1297
1298         // Mandatory Rescue
1299         for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
                     // NOTE(review): previousSlot is used below without a NULL check -- confirm
                     // getSlot() always returns a slot for every sequence number in this range.
1300                 Slot previousSlot = buffer.getSlot(currentSequenceNumber);
1301                 // Push slot number forward
1302                 if (!seenLiveSlot) {
1303                         oldestLiveSlotSequenceNumver = currentSequenceNumber;
1304                 }
1305
1306                 if (!previousSlot.isLive()) {
1307                         continue;
1308                 }
1309
1310                 // We have seen a live slot
1311                 seenLiveSlot = true;
1312
1313                 // Get all the live entries for a slot
1314                 Vector<Entry> liveEntries = previousSlot.getLiveEntries(resize);
1315
1316                 // Iterate over all the live entries and try to rescue them
                     // An entry that does not fit is skipped without being rescued, unless it
                     // belongs to the slot at firstIfFull, in which case a resize is reported.
1317                 for (Entry liveEntry : liveEntries) {
1318                         if (slot.hasSpace(liveEntry)) {
1319
1320                                 // Enough space to rescue the entry
1321                                 slot.addEntry(liveEntry);
1322                         } else if (currentSequenceNumber == firstIfFull) {
1323                                 //if there's no space but the entry is about to fall off the queue
                                     // NOTE(review): the print below looks like leftover debug output.
1324                                 System.out.println("B");        //?
1325                                 return new ThreeTuple<bool, bool, Long>(true, seenLiveSlot, currentSequenceNumber);
1326
1327                         }
1328                 }
1329         }
1330
1331         // Did not resize
1332         return new ThreeTuple<bool, bool, Long>(false, seenLiveSlot, currentSequenceNumber);
1333 }
1334
1335 void Table::doOptionalRescue(Slot s, bool seenliveslot, int64_t seqn, bool resize) {
1336         /* now go through live entries from least to greatest sequence number until
1337          * either all live slots added, or the slot doesn't have enough room
1338          * for SKIP_THRESHOLD consecutive entries*/
1339         int skipcount = 0;
1340         int64_t newestseqnum = buffer.getNewestSeqNum();
1341 search:
1342         for (; seqn <= newestseqnum; seqn++) {
1343                 Slot prevslot = buffer.getSlot(seqn);
1344                 //Push slot number forward
1345                 if (!seenliveslot)
1346                         oldestLiveSlotSequenceNumver = seqn;
1347
1348                 if (!prevslot.isLive())
1349                         continue;
1350                 seenliveslot = true;
1351                 Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
1352                 for (Entry liveentry : liveentries) {
1353                         if (s.hasSpace(liveentry))
1354                                 s.addEntry(liveentry);
1355                         else {
1356                                 skipcount++;
1357                                 if (skipcount > SKIP_THRESHOLD)
1358                                         break search;
1359                         }
1360                 }
1361         }
1362 }
1363
1364 /**
1365  * Checks for malicious activity and updates the local copy of the block chain.
1366  */
1367 void Table::validateAndUpdate(Slot[] newSlots, bool acceptUpdatesToLocal) {
1368
1369         // The cloud communication layer has checked slot HMACs already before decoding
1370         if (newSlots.length == 0) {
1371                 return;
1372         }
1373
1374         // Make sure all slots are newer than the last largest slot this client has seen
1375         int64_t firstSeqNum = newSlots[0].getSequenceNumber();
1376         if (firstSeqNum <= sequenceNumber) {
1377                 throw new Error("Server Error: Sent older slots!");
1378         }
1379
1380         // Create an object that can access both new slots and slots in our local chain
1381         // without committing slots to our local chain
1382         SlotIndexer indexer = new SlotIndexer(newSlots, buffer);
1383
1384         // Check that the HMAC chain is not broken
1385         checkHMACChain(indexer, newSlots);
1386
1387         // Set to keep track of messages from clients
1388         HashSet<Long> machineSet = new HashSet<Long>(lastMessageTable.keySet());
1389
1390         // Process each slots data
1391         for (Slot slot : newSlots) {
1392                 processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1393
1394                 updateExpectedSize();
1395         }
1396
1397         // If there is a gap, check to see if the server sent us everything.
1398         if (firstSeqNum != (sequenceNumber + 1)) {
1399
1400                 // Check the size of the slots that were sent down by the server.
1401                 // Can only check the size if there was a gap
1402                 checkNumSlots(newSlots.length);
1403
1404                 // Since there was a gap every machine must have pushed a slot or must have
1405                 // a last message message.  If not then the server is hiding slots
1406                 if (!machineSet.isEmpty()) {
1407                         throw new Error("Missing record for machines: " + machineSet);
1408                 }
1409         }
1410
1411         // Update the size of our local block chain.
1412         commitNewMaxSize();
1413
1414         // Commit new to slots to the local block chain.
1415         for (Slot slot : newSlots) {
1416
1417                 // Insert this slot into our local block chain copy.
1418                 buffer.putSlot(slot);
1419
1420                 // Keep track of how many slots are currently live (have live data in them).
1421                 liveSlotCount++;
1422         }
1423
1424         // Get the sequence number of the latest slot in the system
1425         sequenceNumber = newSlots[newSlots.length - 1].getSequenceNumber();
1426
1427         updateLiveStateFromServer();
1428
1429         // No Need to remember after we pulled from the server
1430         offlineTransactionsCommittedAndAtServer.clear();
1431
1432         // This is invalidated now
1433         hadPartialSendToServer = false;
1434 }
1435
1436 void Table::updateLiveStateFromServer() {
1437         // Process the new transaction parts
1438         processNewTransactionParts();
1439
1440         // Do arbitration on new transactions that were received
1441         arbitrateFromServer();
1442
1443         // Update all the committed keys
1444         bool didCommitOrSpeculate = updateCommittedTable();
1445
1446         // Delete the transactions that are now dead
1447         updateLiveTransactionsAndStatus();
1448
1449         // Do speculations
1450         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1451         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1452 }
1453
1454 void Table::updateLiveStateFromLocal() {
1455         // Update all the committed keys
1456         bool didCommitOrSpeculate = updateCommittedTable();
1457
1458         // Delete the transactions that are now dead
1459         updateLiveTransactionsAndStatus();
1460
1461         // Do speculations
1462         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1463         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1464 }
1465
1466 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1467         // if (didFindTableStatus) {
1468         // return;
1469         // }
1470         int64_t prevslots = firstSequenceNumber;
1471
1472
1473         if (didFindTableStatus) {
1474                 // expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : expectedsize;
1475                 // System.out.println("Here2: " + expectedsize + "    " + numberOfSlots + "   " + prevslots);
1476
1477         } else {
1478                 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1479                 // System.out.println("Here: " + expectedsize);
1480         }
1481
1482         // System.out.println(numberOfSlots);
1483
1484         didFindTableStatus = true;
1485         currMaxSize = numberOfSlots;
1486 }
1487
1488 void Table::updateExpectedSize() {
1489         expectedsize++;
1490
1491         if (expectedsize > currMaxSize) {
1492                 expectedsize = currMaxSize;
1493         }
1494 }
1495
1496
1497 /**
1498  * Check the size of the block chain to make sure there are enough slots sent back by the server.
1499  * This is only called when we have a gap between the slots that we have locally and the slots
1500  * sent by the server therefore in the slots sent by the server there will be at least 1 Table
1501  * status message
1502  */
1503 void Table::checkNumSlots(int numberOfSlots) {
1504         if (numberOfSlots != expectedsize) {
1505                 throw new Error("Server Error: Server did not send all slots.  Expected: " + expectedsize + " Received:" + numberOfSlots);
1506         }
1507 }
1508
1509 void Table::updateCurrMaxSize(int newmaxsize) {
1510         currMaxSize = newmaxsize;
1511 }
1512
1513
1514 /**
1515  * Update the size of of the local buffer if it is needed.
1516  */
1517 void Table::commitNewMaxSize() {
1518         didFindTableStatus = false;
1519
1520         // Resize the local slot buffer
1521         if (numberOfSlots != currMaxSize) {
1522                 buffer.resize((int)currMaxSize);
1523         }
1524
1525         // Change the number of local slots to the new size
1526         numberOfSlots = (int)currMaxSize;
1527
1528
1529         // Recalculate the resize threshold since the size of the local buffer has changed
1530         setResizeThreshold();
1531 }
1532
1533 /**
1534  * Process the new transaction parts from this latest round of slots received from the server
1535  */
1536 void Table::processNewTransactionParts() {
1537
1538         if (newTransactionParts.size() == 0) {
1539                 // Nothing new to process
1540                 return;
1541         }
1542
1543         // Iterate through all the machine Ids that we received new parts for
1544         for (Long machineId : newTransactionParts.keySet()) {
1545                 Hashtable<Pair<int64_t int32_t>, TransactionPart> parts = newTransactionParts.get(machineId);
1546
1547                 // Iterate through all the parts for that machine Id
1548                 for (Pair<int64_t int32_t> partId : parts.keySet()) {
1549                         TransactionPart part = parts.get(partId);
1550
1551                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(part.getArbitratorId());
1552                         if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= part.getSequenceNumber())) {
1553                                 // Set dead the transaction part
1554                                 part.setDead();
1555                                 continue;
1556                         }
1557
1558                         // Get the transaction object for that sequence number
1559                         Transaction transaction = liveTransactionBySequenceNumberTable.get(part.getSequenceNumber());
1560
1561                         if (transaction == NULL) {
1562                                 // This is a new transaction that we dont have so make a new one
1563                                 transaction = new Transaction();
1564
1565                                 // Insert this new transaction into the live tables
1566                                 liveTransactionBySequenceNumberTable.put(part.getSequenceNumber(), transaction);
1567                                 liveTransactionByTransactionIdTable.put(part.getTransactionId(), transaction);
1568                         }
1569
1570                         // Add that part to the transaction
1571                         transaction.addPartDecode(part);
1572                 }
1573         }
1574
1575         // Clear all the new transaction parts in preparation for the next time the server sends slots
1576         newTransactionParts.clear();
1577 }
1578
1579
1580 int64_t lastSeqNumArbOn = 0;
1581
1582 void Table::arbitrateFromServer() {
1583
1584         if (liveTransactionBySequenceNumberTable.size() == 0) {
1585                 // Nothing to arbitrate on so move on
1586                 return;
1587         }
1588
1589         // Get the transaction sequence numbers and sort from oldest to newest
1590         Vector<Long> transactionSequenceNumbers = new Vector<Long>(liveTransactionBySequenceNumberTable.keySet());
1591         Collections.sort(transactionSequenceNumbers);
1592
1593         // Collection of key value pairs that are
1594         Hashtable<IoTString, KeyValue> speculativeTableTmp = new Hashtable<IoTString, KeyValue>();
1595
1596         // The last transaction arbitrated on
1597         int64_t lastTransactionCommitted = -1;
1598         Set<Abort> generatedAborts = new HashSet<Abort>();
1599
1600         for (Long transactionSequenceNumber : transactionSequenceNumbers) {
1601                 Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber);
1602
1603
1604
1605                 // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
1606                 if (transaction.getArbitrator() != localMachineId) {
1607                         continue;
1608                 }
1609
1610                 if (transactionSequenceNumber < lastSeqNumArbOn) {
1611                         continue;
1612                 }
1613
1614                 if (offlineTransactionsCommittedAndAtServer.contains(transaction.getId())) {
1615                         // We have seen this already locally so dont commit again
1616                         continue;
1617                 }
1618
1619
1620                 if (!transaction.isComplete()) {
1621                         // Will arbitrate in incorrect order if we continue so just break
1622                         // Most likely this
1623                         break;
1624                 }
1625
1626
1627                 // update the largest transaction seen by arbitrator from server
1628                 if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) == NULL) {
1629                         lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber());
1630                 } else {
1631                         Long lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId());
1632                         if (transaction.getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1633                                 lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber());
1634                         }
1635                 }
1636
1637                 if (transaction.evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1638                         // Guard evaluated as true
1639
1640                         // Update the local changes so we can make the commit
1641                         for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
1642                                 speculativeTableTmp.put(kv.getKey(), kv);
1643                         }
1644
1645                         // Update what the last transaction committed was for use in batch commit
1646                         lastTransactionCommitted = transactionSequenceNumber;
1647                 } else {
1648                         // Guard evaluated was false so create abort
1649
1650                         // Create the abort
1651                         Abort newAbort = new Abort(NULL,
1652                                                                                                                                  transaction.getClientLocalSequenceNumber(),
1653                                                                                                                                  transaction.getSequenceNumber(),
1654                                                                                                                                  transaction.getMachineId(),
1655                                                                                                                                  transaction.getArbitrator(),
1656                                                                                                                                  localArbitrationSequenceNumber);
1657                         localArbitrationSequenceNumber++;
1658
1659                         generatedAborts.add(newAbort);
1660
1661                         // Insert the abort so we can process
1662                         processEntry(newAbort);
1663                 }
1664
1665                 lastSeqNumArbOn = transactionSequenceNumber;
1666
1667                 // liveTransactionBySequenceNumberTable.remove(transactionSequenceNumber);
1668         }
1669
1670         Commit newCommit = NULL;
1671
1672         // If there is something to commit
1673         if (speculativeTableTmp.size() != 0) {
1674
1675                 // Create the commit and increment the commit sequence number
1676                 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1677                 localArbitrationSequenceNumber++;
1678
1679                 // Add all the new keys to the commit
1680                 for (KeyValue kv : speculativeTableTmp.values()) {
1681                         newCommit.addKV(kv);
1682                 }
1683
1684                 // create the commit parts
1685                 newCommit.createCommitParts();
1686
1687                 // Append all the commit parts to the end of the pending queue waiting for sending to the server
1688
1689                 // Insert the commit so we can process it
1690                 for (CommitPart commitPart : newCommit.getParts().values()) {
1691                         processEntry(commitPart);
1692                 }
1693         }
1694
1695         if ((newCommit != NULL) || (generatedAborts.size() > 0)) {
1696                 ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1697                 pendingSendArbitrationRounds.add(arbitrationRound);
1698
1699                 if (compactArbitrationData()) {
1700                         ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1701                         if (newArbitrationRound.getCommit() != NULL) {
1702                                 for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
1703                                         processEntry(commitPart);
1704                                 }
1705                         }
1706                 }
1707         }
1708 }
1709
1710 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction transaction) {
1711
1712         // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
1713         if (transaction.getArbitrator() != localMachineId) {
1714                 return new Pair<bool, bool>(false, false);
1715         }
1716
1717         if (!transaction.isComplete()) {
1718                 // Will arbitrate in incorrect order if we continue so just break
1719                 // Most likely this
1720                 return new Pair<bool, bool>(false, false);
1721         }
1722
1723         if (transaction.getMachineId() != localMachineId) {
1724                 // dont do this check for local transactions
1725                 if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) != NULL) {
1726                         if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) > transaction.getClientLocalSequenceNumber()) {
1727                                 // We've have already seen this from the server
1728                                 return new Pair<bool, bool>(false, false);
1729                         }
1730                 }
1731         }
1732
1733         if (transaction.evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1734                 // Guard evaluated as true
1735
1736                 // Create the commit and increment the commit sequence number
1737                 Commit newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1738                 localArbitrationSequenceNumber++;
1739
1740                 // Update the local changes so we can make the commit
1741                 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
1742                         newCommit.addKV(kv);
1743                 }
1744
1745                 // create the commit parts
1746                 newCommit.createCommitParts();
1747
1748                 // Append all the commit parts to the end of the pending queue waiting for sending to the server
1749                 ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, new HashSet<Abort>());
1750                 pendingSendArbitrationRounds.add(arbitrationRound);
1751
1752                 if (compactArbitrationData()) {
1753                         ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1754                         for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
1755                                 processEntry(commitPart);
1756                         }
1757                 } else {
1758                         // Insert the commit so we can process it
1759                         for (CommitPart commitPart : newCommit.getParts().values()) {
1760                                 processEntry(commitPart);
1761                         }
1762                 }
1763
1764                 if (transaction.getMachineId() == localMachineId) {
1765                         TransactionStatus status = transaction.getTransactionStatus();
1766                         if (status != NULL) {
1767                                 status.setStatus(TransactionStatus.StatusCommitted);
1768                         }
1769                 }
1770
1771                 updateLiveStateFromLocal();
1772                 return new Pair<bool, bool>(true, true);
1773         } else {
1774
1775                 if (transaction.getMachineId() == localMachineId) {
1776                         // For locally created messages update the status
1777
1778                         // Guard evaluated was false so create abort
1779                         TransactionStatus status = transaction.getTransactionStatus();
1780                         if (status != NULL) {
1781                                 status.setStatus(TransactionStatus.StatusAborted);
1782                         }
1783                 } else {
1784                         Set addAbortSet = new HashSet<Abort>();
1785
1786
1787                         // Create the abort
1788                         Abort newAbort = new Abort(NULL,
1789                                                                                                                                  transaction.getClientLocalSequenceNumber(),
1790                                                                                                                                  -1,
1791                                                                                                                                  transaction.getMachineId(),
1792                                                                                                                                  transaction.getArbitrator(),
1793                                                                                                                                  localArbitrationSequenceNumber);
1794                         localArbitrationSequenceNumber++;
1795
1796                         addAbortSet.add(newAbort);
1797
1798
1799                         // Append all the commit parts to the end of the pending queue waiting for sending to the server
1800                         ArbitrationRound arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1801                         pendingSendArbitrationRounds.add(arbitrationRound);
1802
1803                         if (compactArbitrationData()) {
1804                                 ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1805                                 for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
1806                                         processEntry(commitPart);
1807                                 }
1808                         }
1809                 }
1810
1811                 updateLiveStateFromLocal();
1812                 return new Pair<bool, bool>(true, false);
1813         }
1814 }
1815
1816 /**
1817  * Compacts the arbitration data my merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates
1818  */
1819 bool Table::compactArbitrationData() {
1820
1821         if (pendingSendArbitrationRounds.size() < 2) {
1822                 // Nothing to compact so do nothing
1823                 return false;
1824         }
1825
1826         ArbitrationRound lastRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1827         if (lastRound.didSendPart()) {
1828                 return false;
1829         }
1830
1831         bool hadCommit = (lastRound.getCommit() == NULL);
1832         bool gotNewCommit = false;
1833
1834         int numberToDelete = 1;
1835         while (numberToDelete < pendingSendArbitrationRounds.size()) {
1836                 ArbitrationRound round = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - numberToDelete - 1);
1837
1838                 if (round.isFull() || round.didSendPart()) {
1839                         // Stop since there is a part that cannot be compacted and we need to compact in order
1840                         break;
1841                 }
1842
1843                 if (round.getCommit() == NULL) {
1844
1845                         // Try compacting aborts only
1846                         int newSize = round.getCurrentSize() + lastRound.getAbortsCount();
1847                         if (newSize > ArbitrationRound.MAX_PARTS) {
1848                                 // Cant compact since it would be too large
1849                                 break;
1850                         }
1851                         lastRound.addAborts(round.getAborts());
1852                 } else {
1853
1854                         // Create a new larger commit
1855                         Commit newCommit = Commit.merge(lastRound.getCommit(), round.getCommit(), localArbitrationSequenceNumber);
1856                         localArbitrationSequenceNumber++;
1857
1858                         // Create the commit parts so that we can count them
1859                         newCommit.createCommitParts();
1860
1861                         // Calculate the new size of the parts
1862                         int newSize = newCommit.getNumberOfParts();
1863                         newSize += lastRound.getAbortsCount();
1864                         newSize += round.getAbortsCount();
1865
1866                         if (newSize > ArbitrationRound.MAX_PARTS) {
1867                                 // Cant compact since it would be too large
1868                                 break;
1869                         }
1870
1871                         // Set the new compacted part
1872                         lastRound.setCommit(newCommit);
1873                         lastRound.addAborts(round.getAborts());
1874                         gotNewCommit = true;
1875                 }
1876
1877                 numberToDelete++;
1878         }
1879
1880         if (numberToDelete != 1) {
1881                 // If there is a compaction
1882
1883                 // Delete the previous pieces that are now in the new compacted piece
1884                 if (numberToDelete == pendingSendArbitrationRounds.size()) {
1885                         pendingSendArbitrationRounds.clear();
1886                 } else {
1887                         for (int i = 0; i < numberToDelete; i++) {
1888                                 pendingSendArbitrationRounds.remove(pendingSendArbitrationRounds.size() - 1);
1889                         }
1890                 }
1891
1892                 // Add the new compacted into the pending to send list
1893                 pendingSendArbitrationRounds.add(lastRound);
1894
1895                 // Should reinsert into the commit processor
1896                 if (hadCommit && gotNewCommit) {
1897                         return true;
1898                 }
1899         }
1900
1901         return false;
1902 }
1903 // bool compactArbitrationData() {
1904 //  return false;
1905 // }
1906
1907 /**
1908  * Update all the commits and the committed tables, sets dead the dead transactions
1909  */
1910 bool Table::updateCommittedTable() {
1911
1912         if (newCommitParts.size() == 0) {
1913                 // Nothing new to process
1914                 return false;
1915         }
1916
1917         // Iterate through all the machine Ids that we received new parts for
1918         for (Long machineId : newCommitParts.keySet()) {
1919                 Hashtable<Pair<int64_t int32_t>, CommitPart> parts = newCommitParts.get(machineId);
1920
1921                 // Iterate through all the parts for that machine Id
1922                 for (Pair<int64_t int32_t> partId : parts.keySet()) {
1923                         CommitPart part = parts.get(partId);
1924
1925                         // Get the transaction object for that sequence number
1926                         Hashtable<int64_t Commit> commitForClientTable = liveCommitsTable.get(part.getMachineId());
1927
1928                         if (commitForClientTable == NULL) {
1929                                 // This is the first commit from this device
1930                                 commitForClientTable = new Hashtable<int64_t Commit>();
1931                                 liveCommitsTable.put(part.getMachineId(), commitForClientTable);
1932                         }
1933
1934                         Commit commit = commitForClientTable.get(part.getSequenceNumber());
1935
1936                         if (commit == NULL) {
1937                                 // This is a new commit that we dont have so make a new one
1938                                 commit = new Commit();
1939
1940                                 // Insert this new commit into the live tables
1941                                 commitForClientTable.put(part.getSequenceNumber(), commit);
1942                         }
1943
1944                         // Add that part to the commit
1945                         commit.addPartDecode(part);
1946                 }
1947         }
1948
1949         // Clear all the new commits parts in preparation for the next time the server sends slots
1950         newCommitParts.clear();
1951
1952         // If we process a new commit keep track of it for future use
1953         bool didProcessANewCommit = false;
1954
1955         // Process the commits one by one
1956         for (Long arbitratorId : liveCommitsTable.keySet()) {
1957
1958                 // Get all the commits for a specific arbitrator
1959                 Hashtable<int64_t Commit> commitForClientTable = liveCommitsTable.get(arbitratorId);
1960
1961                 // Sort the commits in order
1962                 Vector<Long> commitSequenceNumbers = new Vector<Long>(commitForClientTable.keySet());
1963                 Collections.sort(commitSequenceNumbers);
1964
1965                 // Get the last commit seen from this arbitrator
1966                 int64_t lastCommitSeenSequenceNumber = -1;
1967                 if (lastCommitSeenSequenceNumberByArbitratorTable.get(arbitratorId) != NULL) {
1968                         lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable.get(arbitratorId);
1969                 }
1970
1971                 // Go through each new commit one by one
1972                 for (int i = 0; i < commitSequenceNumbers.size(); i++) {
1973                         Long commitSequenceNumber = commitSequenceNumbers.get(i);
1974                         Commit commit = commitForClientTable.get(commitSequenceNumber);
1975
1976                         // Special processing if a commit is not complete
1977                         if (!commit.isComplete()) {
1978                                 if (i == (commitSequenceNumbers.size() - 1)) {
1979                                         // If there is an incomplete commit and this commit is the latest one seen then this commit cannot be processed and there are no other commits
1980                                         break;
1981                                 } else {
1982                                         // This is a commit that was already dead but parts of it are still in the block chain (not flushed out yet).
1983                                         // Delete it and move on
1984                                         commit.setDead();
1985                                         commitForClientTable.remove(commit.getSequenceNumber());
1986                                         continue;
1987                                 }
1988                         }
1989
1990                         // Update the last transaction that was updated if we can
1991                         if (commit.getTransactionSequenceNumber() != -1) {
1992                                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId());
1993
1994                                 // Update the last transaction sequence number that the arbitrator arbitrated on
1995                                 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) {
1996                                         lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber());
1997                                 }
1998                         }
1999
2000                         // Update the last arbitration data that we have seen so far
2001                         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId()) != NULL) {
2002
2003                                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId());
2004                                 if (commit.getSequenceNumber() > lastArbitrationSequenceNumber) {
2005                                         // Is larger
2006                                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(commit.getMachineId(), commit.getSequenceNumber());
2007                                 }
2008                         } else {
2009                                 // Never seen any data from this arbitrator so record the first one
2010                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(commit.getMachineId(), commit.getSequenceNumber());
2011                         }
2012
2013                         // We have already seen this commit before so need to do the full processing on this commit
2014                         if (commit.getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2015
2016                                 // Update the last transaction that was updated if we can
2017                                 if (commit.getTransactionSequenceNumber() != -1) {
2018                                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId());
2019
2020                                         // Update the last transaction sequence number that the arbitrator arbitrated on
2021                                         if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) {
2022                                                 lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber());
2023                                         }
2024                                 }
2025
2026                                 continue;
2027                         }
2028
2029                         // If we got here then this is a brand new commit and needs full processing
2030
2031                         // Get what commits should be edited, these are the commits that have live values for their keys
2032                         Set<Commit> commitsToEdit = new HashSet<Commit>();
2033                         for (KeyValue kv : commit.getKeyValueUpdateSet()) {
2034                                 commitsToEdit.add(liveCommitsByKeyTable.get(kv.getKey()));
2035                         }
2036                         commitsToEdit.remove(NULL);             // remove NULL since it could be in this set
2037
2038                         // Update each previous commit that needs to be updated
2039                         for (Commit previousCommit : commitsToEdit) {
2040
2041                                 // Only bother with live commits (TODO: Maybe remove this check)
2042                                 if (previousCommit.isLive()) {
2043
2044                                         // Update which keys in the old commits are still live
2045                                         for (KeyValue kv : commit.getKeyValueUpdateSet()) {
2046                                                 previousCommit.invalidateKey(kv.getKey());
2047                                         }
2048
2049                                         // if the commit is now dead then remove it
2050                                         if (!previousCommit.isLive()) {
2051                                                 commitForClientTable.remove(previousCommit);
2052                                         }
2053                                 }
2054                         }
2055
2056                         // Update the last seen sequence number from this arbitrator
2057                         if (lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId()) != NULL) {
2058                                 if (commit.getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId())) {
2059                                         lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber());
2060                                 }
2061                         } else {
2062                                 lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber());
2063                         }
2064
2065                         // We processed a new commit that we havent seen before
2066                         didProcessANewCommit = true;
2067
2068                         // Update the committed table of keys and which commit is using which key
2069                         for (KeyValue kv : commit.getKeyValueUpdateSet()) {
2070                                 committedKeyValueTable.put(kv.getKey(), kv);
2071                                 liveCommitsByKeyTable.put(kv.getKey(), commit);
2072                         }
2073                 }
2074         }
2075
2076         return didProcessANewCommit;
2077 }
2078
2079 /**
2080  * Create the speculative table from transactions that are still live and have come from the cloud
2081  */
2082 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2083         if (liveTransactionBySequenceNumberTable.keySet().size() == 0) {
2084                 // There is nothing to speculate on
2085                 return false;
2086         }
2087
2088         // Create a list of the transaction sequence numbers and sort them from oldest to newest
2089         Vector<Long> transactionSequenceNumbersSorted = new Vector<Long>(liveTransactionBySequenceNumberTable.keySet());
2090         Collections.sort(transactionSequenceNumbersSorted);
2091
2092         bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted.get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2093
2094
2095         if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2096                 // If there is a gap in the transaction sequence numbers then there was a commit or an abort of a transaction
2097                 // OR there was a new commit (Could be from offline commit) so a redo the speculation from scratch
2098
2099                 // Start from scratch
2100                 speculatedKeyValueTable.clear();
2101                 lastTransactionSequenceNumberSpeculatedOn = -1;
2102                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2103
2104         }
2105
2106         // Remember the front of the transaction list
2107         oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted.get(0);
2108
2109         // Find where to start arbitration from
2110         int startIndex = transactionSequenceNumbersSorted.indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1;
2111
2112         if (startIndex >= transactionSequenceNumbersSorted.size()) {
2113                 // Make sure we are not out of bounds
2114                 return false;           // did not speculate
2115         }
2116
2117         Set<Long> incompleteTransactionArbitrator = new HashSet<Long>();
2118         bool didSkip = true;
2119
2120         for (int i = startIndex; i < transactionSequenceNumbersSorted.size(); i++) {
2121                 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted.get(i);
2122                 Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber);
2123
2124                 if (!transaction.isComplete()) {
2125                         // If there is an incomplete transaction then there is nothing we can do
2126                         // add this transactions arbitrator to the list of arbitrators we should ignore
2127                         incompleteTransactionArbitrator.add(transaction.getArbitrator());
2128                         didSkip = true;
2129                         continue;
2130                 }
2131
2132                 if (incompleteTransactionArbitrator.contains(transaction.getArbitrator())) {
2133                         continue;
2134                 }
2135
2136                 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2137
2138                 if (transaction.evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2139                         // Guard evaluated to true so update the speculative table
2140                         for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
2141                                 speculatedKeyValueTable.put(kv.getKey(), kv);
2142                         }
2143                 }
2144         }
2145
2146         if (didSkip) {
2147                 // Since there was a skip we need to redo the speculation next time around
2148                 lastTransactionSequenceNumberSpeculatedOn = -1;
2149                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2150         }
2151
2152         // We did some speculation
2153         return true;
2154 }
2155
2156 /**
2157  * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer
2158  */
2159 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2160         if (pendingTransactionQueue.size() == 0) {
2161                 // There is nothing to speculate on
2162                 return;
2163         }
2164
2165
2166         if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue.get(0))) {
2167                 // need to reset on the pending speculation
2168                 lastPendingTransactionSpeculatedOn = NULL;
2169                 firstPendingTransaction = pendingTransactionQueue.get(0);
2170                 pendingTransactionSpeculatedKeyValueTable.clear();
2171         }
2172
2173         // Find where to start arbitration from
2174         int startIndex = pendingTransactionQueue.indexOf(firstPendingTransaction) + 1;
2175
2176         if (startIndex >= pendingTransactionQueue.size()) {
2177                 // Make sure we are not out of bounds
2178                 return;
2179         }
2180
2181         for (int i = startIndex; i < pendingTransactionQueue.size(); i++) {
2182                 Transaction transaction = pendingTransactionQueue.get(i);
2183
2184                 lastPendingTransactionSpeculatedOn = transaction;
2185
2186                 if (transaction.evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2187                         // Guard evaluated to true so update the speculative table
2188                         for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
2189                                 pendingTransactionSpeculatedKeyValueTable.put(kv.getKey(), kv);
2190                         }
2191                 }
2192         }
2193 }
2194
2195 /**
2196  * Set dead and remove from the live transaction tables the transactions that are dead
2197  */
2198 void Table::updateLiveTransactionsAndStatus() {
2199
2200         // Go through each of the transactions
2201         for (Iterator<Map.Entry<int64_t Transaction> > iter = liveTransactionBySequenceNumberTable.entrySet().iterator(); iter.hasNext();) {
2202                 Transaction transaction = iter.next().getValue();
2203
2204                 // Check if the transaction is dead
2205                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(transaction.getArbitrator());
2206                 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= transaction.getSequenceNumber())) {
2207
2208                         // Set dead the transaction
2209                         transaction.setDead();
2210
2211                         // Remove the transaction from the live table
2212                         iter.remove();
2213                         liveTransactionByTransactionIdTable.remove(transaction.getId());
2214                 }
2215         }
2216
2217         // Go through each of the transactions
2218         for (Iterator<Map.Entry<int64_t TransactionStatus> > iter = outstandingTransactionStatus.entrySet().iterator(); iter.hasNext();) {
2219                 TransactionStatus status = iter.next().getValue();
2220
2221                 // Check if the transaction is dead
2222                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(status.getTransactionArbitrator());
2223                 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= status.getTransactionSequenceNumber())) {
2224
2225                         // Set committed
2226                         status.setStatus(TransactionStatus.StatusCommitted);
2227
2228                         // Remove
2229                         iter.remove();
2230                 }
2231         }
2232 }
2233
2234 /**
2235  * Process this slot, entry by entry.  Also update the latest message sent by slot
2236  */
2237 void Table::processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLocal, HashSet<Long> machineSet) {
2238
2239         // Update the last message seen
2240         updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2241
2242         // Process each entry in the slot
2243         for (Entry entry : slot.getEntries()) {
2244                 switch (entry.getType()) {
2245
2246                 case Entry.TypeCommitPart:
2247                         processEntry((CommitPart)entry);
2248                         break;
2249
2250                 case Entry.TypeAbort:
2251                         processEntry((Abort)entry);
2252                         break;
2253
2254                 case Entry.TypeTransactionPart:
2255                         processEntry((TransactionPart)entry);
2256                         break;
2257
2258                 case Entry.TypeNewKey:
2259                         processEntry((NewKey)entry);
2260                         break;
2261
2262                 case Entry.TypeLastMessage:
2263                         processEntry((LastMessage)entry, machineSet);
2264                         break;
2265
2266                 case Entry.TypeRejectedMessage:
2267                         processEntry((RejectedMessage)entry, indexer);
2268                         break;
2269
2270                 case Entry.TypeTableStatus:
2271                         processEntry((TableStatus)entry, slot.getSequenceNumber());
2272                         break;
2273
2274                 default:
2275                         throw new Error("Unrecognized type: " + entry.getType());
2276                 }
2277         }
2278 }
2279
2280 /**
2281  * Update the last message that was sent for a machine Id
2282  */
2283 void Table::processEntry(LastMessage entry, HashSet<Long> machineSet) {
2284         // Update what the last message received by a machine was
2285         updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
2286 }
2287
2288 /**
2289  * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message)
2290  */
2291 void Table::processEntry(NewKey entry) {
2292
2293         // Update the arbitrator table with the new key information
2294         arbitratorTable.put(entry.getKey(), entry.getMachineID());
2295
2296         // Update what the latest live new key is
2297         NewKey oldNewKey = liveNewKeyTable.put(entry.getKey(), entry);
2298         if (oldNewKey != NULL) {
2299                 // Delete the old new key messages
2300                 oldNewKey.setDead();
2301         }
2302 }
2303
2304 /**
2305  * Process new table status entries and set dead the old ones as new ones come in.
2306  * keeps track of the largest and smallest table status seen in this current round
2307  * of updating the local copy of the block chain
2308  */
2309 void Table::processEntry(TableStatus entry, int64_t seq) {
2310         int newNumSlots = entry.getMaxSlots();
2311         updateCurrMaxSize(newNumSlots);
2312
2313         initExpectedSize(seq, newNumSlots);
2314
2315         if (liveTableStatus != NULL) {
2316                 // We have a larger table status so the old table status is no int64_ter alive
2317                 liveTableStatus.setDead();
2318         }
2319
2320         // Make this new table status the latest alive table status
2321         liveTableStatus = entry;
2322 }
2323
2324 /**
2325  * Check old messages to see if there is a block chain violation. Also
2326  */
2327 void Table::processEntry(RejectedMessage entry, SlotIndexer indexer) {
2328         int64_t oldSeqNum = entry.getOldSeqNum();
2329         int64_t newSeqNum = entry.getNewSeqNum();
2330         bool isequal = entry.getEqual();
2331         int64_t machineId = entry.getMachineID();
2332         int64_t seq = entry.getSequenceNumber();
2333
2334
2335         // Check if we have messages that were supposed to be rejected in our local block chain
2336         for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2337
2338                 // Get the slot
2339                 Slot slot = indexer.getSlot(seqNum);
2340
2341                 if (slot != NULL) {
2342                         // If we have this slot make sure that it was not supposed to be a rejected slot
2343
2344                         int64_t slotMachineId = slot.getMachineID();
2345                         if (isequal != (slotMachineId == machineId)) {
2346                                 throw new Error("Server Error: Trying to insert rejected message for slot " + seqNum);
2347                         }
2348                 }
2349         }
2350
2351
2352         // Create a list of clients to watch until they see this rejected message entry.
2353         HashSet<Long> deviceWatchSet = new HashSet<Long>();
2354         for (Map.Entry<int64_t Pair<int64_t Liveness> > lastMessageEntry : lastMessageTable.entrySet()) {
2355
2356                 // Machine ID for the last message entry
2357                 int64_t lastMessageEntryMachineId = lastMessageEntry.getKey();
2358
2359                 // We've seen it, don't need to continue to watch.  Our next
2360                 // message will implicitly acknowledge it.
2361                 if (lastMessageEntryMachineId == localMachineId) {
2362                         continue;
2363                 }
2364
2365                 Pair<int64_t Liveness> lastMessageValue = lastMessageEntry.getValue();
2366                 int64_t entrySequenceNumber = lastMessageValue.getFirst();
2367
2368                 if (entrySequenceNumber < seq) {
2369
2370                         // Add this rejected message to the set of messages that this machine ID did not see yet
2371                         addWatchVector(lastMessageEntryMachineId, entry);
2372
2373                         // This client did not see this rejected message yet so add it to the watch set to monitor
2374                         deviceWatchSet.add(lastMessageEntryMachineId);
2375                 }
2376         }
2377
2378         if (deviceWatchSet.isEmpty()) {
2379                 // This rejected message has been seen by all the clients so
2380                 entry.setDead();
2381         } else {
2382                 // We need to watch this rejected message
2383                 entry.setWatchSet(deviceWatchSet);
2384         }
2385 }
2386
2387 /**
2388  * Check if this abort is live, if not then save it so we can kill it later.
2389  * update the last transaction number that was arbitrated on.
2390  */
2391 void Table::processEntry(Abort entry) {
2392
2393
2394         if (entry.getTransactionSequenceNumber() != -1) {
2395                 // update the transaction status if it was sent to the server
2396                 TransactionStatus status = outstandingTransactionStatus.remove(entry.getTransactionSequenceNumber());
2397                 if (status != NULL) {
2398                         status.setStatus(TransactionStatus.StatusAborted);
2399                 }
2400         }
2401
2402         // Abort has not been seen by the client it is for yet so we need to keep track of it
2403         Abort previouslySeenAbort = liveAbortTable.put(entry.getAbortId(), entry);
2404         if (previouslySeenAbort != NULL) {
2405                 previouslySeenAbort.setDead();  // Delete old version of the abort since we got a rescued newer version
2406         }
2407
2408         if (entry.getTransactionArbitrator() == localMachineId) {
2409                 liveAbortsGeneratedByLocal.put(entry.getArbitratorLocalSequenceNumber(), entry);
2410         }
2411
2412         if ((entry.getSequenceNumber() != -1) && (lastMessageTable.get(entry.getTransactionMachineId()).getFirst() >= entry.getSequenceNumber())) {
2413
2414                 // The machine already saw this so it is dead
2415                 entry.setDead();
2416                 liveAbortTable.remove(entry.getAbortId());
2417
2418                 if (entry.getTransactionArbitrator() == localMachineId) {
2419                         liveAbortsGeneratedByLocal.remove(entry.getArbitratorLocalSequenceNumber());
2420                 }
2421
2422                 return;
2423         }
2424
2425
2426
2427
2428         // Update the last arbitration data that we have seen so far
2429         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator()) != NULL) {
2430
2431                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator());
2432                 if (entry.getSequenceNumber() > lastArbitrationSequenceNumber) {
2433                         // Is larger
2434                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber());
2435                 }
2436         } else {
2437                 // Never seen any data from this arbitrator so record the first one
2438                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber());
2439         }
2440
2441
2442         // Set dead a transaction if we can
2443         Transaction transactionToSetDead = liveTransactionByTransactionIdTable.remove(new Pair<int64_t, int64_t>(entry.getTransactionMachineId(), entry.getTransactionClientLocalSequenceNumber()));
2444         if (transactionToSetDead != NULL) {
2445                 liveTransactionBySequenceNumberTable.remove(transactionToSetDead.getSequenceNumber());
2446         }
2447
2448         // Update the last transaction sequence number that the arbitrator arbitrated on
2449         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getTransactionArbitrator());
2450         if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry.getTransactionSequenceNumber())) {
2451
2452                 // Is a valid one
2453                 if (entry.getTransactionSequenceNumber() != -1) {
2454                         lastArbitratedTransactionNumberByArbitratorTable.put(entry.getTransactionArbitrator(), entry.getTransactionSequenceNumber());
2455                 }
2456         }
2457 }
2458
2459 /**
2460  * Set dead the transaction part if that transaction is dead and keep track of all new parts
2461  */
2462 void Table::processEntry(TransactionPart entry) {
2463         // Check if we have already seen this transaction and set it dead OR if it is not alive
2464         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getArbitratorId());
2465         if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= entry.getSequenceNumber())) {
2466                 // This transaction is dead, it was already committed or aborted
2467                 entry.setDead();
2468                 return;
2469         }
2470
2471         // This part is still alive
2472         Hashtable<Pair<int64_t int32_t>, TransactionPart> transactionPart = newTransactionParts.get(entry.getMachineId());
2473
2474         if (transactionPart == NULL) {
2475                 // Dont have a table for this machine Id yet so make one
2476                 transactionPart = new Hashtable<Pair<int64_t int32_t>, TransactionPart>();
2477                 newTransactionParts.put(entry.getMachineId(), transactionPart);
2478         }
2479
2480         // Update the part and set dead ones we have already seen (got a rescued version)
2481         TransactionPart previouslySeenPart = transactionPart.put(entry.getPartId(), entry);
2482         if (previouslySeenPart != NULL) {
2483                 previouslySeenPart.setDead();
2484         }
2485 }
2486
2487 /**
2488  * Process new commit entries and save them for future use.  Delete duplicates
2489  */
2490 void Table::processEntry(CommitPart entry) {
2491
2492
2493         // Update the last transaction that was updated if we can
2494         if (entry.getTransactionSequenceNumber() != -1) {
2495                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getMachineId());
2496
2497                 // Update the last transaction sequence number that the arbitrator arbitrated on
2498                 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry.getTransactionSequenceNumber())) {
2499                         lastArbitratedTransactionNumberByArbitratorTable.put(entry.getMachineId(), entry.getTransactionSequenceNumber());
2500                 }
2501         }
2502
2503
2504
2505
2506         Hashtable<Pair<int64_t int32_t>, CommitPart> commitPart = newCommitParts.get(entry.getMachineId());
2507
2508         if (commitPart == NULL) {
2509                 // Don't have a table for this machine Id yet so make one
2510                 commitPart = new Hashtable<Pair<int64_t int32_t>, CommitPart>();
2511                 newCommitParts.put(entry.getMachineId(), commitPart);
2512         }
2513
2514         // Update the part and set dead ones we have already seen (got a rescued version)
2515         CommitPart previouslySeenPart = commitPart.put(entry.getPartId(), entry);
2516         if (previouslySeenPart != NULL) {
2517                 previouslySeenPart.setDead();
2518         }
2519 }
2520
2521 /**
2522  * Update the last message seen table.  Update and set dead the appropriate RejectedMessages as clients see them.
2523  * Updates the live aborts, removes those that are dead and sets them dead.
2524  * Check that the last message seen is correct and that there is no mismatch of our own last message or that
2525  * other clients have not had a rollback on the last message.
2526  */
2527 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness liveness, bool acceptUpdatesToLocal, HashSet<Long> machineSet) {
2528
2529         // We have seen this machine ID
2530         machineSet.remove(machineId);
2531
2532         // Get the set of rejected messages that this machine Id is has not seen yet
2533         HashSet<RejectedMessage> watchset = rejectedMessageWatchVectorTable.get(machineId);
2534
2535         // If there is a rejected message that this machine Id has not seen yet
2536         if (watchset != NULL) {
2537
2538                 // Go through each rejected message that this machine Id has not seen yet
2539                 for (Iterator<RejectedMessage> rmit = watchset.iterator(); rmit.hasNext(); ) {
2540
2541                         RejectedMessage rm = rmit.next();
2542
2543                         // If this machine Id has seen this rejected message...
2544                         if (rm.getSequenceNumber() <= seqNum) {
2545
2546                                 // Remove it from our watchlist
2547                                 rmit.remove();
2548
2549                                 // Decrement machines that need to see this notification
2550                                 rm.removeWatcher(machineId);
2551                         }
2552                 }
2553         }
2554
2555         // Set dead the abort
2556         for (Iterator<Map.Entry<Pair<int64_t, int64_t>, Abort> > i = liveAbortTable.entrySet().iterator(); i.hasNext();) {
2557                 Abort abort = i.next().getValue();
2558
2559                 if ((abort.getTransactionMachineId() == machineId) && (abort.getSequenceNumber() <= seqNum)) {
2560                         abort.setDead();
2561                         i.remove();
2562
2563                         if (abort.getTransactionArbitrator() == localMachineId) {
2564                                 liveAbortsGeneratedByLocal.remove(abort.getArbitratorLocalSequenceNumber());
2565                         }
2566                 }
2567         }
2568
2569
2570
2571         if (machineId == localMachineId) {
2572                 // Our own messages are immediately dead.
2573                 if (liveness instanceof LastMessage) {
2574                         ((LastMessage)liveness).setDead();
2575                 } else if (liveness instanceof Slot) {
2576                         ((Slot)liveness).setDead();
2577                 } else {
2578                         throw new Error("Unrecognized type");
2579                 }
2580         }
2581
2582         // Get the old last message for this device
2583         Pair<int64_t Liveness> lastMessageEntry = lastMessageTable.put(machineId, new Pair<int64_t Liveness>(seqNum, liveness));
2584         if (lastMessageEntry == NULL) {
2585                 // If no last message then there is nothing else to process
2586                 return;
2587         }
2588
2589         int64_t lastMessageSeqNum = lastMessageEntry.getFirst();
2590         Liveness lastEntry = lastMessageEntry.getSecond();
2591
2592         // If it is not our machine Id since we already set ours to dead
2593         if (machineId != localMachineId) {
2594                 if (lastEntry instanceof LastMessage) {
2595                         ((LastMessage)lastEntry).setDead();
2596                 } else if (lastEntry instanceof Slot) {
2597                         ((Slot)lastEntry).setDead();
2598                 } else {
2599                         throw new Error("Unrecognized type");
2600                 }
2601         }
2602
2603         // Make sure the server is not playing any games
2604         if (machineId == localMachineId) {
2605
2606                 if (hadPartialSendToServer) {
2607                         // We were not making any updates and we had a machine mismatch
2608                         if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2609                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: " +  lastMessageSeqNum  + " got: " + seqNum);
2610                         }
2611
2612                 } else {
2613                         // We were not making any updates and we had a machine mismatch
2614                         if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2615                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed: " +  lastMessageSeqNum + " got: " + seqNum);
2616                         }
2617                 }
2618         } else {
2619                 if (lastMessageSeqNum > seqNum) {
2620                         throw new Error("Server Error: Rollback on remote machine sequence number");
2621                 }
2622         }
2623 }
2624
2625 /**
2626  * Add a rejected message entry to the watch set to keep track of which clients have seen that
2627  * rejected message entry and which have not.
2628  */
2629 void Table::addWatchVector(int64_t machineId, RejectedMessage entry) {
2630         HashSet<RejectedMessage> entries = rejectedMessageWatchVectorTable.get(machineId);
2631         if (entries == NULL) {
2632                 // There is no set for this machine ID yet so create one
2633                 entries = new HashSet<RejectedMessage>();
2634                 rejectedMessageWatchVectorTable.put(machineId, entries);
2635         }
2636         entries.add(entry);
2637 }
2638
2639 /**
2640  * Check if the HMAC chain is not violated
2641  */
2642 void Table::checkHMACChain(SlotIndexer indexer, Slot[] newSlots) {
2643         for (int i = 0; i < newSlots.length; i++) {
2644                 Slot currSlot = newSlots[i];
2645                 Slot prevSlot = indexer.getSlot(currSlot.getSequenceNumber() - 1);
2646                 if (prevSlot != NULL &&
2647                                 !Arrays.equals(prevSlot.getHMAC(), currSlot.getPrevHMAC()))
2648                         throw new Error("Server Error: Invalid HMAC Chain" + currSlot + " " + prevSlot);
2649         }
2650 }
2651