de0a1dafa95a15df4f784b17bd21a292a29ea1ca
[iotcloud.git] / version2 / src / C / Table.cc
1 #include "Table.h"
2
3 Table::Table(String baseurl, String password, int64_t _localMachineId, int listeningPort) {
4         localMachineId = _localMachineId;
5         cloud = new CloudComm(this, baseurl, password, listeningPort);
6
7         init();
8 }
9
10 Table::Table(CloudComm _cloud, int64_t _localMachineId) {
11         localMachineId = _localMachineId;
12         cloud = _cloud;
13
14         init();
15 }
16
17 /**
18  * Init all the stuff needed for for table usage
19  */
20 void Table::init() {
21
22         // Init helper objects
23         random = new Random();
24         buffer = new SlotBuffer();
25
26         // Set Variables
27         oldestLiveSlotSequenceNumver = 1;
28
29         // init data structs
30         committedKeyValueTable = new Hashtable<IoTString, KeyValue>();
31         speculatedKeyValueTable = new Hashtable<IoTString, KeyValue>();
32         pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString, KeyValue>();
33         liveNewKeyTable = new Hashtable<IoTString, NewKey>();
34         lastMessageTable = new Hashtable<int64_t Pair<int64_t Liveness> >();
35         rejectedMessageWatchVectorTable = new Hashtable<int64_t HashSet<RejectedMessage> >();
36         arbitratorTable = new Hashtable<IoTString, Long>();
37         liveAbortTable = new Hashtable<Pair<int64_t, int64_t>, Abort>();
38         newTransactionParts = new Hashtable<int64_t Hashtable<Pair<int64_t int32_t>, TransactionPart> >();
39         newCommitParts = new Hashtable<int64_t Hashtable<Pair<int64_t int32_t>, CommitPart> >();
40         lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
41         liveTransactionBySequenceNumberTable = new Hashtable<int64_t Transaction>();
42         liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t>, Transaction>();
43         liveCommitsTable = new Hashtable<int64_t Hashtable<int64_t Commit> >();
44         liveCommitsByKeyTable = new Hashtable<IoTString, Commit>();
45         lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
46         rejectedSlotVector = new Vector<Long>();
47         pendingTransactionQueue = new Vector<Transaction>();
48         pendingSendArbitrationEntriesToDelete = new Vector<Entry>();
49         transactionPartsSent = new Hashtable<Transaction, Vector<int32_t> >();
50         outstandingTransactionStatus = new Hashtable<int64_t TransactionStatus>();
51         liveAbortsGeneratedByLocal = new Hashtable<int64_t Abort>();
52         offlineTransactionsCommittedAndAtServer = new HashSet<Pair<int64_t, int64_t> >();
53         localCommunicationTable = new Hashtable<int64_t Pair<String, int32_t> >();
54         lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
55         pendingSendArbitrationRounds = new Vector<ArbitrationRound>();
56         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
57
58
59         // Other init stuff
60         numberOfSlots = buffer.capacity();
61         setResizeThreshold();
62 }
63
64 // TODO: delete method
65 synchronized void Table::printSlots() {
66         int64_t o = buffer.getOldestSeqNum();
67         int64_t n = buffer.getNewestSeqNum();
68
69         int[] types = new int[10];
70
71         int num = 0;
72
73         int livec = 0;
74         int deadc = 0;
75
76         int casdasd = 0;
77
78         int liveslo = 0;
79
80         for (int64_t i = o; i < (n + 1); i++) {
81                 Slot s = buffer.getSlot(i);
82
83
84                 if (s.isLive()) {
85                         liveslo++;
86                 }
87
88                 Vector<Entry> entries = s.getEntries();
89
90                 for (Entry e : entries) {
91                         if (e.isLive()) {
92                                 int type = e.getType();
93
94
95                                 if (type == 6) {
96                                         RejectedMessage rej = (RejectedMessage)e;
97                                         casdasd++;
98
99                                         System.out.println(rej.getMachineID());
100                                 }
101
102
103                                 types[type] = types[type] + 1;
104                                 num++;
105                                 livec++;
106                         } else {
107                                 deadc++;
108                         }
109                 }
110         }
111
112         for (int i = 0; i < 10; i++) {
113                 System.out.println(i + "    " + types[i]);
114         }
115         System.out.println("Live count:   " + livec);
116         System.out.println("Live Slot count:   " + liveslo);
117
118         System.out.println("Dead count:   " + deadc);
119         System.out.println("Old:   " + o);
120         System.out.println("New:   " + n);
121         System.out.println("Size:   " + buffer.size());
122         // System.out.println("Commits:   " + liveCommitsTable.size());
123         System.out.println("pendingTrans:   " + pendingTransactionQueue.size());
124         System.out.println("Trans Status Out:   " + outstandingTransactionStatus.size());
125
126         for (Long k : lastArbitratedTransactionNumberByArbitratorTable.keySet()) {
127                 System.out.println(k + ": " + lastArbitratedTransactionNumberByArbitratorTable.get(k));
128         }
129
130
131         for (Long a : liveCommitsTable.keySet()) {
132                 for (Long b : liveCommitsTable.get(a).keySet()) {
133                         for (KeyValue kv : liveCommitsTable.get(a).get(b).getKeyValueUpdateSet()) {
134                                 System.out.print(kv + " ");
135                         }
136                         System.out.print("|| ");
137                 }
138                 System.out.println();
139         }
140
141 }
142
143 /**
144  * Initialize the table by inserting a table status as the first entry into the table status
145  * also initialize the crypto stuff.
146  */
147 synchronized void Table::initTable() throws ServerException {
148         cloud.initSecurity();
149
150         // Create the first insertion into the block chain which is the table status
151         Slot s = new Slot(this, 1, localMachineId, localSequenceNumber);
152         localSequenceNumber++;
153         TableStatus status = new TableStatus(s, numberOfSlots);
154         s.addEntry(status);
155         Slot[] array = cloud.putSlot(s, numberOfSlots);
156
157         if (array == NULL) {
158                 array = new Slot[] {s};
159                 // update local block chain
160                 validateAndUpdate(array, true);
161         } else if (array.length == 1) {
162                 // in case we did push the slot BUT we failed to init it
163                 validateAndUpdate(array, true);
164         } else {
165                 throw new Error("Error on initialization");
166         }
167 }
168
169 /**
170  * Rebuild the table from scratch by pulling the latest block chain from the server.
171  */
172 synchronized void Table::rebuild() throws ServerException {
173         // Just pull the latest slots from the server
174         Slot[] newslots = cloud.getSlots(sequenceNumber + 1);
175         validateAndUpdate(newslots, true);
176         sendToServer(NULL);
177         updateLiveTransactionsAndStatus();
178
179 }
180
181 // String toString() {
182 //  String retString = " Committed Table: \n";
183 //  retString += "---------------------------\n";
184 //  retString += commitedTable.toString();
185
186 //  retString += "\n\n";
187
188 //  retString += " Speculative Table: \n";
189 //  retString += "---------------------------\n";
190 //  retString += speculativeTable.toString();
191
192 //  return retString;
193 // }
194
195 synchronized void Table::addLocalCommunication(int64_t arbitrator, String hostName, int portNumber) {
196         localCommunicationTable.put(arbitrator, new Pair<String, int32_t>(hostName, portNumber));
197 }
198
199 synchronized Long Table::getArbitrator(IoTString key) {
200         return arbitratorTable.get(key);
201 }
202
203 synchronized void Table::close() {
204         cloud.close();
205 }
206
207 synchronized IoTString Table::getCommitted(IoTString key)  {
208         KeyValue kv = committedKeyValueTable.get(key);
209
210         if (kv != NULL) {
211                 return kv.getValue();
212         } else {
213                 return NULL;
214         }
215 }
216
217 synchronized IoTString Table::getSpeculative(IoTString key) {
218         KeyValue kv = pendingTransactionSpeculatedKeyValueTable.get(key);
219
220         if (kv == NULL) {
221                 kv = speculatedKeyValueTable.get(key);
222         }
223
224         if (kv == NULL) {
225                 kv = committedKeyValueTable.get(key);
226         }
227
228         if (kv != NULL) {
229                 return kv.getValue();
230         } else {
231                 return NULL;
232         }
233 }
234
235 synchronized IoTString Table::getCommittedAtomic(IoTString key) {
236         KeyValue kv = committedKeyValueTable.get(key);
237
238         if (arbitratorTable.get(key) == NULL) {
239                 throw new Error("Key not Found.");
240         }
241
242         // Make sure new key value pair matches the current arbitrator
243         if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
244                 // TODO: Maybe not throw en error
245                 throw new Error("Not all Key Values Match Arbitrator.");
246         }
247
248         if (kv != NULL) {
249                 pendingTransactionBuilder.addKVGuard(new KeyValue(key, kv.getValue()));
250                 return kv.getValue();
251         } else {
252                 pendingTransactionBuilder.addKVGuard(new KeyValue(key, NULL));
253                 return NULL;
254         }
255 }
256
257 synchronized IoTString Table::getSpeculativeAtomic(IoTString key) {
258         if (arbitratorTable.get(key) == NULL) {
259                 throw new Error("Key not Found.");
260         }
261
262         // Make sure new key value pair matches the current arbitrator
263         if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
264                 // TODO: Maybe not throw en error
265                 throw new Error("Not all Key Values Match Arbitrator.");
266         }
267
268         KeyValue kv = pendingTransactionSpeculatedKeyValueTable.get(key);
269
270         if (kv == NULL) {
271                 kv = speculatedKeyValueTable.get(key);
272         }
273
274         if (kv == NULL) {
275                 kv = committedKeyValueTable.get(key);
276         }
277
278         if (kv != NULL) {
279                 pendingTransactionBuilder.addKVGuard(new KeyValue(key, kv.getValue()));
280                 return kv.getValue();
281         } else {
282                 pendingTransactionBuilder.addKVGuard(new KeyValue(key, NULL));
283                 return NULL;
284         }
285 }
286
287 synchronized bool Table::update()  {
288         try {
289                 Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
290                 validateAndUpdate(newSlots, false);
291                 sendToServer(NULL);
292
293
294                 updateLiveTransactionsAndStatus();
295
296                 return true;
297         } catch (Exception e) {
298                 // e.printStackTrace();
299
300                 for (Long m : localCommunicationTable.keySet()) {
301                         updateFromLocal(m);
302                 }
303         }
304
305         return false;
306 }
307
308 synchronized bool Table::createNewKey(IoTString keyName, int64_t machineId) throws ServerException {
309         while (true) {
310                 if (arbitratorTable.get(keyName) != NULL) {
311                         // There is already an arbitrator
312                         return false;
313                 }
314
315                 NewKey newKey = new NewKey(NULL, keyName, machineId);
316
317                 if (sendToServer(newKey)) {
318                         // If successfully inserted
319                         return true;
320                 }
321         }
322 }
323
324 synchronized void Table::startTransaction() {
325         // Create a new transaction, invalidates any old pending transactions.
326         pendingTransactionBuilder = new PendingTransaction(localMachineId);
327 }
328
329 synchronized void Table::addKV(IoTString key, IoTString value) {
330
331         // Make sure it is a valid key
332         if (arbitratorTable.get(key) == NULL) {
333                 throw new Error("Key not Found.");
334         }
335
336         // Make sure new key value pair matches the current arbitrator
337         if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) {
338                 // TODO: Maybe not throw en error
339                 throw new Error("Not all Key Values Match Arbitrator.");
340         }
341
342         // Add the key value to this transaction
343         KeyValue kv = new KeyValue(key, value);
344         pendingTransactionBuilder.addKV(kv);
345 }
346
347 synchronized TransactionStatus Table::commitTransaction() {
348
349         if (pendingTransactionBuilder.getKVUpdates().size() == 0) {
350                 // transaction with no updates will have no effect on the system
351                 return new TransactionStatus(TransactionStatus.StatusNoEffect, -1);
352         }
353
354         // Set the local transaction sequence number and increment
355         pendingTransactionBuilder.setClientLocalSequenceNumber(localTransactionSequenceNumber);
356         localTransactionSequenceNumber++;
357
358         // Create the transaction status
359         TransactionStatus transactionStatus = new TransactionStatus(TransactionStatus.StatusPending, pendingTransactionBuilder.getArbitrator());
360
361         // Create the new transaction
362         Transaction newTransaction = pendingTransactionBuilder.createTransaction();
363         newTransaction.setTransactionStatus(transactionStatus);
364
365         if (pendingTransactionBuilder.getArbitrator() != localMachineId) {
366                 // Add it to the queue and invalidate the builder for safety
367                 pendingTransactionQueue.add(newTransaction);
368         } else {
369                 arbitrateOnLocalTransaction(newTransaction);
370                 updateLiveStateFromLocal();
371         }
372
373         pendingTransactionBuilder = new PendingTransaction(localMachineId);
374
375         try {
376                 sendToServer(NULL);
377         } catch (ServerException e) {
378
379                 Set<Long> arbitratorTriedAndFailed = new HashSet<Long>();
380                 for (Iterator<Transaction> iter = pendingTransactionQueue.iterator(); iter.hasNext(); ) {
381                         Transaction transaction = iter.next();
382
383                         if (arbitratorTriedAndFailed.contains(transaction.getArbitrator())) {
384                                 // Already contacted this client so ignore all attempts to contact this client
385                                 // to preserve ordering for arbitrator
386                                 continue;
387                         }
388
389                         Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
390
391                         if (sendReturn.getFirst()) {
392                                 // Failed to contact over local
393                                 arbitratorTriedAndFailed.add(transaction.getArbitrator());
394                         } else {
395                                 // Successful contact or should not contact
396
397                                 if (sendReturn.getSecond()) {
398                                         // did arbitrate
399                                         iter.remove();
400                                 }
401                         }
402                 }
403         }
404
405         updateLiveStateFromLocal();
406
407         return transactionStatus;
408 }
409
410 /**
411  * Get the machine ID for this client
412  */
413 int64_t Table::getMachineId() {
414         return localMachineId;
415 }
416
417 /**
418  * Decrement the number of live slots that we currently have
419  */
420 void Table::decrementLiveCount() {
421         liveSlotCount--;
422 }
423
424 /**
425  * Recalculate the new resize threshold
426  */
427 void Table::setResizeThreshold() {
428         int resizeLower = (int) (RESIZE_THRESHOLD * numberOfSlots);
429         bufferResizeThreshold = resizeLower - 1 + random.nextInt(numberOfSlots - resizeLower);
430 }
431
432 int64_t Table::getLocalSequenceNumber() {
433         return localSequenceNumber;
434 }
435
436
437 bool lastInsertedNewKey = false;
438
439 bool Table::sendToServer(NewKey newKey) throws ServerException {
440
441         bool fromRetry = false;
442
443         try {
444                 if (hadPartialSendToServer) {
445                         Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
446                         if (newSlots.length == 0) {
447                                 fromRetry = true;
448                                 ThreeTuple<bool, bool, Slot[]> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
449
450                                 if (sendSlotsReturn.getFirst()) {
451                                         if (newKey != NULL) {
452                                                 if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
453                                                         newKey = NULL;
454                                                 }
455                                         }
456
457                                         for (Transaction transaction : lastTransactionPartsSent.keySet()) {
458                                                 transaction.resetServerFailure();
459
460                                                 // Update which transactions parts still need to be sent
461                                                 transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
462
463                                                 // Add the transaction status to the outstanding list
464                                                 outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
465
466                                                 // Update the transaction status
467                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
468
469                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
470                                                 if (transaction.didSendAllParts()) {
471                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
472                                                         pendingTransactionQueue.remove(transaction);
473                                                 }
474                                         }
475                                 } else {
476
477                                         newSlots = sendSlotsReturn.getThird();
478
479                                         bool isInserted = false;
480                                         for (Slot s : newSlots) {
481                                                 if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
482                                                         isInserted = true;
483                                                         break;
484                                                 }
485                                         }
486
487                                         for (Slot s : newSlots) {
488                                                 if (isInserted) {
489                                                         break;
490                                                 }
491
492                                                 // Process each entry in the slot
493                                                 for (Entry entry : s.getEntries()) {
494
495                                                         if (entry.getType() == Entry.TypeLastMessage) {
496                                                                 LastMessage lastMessage = (LastMessage)entry;
497                                                                 if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) {
498                                                                         isInserted = true;
499                                                                         break;
500                                                                 }
501                                                         }
502                                                 }
503                                         }
504
505                                         if (isInserted) {
506                                                 if (newKey != NULL) {
507                                                         if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
508                                                                 newKey = NULL;
509                                                         }
510                                                 }
511
512                                                 for (Transaction transaction : lastTransactionPartsSent.keySet()) {
513                                                         transaction.resetServerFailure();
514
515                                                         // Update which transactions parts still need to be sent
516                                                         transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
517
518                                                         // Add the transaction status to the outstanding list
519                                                         outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
520
521                                                         // Update the transaction status
522                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
523
524                                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
525                                                         if (transaction.didSendAllParts()) {
526                                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
527                                                                 pendingTransactionQueue.remove(transaction);
528                                                         } else {
529                                                                 transaction.resetServerFailure();
530                                                                 // Set the transaction sequence number back to nothing
531                                                                 if (!transaction.didSendAPartToServer()) {
532                                                                         transaction.setSequenceNumber(-1);
533                                                                 }
534                                                         }
535                                                 }
536                                         }
537                                 }
538
539                                 for (Transaction transaction : lastTransactionPartsSent.keySet()) {
540                                         transaction.resetServerFailure();
541                                         // Set the transaction sequence number back to nothing
542                                         if (!transaction.didSendAPartToServer()) {
543                                                 transaction.setSequenceNumber(-1);
544                                         }
545                                 }
546
547                                 if (sendSlotsReturn.getThird().length != 0) {
548                                         // insert into the local block chain
549                                         validateAndUpdate(sendSlotsReturn.getThird(), true);
550                                 }
551                                 // continue;
552                         } else {
553                                 bool isInserted = false;
554                                 for (Slot s : newSlots) {
555                                         if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
556                                                 isInserted = true;
557                                                 break;
558                                         }
559                                 }
560
561                                 for (Slot s : newSlots) {
562                                         if (isInserted) {
563                                                 break;
564                                         }
565
566                                         // Process each entry in the slot
567                                         for (Entry entry : s.getEntries()) {
568
569                                                 if (entry.getType() == Entry.TypeLastMessage) {
570                                                         LastMessage lastMessage = (LastMessage)entry;
571                                                         if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) {
572                                                                 isInserted = true;
573                                                                 break;
574                                                         }
575                                                 }
576                                         }
577                                 }
578
579                                 if (isInserted) {
580                                         if (newKey != NULL) {
581                                                 if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
582                                                         newKey = NULL;
583                                                 }
584                                         }
585
586                                         for (Transaction transaction : lastTransactionPartsSent.keySet()) {
587                                                 transaction.resetServerFailure();
588
589                                                 // Update which transactions parts still need to be sent
590                                                 transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
591
592                                                 // Add the transaction status to the outstanding list
593                                                 outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
594
595                                                 // Update the transaction status
596                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
597
598                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
599                                                 if (transaction.didSendAllParts()) {
600                                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
601                                                         pendingTransactionQueue.remove(transaction);
602                                                 } else {
603                                                         transaction.resetServerFailure();
604                                                         // Set the transaction sequence number back to nothing
605                                                         if (!transaction.didSendAPartToServer()) {
606                                                                 transaction.setSequenceNumber(-1);
607                                                         }
608                                                 }
609                                         }
610                                 } else {
611                                         for (Transaction transaction : lastTransactionPartsSent.keySet()) {
612                                                 transaction.resetServerFailure();
613                                                 // Set the transaction sequence number back to nothing
614                                                 if (!transaction.didSendAPartToServer()) {
615                                                         transaction.setSequenceNumber(-1);
616                                                 }
617                                         }
618                                 }
619
620                                 // insert into the local block chain
621                                 validateAndUpdate(newSlots, true);
622                         }
623                 }
624         } catch (ServerException e) {
625                 throw e;
626         }
627
628
629
630         try {
631                 // While we have stuff that needs inserting into the block chain
632                 while ((pendingTransactionQueue.size() > 0) || (pendingSendArbitrationRounds.size() > 0) || (newKey != NULL)) {
633
634                         fromRetry = false;
635
636                         if (hadPartialSendToServer) {
637                                 throw new Error("Should Be error free");
638                         }
639
640
641
642                         // If there is a new key with same name then end
643                         if ((newKey != NULL) && (arbitratorTable.get(newKey.getKey()) != NULL)) {
644                                 return false;
645                         }
646
647                         // Create the slot
648                         Slot slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer.getSlot(sequenceNumber).getHMAC(), localSequenceNumber);
649                         localSequenceNumber++;
650
651                         // Try to fill the slot with data
652                         ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
653                         bool needsResize = fillSlotsReturn.getFirst();
654                         int newSize = fillSlotsReturn.getSecond();
655                         bool insertedNewKey = fillSlotsReturn.getThird();
656
657                         if (needsResize) {
658                                 // Reset which transaction to send
659                                 for (Transaction transaction : transactionPartsSent.keySet()) {
660                                         transaction.resetNextPartToSend();
661
662                                         // Set the transaction sequence number back to nothing
663                                         if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
664                                                 transaction.setSequenceNumber(-1);
665                                         }
666                                 }
667
668                                 // Clear the sent data since we are trying again
669                                 pendingSendArbitrationEntriesToDelete.clear();
670                                 transactionPartsSent.clear();
671
672                                 // We needed a resize so try again
673                                 fillSlot(slot, true, newKey);
674                         }
675
676                         lastSlotAttemptedToSend = slot;
677                         lastIsNewKey = (newKey != NULL);
678                         lastInsertedNewKey = insertedNewKey;
679                         lastNewSize = newSize;
680                         lastNewKey = newKey;
681                         lastTransactionPartsSent = new Hashtable<Transaction, Vector<int32_t> >(transactionPartsSent);
682                         lastPendingSendArbitrationEntriesToDelete = new Vector<Entry>(pendingSendArbitrationEntriesToDelete);
683
684
685                         ThreeTuple<bool, bool, Slot[]> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
686
687                         if (sendSlotsReturn.getFirst()) {
688
689                                 // Did insert into the block chain
690
691                                 if (insertedNewKey) {
692                                         // This slot was what was inserted not a previous slot
693
694                                         // New Key was successfully inserted into the block chain so dont want to insert it again
695                                         newKey = NULL;
696                                 }
697
698                                 // Remove the aborts and commit parts that were sent from the pending to send queue
699                                 for (Iterator<ArbitrationRound> iter = pendingSendArbitrationRounds.iterator(); iter.hasNext(); ) {
700                                         ArbitrationRound round = iter.next();
701                                         round.removeParts(pendingSendArbitrationEntriesToDelete);
702
703                                         if (round.isDoneSending()) {
704                                                 // Sent all the parts
705                                                 iter.remove();
706                                         }
707                                 }
708
709                                 for (Transaction transaction : transactionPartsSent.keySet()) {
710                                         transaction.resetServerFailure();
711
712                                         // Update which transactions parts still need to be sent
713                                         transaction.removeSentParts(transactionPartsSent.get(transaction));
714
715                                         // Add the transaction status to the outstanding list
716                                         outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
717
718                                         // Update the transaction status
719                                         transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
720
721                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
722                                         if (transaction.didSendAllParts()) {
723                                                 transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
724                                                 pendingTransactionQueue.remove(transaction);
725                                         }
726                                 }
727                         } else {
728
729                                 // if (!sendSlotsReturn.getSecond()) {
730                                 //  for (Transaction transaction : lastTransactionPartsSent.keySet()) {
731                                 //    transaction.resetServerFailure();
732                                 //  }
733                                 // } else {
734                                 //  for (Transaction transaction : lastTransactionPartsSent.keySet()) {
735                                 //    transaction.resetServerFailure();
736
737                                 //    // Update which transactions parts still need to be sent
738                                 //    transaction.removeSentParts(transactionPartsSent.get(transaction));
739
740                                 //    // Add the transaction status to the outstanding list
741                                 //    outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
742
743                                 //    // Update the transaction status
744                                 //    transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
745
746                                 //    // Check if all the transaction parts were successfully sent and if so then remove it from pending
747                                 //    if (transaction.didSendAllParts()) {
748                                 //      transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
749                                 //      pendingTransactionQueue.remove(transaction);
750
751                                 //      for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
752                                 //        System.out.println("Sent: " + kv + "  from: " + localMachineId + "   Slot:" + lastSlotAttemptedToSend.getSequenceNumber() + "  Claimed:" + transaction.getSequenceNumber());
753                                 //      }
754                                 //    }
755                                 //  }
756                                 // }
757
758                                 // Reset which transaction to send
759                                 for (Transaction transaction : transactionPartsSent.keySet()) {
760                                         transaction.resetNextPartToSend();
761                                         // transaction.resetNextPartToSend();
762
763                                         // Set the transaction sequence number back to nothing
764                                         if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
765                                                 transaction.setSequenceNumber(-1);
766                                         }
767                                 }
768                         }
769
770                         // Clear the sent data in preparation for next send
771                         pendingSendArbitrationEntriesToDelete.clear();
772                         transactionPartsSent.clear();
773
774                         if (sendSlotsReturn.getThird().length != 0) {
775                                 // insert into the local block chain
776                                 validateAndUpdate(sendSlotsReturn.getThird(), true);
777                         }
778                 }
779
780         } catch (ServerException e) {
781
782                 if (e.getType() != ServerException.TypeInputTimeout) {
783                         // e.printStackTrace();
784
785                         // Nothing was able to be sent to the server so just clear these data structures
786                         for (Transaction transaction : transactionPartsSent.keySet()) {
787                                 transaction.resetNextPartToSend();
788
789                                 // Set the transaction sequence number back to nothing
790                                 if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
791                                         transaction.setSequenceNumber(-1);
792                                 }
793                         }
794                 } else {
795                         // There was a partial send to the server
796                         hadPartialSendToServer = true;
797
798
799                         // if (!fromRetry) {
800                         //  lastTransactionPartsSent = new Hashtable<Transaction, Vector<int32_t>>(transactionPartsSent);
801                         //  lastPendingSendArbitrationEntriesToDelete = new Vector<Entry>(pendingSendArbitrationEntriesToDelete);
802                         // }
803
804                         // Nothing was able to be sent to the server so just clear these data structures
805                         for (Transaction transaction : transactionPartsSent.keySet()) {
806                                 transaction.resetNextPartToSend();
807                                 transaction.setServerFailure();
808                         }
809                 }
810
811                 pendingSendArbitrationEntriesToDelete.clear();
812                 transactionPartsSent.clear();
813
814                 throw e;
815         }
816
817         return newKey == NULL;
818 }
819
820 synchronized bool Table::updateFromLocal(int64_t machineId) {
821         Pair<String, int32_t> localCommunicationInformation = localCommunicationTable.get(machineId);
822         if (localCommunicationInformation == NULL) {
823                 // Cant talk to that device locally so do nothing
824                 return false;
825         }
826
827         // Get the size of the send data
828         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
829
830         Long lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
831         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId) != NULL) {
832                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId);
833         }
834
835         Array<char> *sendData = new char[sendDataSize];
836         ByteBuffer bbEncode = ByteBuffer.wrap(sendData);
837
838         // Encode the data
839         bbEncode.putLong(lastArbitrationDataLocalSequenceNumber);
840         bbEncode.putInt(0);
841
842         // Send by local
843         Array<char> *returnData = cloud.sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
844         localSequenceNumber++;
845
846         if (returnData == NULL) {
847                 // Could not contact server
848                 return false;
849         }
850
851         // Decode the data
852         ByteBuffer bbDecode = ByteBuffer.wrap(returnData);
853         int numberOfEntries = bbDecode.getInt();
854
855         for (int i = 0; i < numberOfEntries; i++) {
856                 char type = bbDecode.get();
857                 if (type == Entry.TypeAbort) {
858                         Abort abort = (Abort)Abort.decode(NULL, bbDecode);
859                         processEntry(abort);
860                 } else if (type == Entry.TypeCommitPart) {
861                         CommitPart commitPart = (CommitPart)CommitPart.decode(NULL, bbDecode);
862                         processEntry(commitPart);
863                 }
864         }
865
866         updateLiveStateFromLocal();
867
868         return true;
869 }
870
871 Pair<bool, bool> Table::sendTransactionToLocal(Transaction transaction) {
872
873         // Get the devices local communications
874         Pair<String, int32_t> localCommunicationInformation = localCommunicationTable.get(transaction.getArbitrator());
875
876         if (localCommunicationInformation == NULL) {
877                 // Cant talk to that device locally so do nothing
878                 return new Pair<bool, bool>(true, false);
879         }
880
881         // Get the size of the send data
882         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
883         for (TransactionPart part : transaction.getParts().values()) {
884                 sendDataSize += part.getSize();
885         }
886
887         Long lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
888         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator()) != NULL) {
889                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator());
890         }
891
892         // Make the send data size
893         Array<char> *sendData = new char[sendDataSize];
894         ByteBuffer bbEncode = ByteBuffer.wrap(sendData);
895
896         // Encode the data
897         bbEncode.putLong(lastArbitrationDataLocalSequenceNumber);
898         bbEncode.putInt(transaction.getParts().size());
899         for (TransactionPart part : transaction.getParts().values()) {
900                 part.encode(bbEncode);
901         }
902
903
904         // Send by local
905         Array<char> *returnData = cloud.sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
906         localSequenceNumber++;
907
908         if (returnData == NULL) {
909                 // Could not contact server
910                 return new Pair<bool, bool>(true, false);
911         }
912
913         // Decode the data
914         ByteBuffer bbDecode = ByteBuffer.wrap(returnData);
915         bool didCommit = bbDecode.get() == 1;
916         bool couldArbitrate = bbDecode.get() == 1;
917         int numberOfEntries = bbDecode.getInt();
918         bool foundAbort = false;
919
920         for (int i = 0; i < numberOfEntries; i++) {
921                 char type = bbDecode.get();
922                 if (type == Entry.TypeAbort) {
923                         Abort abort = (Abort)Abort.decode(NULL, bbDecode);
924
925                         if ((abort.getTransactionMachineId() == localMachineId) && (abort.getTransactionClientLocalSequenceNumber() == transaction.getClientLocalSequenceNumber())) {
926                                 foundAbort = true;
927                         }
928
929                         processEntry(abort);
930                 } else if (type == Entry.TypeCommitPart) {
931                         CommitPart commitPart = (CommitPart)CommitPart.decode(NULL, bbDecode);
932                         processEntry(commitPart);
933                 }
934         }
935
936         updateLiveStateFromLocal();
937
938         if (couldArbitrate) {
939                 TransactionStatus status =  transaction.getTransactionStatus();
940                 if (didCommit) {
941                         status.setStatus(TransactionStatus.StatusCommitted);
942                 } else {
943                         status.setStatus(TransactionStatus.StatusAborted);
944                 }
945         } else {
946                 TransactionStatus status =  transaction.getTransactionStatus();
947                 if (foundAbort) {
948                         status.setStatus(TransactionStatus.StatusAborted);
949                 } else {
950                         status.setStatus(TransactionStatus.StatusCommitted);
951                 }
952         }
953
954         return new Pair<bool, bool>(false, true);
955 }
956
957 synchronized Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
958
959         // Decode the data
960         ByteBuffer bbDecode = ByteBuffer.wrap(data);
961         int64_t lastArbitratedSequenceNumberSeen = bbDecode.getLong();
962         int numberOfParts = bbDecode.getInt();
963
964         // If we did commit a transaction or not
965         bool didCommit = false;
966         bool couldArbitrate = false;
967
968         if (numberOfParts != 0) {
969
970                 // decode the transaction
971                 Transaction transaction = new Transaction();
972                 for (int i = 0; i < numberOfParts; i++) {
973                         bbDecode.get();
974                         TransactionPart newPart = (TransactionPart)TransactionPart.decode(NULL, bbDecode);
975                         transaction.addPartDecode(newPart);
976                 }
977
978                 // Arbitrate on transaction and pull relevant return data
979                 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
980                 couldArbitrate = localArbitrateReturn.getFirst();
981                 didCommit = localArbitrateReturn.getSecond();
982
983                 updateLiveStateFromLocal();
984
985                 // Transaction was sent to the server so keep track of it to prevent double commit
986                 if (transaction.getSequenceNumber() != -1) {
987                         offlineTransactionsCommittedAndAtServer.add(transaction.getId());
988                 }
989         }
990
991         // The data to send back
992         int returnDataSize = 0;
993         Vector<Entry> unseenArbitrations = new Vector<Entry>();
994
995         // Get the aborts to send back
996         Vector<Long> abortLocalSequenceNumbers = new Vector<Long >(liveAbortsGeneratedByLocal.keySet());
997         Collections.sort(abortLocalSequenceNumbers);
998         for (Long localSequenceNumber : abortLocalSequenceNumbers) {
999                 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1000                         continue;
1001                 }
1002
1003                 Abort abort = liveAbortsGeneratedByLocal.get(localSequenceNumber);
1004                 unseenArbitrations.add(abort);
1005                 returnDataSize += abort.getSize();
1006         }
1007
1008         // Get the commits to send back
1009         Hashtable<int64_t Commit> commitForClientTable = liveCommitsTable.get(localMachineId);
1010         if (commitForClientTable != NULL) {
1011                 Vector<Long> commitLocalSequenceNumbers = new Vector<Long>(commitForClientTable.keySet());
1012                 Collections.sort(commitLocalSequenceNumbers);
1013
1014                 for (Long localSequenceNumber : commitLocalSequenceNumbers) {
1015                         Commit commit = commitForClientTable.get(localSequenceNumber);
1016
1017                         if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1018                                 continue;
1019                         }
1020
1021                         unseenArbitrations.addAll(commit.getParts().values());
1022
1023                         for (CommitPart commitPart : commit.getParts().values()) {
1024                                 returnDataSize += commitPart.getSize();
1025                         }
1026                 }
1027         }
1028
1029         // Number of arbitration entries to decode
1030         returnDataSize += 2 * sizeof(int32_t);
1031
1032         // bool of did commit or not
1033         if (numberOfParts != 0) {
1034                 returnDataSize += sizeof(char);
1035         }
1036
1037         // Data to send Back
1038         Array<char> *returnData = new char[returnDataSize];
1039         ByteBuffer bbEncode = ByteBuffer.wrap(returnData);
1040
1041         if (numberOfParts != 0) {
1042                 if (didCommit) {
1043                         bbEncode.put((char)1);
1044                 } else {
1045                         bbEncode.put((char)0);
1046                 }
1047                 if (couldArbitrate) {
1048                         bbEncode.put((char)1);
1049                 } else {
1050                         bbEncode.put((char)0);
1051                 }
1052         }
1053
1054         bbEncode.putInt(unseenArbitrations.size());
1055         for (Entry entry : unseenArbitrations) {
1056                 entry.encode(bbEncode);
1057         }
1058
1059
1060         localSequenceNumber++;
1061         return returnData;
1062 }
1063
1064 ThreeTuple<bool, bool, Slot[]> Table::sendSlotsToServer(Slot slot, int newSize, bool isNewKey)  throws ServerException {
1065
1066         bool attemptedToSendToServerTmp = attemptedToSendToServer;
1067         attemptedToSendToServer = true;
1068
1069         bool inserted = false;
1070         bool lastTryInserted = false;
1071
1072         Slot[] array = cloud.putSlot(slot, newSize);
1073         if (array == NULL) {
1074                 array = new Slot[] {slot};
1075                 rejectedSlotVector.clear();
1076                 inserted = true;
1077         } else {
1078                 if (array.length == 0) {
1079                         throw new Error("Server Error: Did not send any slots");
1080                 }
1081
1082                 // if (attemptedToSendToServerTmp) {
1083                 if (hadPartialSendToServer) {
1084
1085                         bool isInserted = false;
1086                         for (Slot s : array) {
1087                                 if ((s.getSequenceNumber() == slot.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
1088                                         isInserted = true;
1089                                         break;
1090                                 }
1091                         }
1092
1093                         for (Slot s : array) {
1094                                 if (isInserted) {
1095                                         break;
1096                                 }
1097
1098                                 // Process each entry in the slot
1099                                 for (Entry entry : s.getEntries()) {
1100
1101                                         if (entry.getType() == Entry.TypeLastMessage) {
1102                                                 LastMessage lastMessage = (LastMessage)entry;
1103
1104                                                 if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == slot.getSequenceNumber())) {
1105                                                         isInserted = true;
1106                                                         break;
1107                                                 }
1108                                         }
1109                                 }
1110                         }
1111
1112                         if (!isInserted) {
1113                                 rejectedSlotVector.add(slot.getSequenceNumber());
1114                                 lastTryInserted = false;
1115                         } else {
1116                                 lastTryInserted = true;
1117                         }
1118                 } else {
1119                         rejectedSlotVector.add(slot.getSequenceNumber());
1120                         lastTryInserted = false;
1121                 }
1122         }
1123
1124         return new ThreeTuple<bool, bool, Slot[]>(inserted, lastTryInserted, array);
1125 }
1126
1127 /**
1128  * Returns false if a resize was needed
1129  */
1130 ThreeTuple<bool, int32_t, bool> *Table::fillSlot(Slot slot, bool resize, NewKey newKeyEntry) {
1131
1132
1133         int newSize = 0;
1134         if (liveSlotCount > bufferResizeThreshold) {
1135                 resize = true;  //Resize is forced
1136
1137         }
1138
1139         if (resize) {
1140                 newSize = (int) (numberOfSlots * RESIZE_MULTIPLE);
1141                 TableStatus status = new TableStatus(slot, newSize);
1142                 slot.addEntry(status);
1143         }
1144
1145         // Fill with rejected slots first before doing anything else
1146         doRejectedMessages(slot);
1147
1148         // Do mandatory rescue of entries
1149         ThreeTuple<bool, bool, Long> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1150
1151         // Extract working variables
1152         bool needsResize = mandatoryRescueReturn.getFirst();
1153         bool seenLiveSlot = mandatoryRescueReturn.getSecond();
1154         int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1155
1156         if (needsResize && !resize) {
1157                 // We need to resize but we are not resizing so return false
1158                 return new ThreeTuple<bool, int32_t, bool>(true, NULL, NULL);
1159         }
1160
1161         bool inserted = false;
1162         if (newKeyEntry != NULL) {
1163                 newKeyEntry.setSlot(slot);
1164                 if (slot.hasSpace(newKeyEntry)) {
1165
1166                         slot.addEntry(newKeyEntry);
1167                         inserted = true;
1168                 }
1169         }
1170
1171         // Clear the transactions, aborts and commits that were sent previously
1172         transactionPartsSent.clear();
1173         pendingSendArbitrationEntriesToDelete.clear();
1174
1175         for (ArbitrationRound round : pendingSendArbitrationRounds) {
1176                 bool isFull = false;
1177                 round.generateParts();
1178                 Vector<Entry> parts = round.getParts();
1179
1180                 // Insert pending arbitration data
1181                 for (Entry arbitrationData : parts) {
1182
1183                         // If it is an abort then we need to set some information
1184                         if (arbitrationData instanceof Abort) {
1185                                 ((Abort)arbitrationData).setSequenceNumber(slot.getSequenceNumber());
1186                         }
1187
1188                         if (!slot.hasSpace(arbitrationData)) {
1189                                 // No space so cant do anything else with these data entries
1190                                 isFull = true;
1191                                 break;
1192                         }
1193
1194                         // Add to this current slot and add it to entries to delete
1195                         slot.addEntry(arbitrationData);
1196                         pendingSendArbitrationEntriesToDelete.add(arbitrationData);
1197                 }
1198
1199                 if (isFull) {
1200                         break;
1201                 }
1202         }
1203
1204         if (pendingTransactionQueue.size() > 0) {
1205
1206                 Transaction transaction = pendingTransactionQueue.get(0);
1207
1208                 // Set the transaction sequence number if it has yet to be inserted into the block chain
1209                 // if ((!transaction.didSendAPartToServer() && !transaction.getServerFailure()) || (transaction.getSequenceNumber() == -1)) {
1210                 //  transaction.setSequenceNumber(slot.getSequenceNumber());
1211                 // }
1212
1213                 if ((!transaction.didSendAPartToServer()) || (transaction.getSequenceNumber() == -1)) {
1214                         transaction.setSequenceNumber(slot.getSequenceNumber());
1215                 }
1216
1217
1218                 while (true) {
1219                         TransactionPart part = transaction.getNextPartToSend();
1220
1221                         if (part == NULL) {
1222                                 // Ran out of parts to send for this transaction so move on
1223                                 break;
1224                         }
1225
1226                         if (slot.hasSpace(part)) {
1227                                 slot.addEntry(part);
1228                                 Vector<int32_t> partsSent = transactionPartsSent.get(transaction);
1229                                 if (partsSent == NULL) {
1230                                         partsSent = new Vector<int32_t>();
1231                                         transactionPartsSent.put(transaction, partsSent);
1232                                 }
1233                                 partsSent.add(part.getPartNumber());
1234                                 transactionPartsSent.put(transaction, partsSent);
1235                         } else {
1236                                 break;
1237                         }
1238                 }
1239         }
1240
1241         // Fill the remainder of the slot with rescue data
1242         doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1243
1244         return new ThreeTuple<bool, int32_t, bool>(false, newSize, inserted);
1245 }
1246
1247 void Table::doRejectedMessages(Slot s) {
1248         if (!rejectedSlotVector.isEmpty()) {
1249                 /* TODO: We should avoid generating a rejected message entry if
1250                  * there is already a sufficient entry in the queue (e.g.,
1251                  * equalsto value of true and same sequence number).  */
1252
1253                 int64_t old_seqn = rejectedSlotVector.firstElement();
1254                 if (rejectedSlotVector.size() > REJECTED_THRESHOLD) {
1255                         int64_t new_seqn = rejectedSlotVector.lastElement();
1256                         RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1257                         s.addEntry(rm);
1258                 } else {
1259                         int64_t prev_seqn = -1;
1260                         int i = 0;
1261                         /* Go through list of missing messages */
1262                         for (; i < rejectedSlotVector.size(); i++) {
1263                                 int64_t curr_seqn = rejectedSlotVector.get(i);
1264                                 Slot s_msg = buffer.getSlot(curr_seqn);
1265                                 if (s_msg != NULL)
1266                                         break;
1267                                 prev_seqn = curr_seqn;
1268                         }
1269                         /* Generate rejected message entry for missing messages */
1270                         if (prev_seqn != -1) {
1271                                 RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1272                                 s.addEntry(rm);
1273                         }
1274                         /* Generate rejected message entries for present messages */
1275                         for (; i < rejectedSlotVector.size(); i++) {
1276                                 int64_t curr_seqn = rejectedSlotVector.get(i);
1277                                 Slot s_msg = buffer.getSlot(curr_seqn);
1278                                 int64_t machineid = s_msg.getMachineID();
1279                                 RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1280                                 s.addEntry(rm);
1281                         }
1282                 }
1283         }
1284 }
1285
1286 ThreeTuple<bool, bool, Long> Table::doMandatoryResuce(Slot slot, bool resize) {
1287         int64_t newestSequenceNumber = buffer.getNewestSeqNum();
1288         int64_t oldestSequenceNumber = buffer.getOldestSeqNum();
1289         if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1290                 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1291         }
1292
1293         int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1294         bool seenLiveSlot = false;
1295         int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots;         // smallest seq number in the buffer if it is full
1296         int64_t threshold = firstIfFull + FREE_SLOTS;           // we want the buffer to be clear of live entries up to this point
1297
1298
1299         // Mandatory Rescue
1300         for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1301                 Slot previousSlot = buffer.getSlot(currentSequenceNumber);
1302                 // Push slot number forward
1303                 if (!seenLiveSlot) {
1304                         oldestLiveSlotSequenceNumver = currentSequenceNumber;
1305                 }
1306
1307                 if (!previousSlot.isLive()) {
1308                         continue;
1309                 }
1310
1311                 // We have seen a live slot
1312                 seenLiveSlot = true;
1313
1314                 // Get all the live entries for a slot
1315                 Vector<Entry> liveEntries = previousSlot.getLiveEntries(resize);
1316
1317                 // Iterate over all the live entries and try to rescue them
1318                 for (Entry liveEntry : liveEntries) {
1319                         if (slot.hasSpace(liveEntry)) {
1320
1321                                 // Enough space to rescue the entry
1322                                 slot.addEntry(liveEntry);
1323                         } else if (currentSequenceNumber == firstIfFull) {
1324                                 //if there's no space but the entry is about to fall off the queue
1325                                 System.out.println("B");        //?
1326                                 return new ThreeTuple<bool, bool, Long>(true, seenLiveSlot, currentSequenceNumber);
1327
1328                         }
1329                 }
1330         }
1331
1332         // Did not resize
1333         return new ThreeTuple<bool, bool, Long>(false, seenLiveSlot, currentSequenceNumber);
1334 }
1335
1336 void Table::doOptionalRescue(Slot s, bool seenliveslot, int64_t seqn, bool resize) {
1337         /* now go through live entries from least to greatest sequence number until
1338          * either all live slots added, or the slot doesn't have enough room
1339          * for SKIP_THRESHOLD consecutive entries*/
1340         int skipcount = 0;
1341         int64_t newestseqnum = buffer.getNewestSeqNum();
1342 search:
1343         for (; seqn <= newestseqnum; seqn++) {
1344                 Slot prevslot = buffer.getSlot(seqn);
1345                 //Push slot number forward
1346                 if (!seenliveslot)
1347                         oldestLiveSlotSequenceNumver = seqn;
1348
1349                 if (!prevslot.isLive())
1350                         continue;
1351                 seenliveslot = true;
1352                 Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
1353                 for (Entry liveentry : liveentries) {
1354                         if (s.hasSpace(liveentry))
1355                                 s.addEntry(liveentry);
1356                         else {
1357                                 skipcount++;
1358                                 if (skipcount > SKIP_THRESHOLD)
1359                                         break search;
1360                         }
1361                 }
1362         }
1363 }
1364
1365 /**
1366  * Checks for malicious activity and updates the local copy of the block chain.
1367  */
1368 void Table::validateAndUpdate(Slot[] newSlots, bool acceptUpdatesToLocal) {
1369
1370         // The cloud communication layer has checked slot HMACs already before decoding
1371         if (newSlots.length == 0) {
1372                 return;
1373         }
1374
1375         // Make sure all slots are newer than the last largest slot this client has seen
1376         int64_t firstSeqNum = newSlots[0].getSequenceNumber();
1377         if (firstSeqNum <= sequenceNumber) {
1378                 throw new Error("Server Error: Sent older slots!");
1379         }
1380
1381         // Create an object that can access both new slots and slots in our local chain
1382         // without committing slots to our local chain
1383         SlotIndexer indexer = new SlotIndexer(newSlots, buffer);
1384
1385         // Check that the HMAC chain is not broken
1386         checkHMACChain(indexer, newSlots);
1387
1388         // Set to keep track of messages from clients
1389         HashSet<Long> machineSet = new HashSet<Long>(lastMessageTable.keySet());
1390
1391         // Process each slots data
1392         for (Slot slot : newSlots) {
1393                 processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1394
1395                 updateExpectedSize();
1396         }
1397
1398         // If there is a gap, check to see if the server sent us everything.
1399         if (firstSeqNum != (sequenceNumber + 1)) {
1400
1401                 // Check the size of the slots that were sent down by the server.
1402                 // Can only check the size if there was a gap
1403                 checkNumSlots(newSlots.length);
1404
1405                 // Since there was a gap every machine must have pushed a slot or must have
1406                 // a last message message.  If not then the server is hiding slots
1407                 if (!machineSet.isEmpty()) {
1408                         throw new Error("Missing record for machines: " + machineSet);
1409                 }
1410         }
1411
1412         // Update the size of our local block chain.
1413         commitNewMaxSize();
1414
1415         // Commit new to slots to the local block chain.
1416         for (Slot slot : newSlots) {
1417
1418                 // Insert this slot into our local block chain copy.
1419                 buffer.putSlot(slot);
1420
1421                 // Keep track of how many slots are currently live (have live data in them).
1422                 liveSlotCount++;
1423         }
1424
1425         // Get the sequence number of the latest slot in the system
1426         sequenceNumber = newSlots[newSlots.length - 1].getSequenceNumber();
1427
1428         updateLiveStateFromServer();
1429
1430         // No Need to remember after we pulled from the server
1431         offlineTransactionsCommittedAndAtServer.clear();
1432
1433         // This is invalidated now
1434         hadPartialSendToServer = false;
1435 }
1436
1437 void Table::updateLiveStateFromServer() {
1438         // Process the new transaction parts
1439         processNewTransactionParts();
1440
1441         // Do arbitration on new transactions that were received
1442         arbitrateFromServer();
1443
1444         // Update all the committed keys
1445         bool didCommitOrSpeculate = updateCommittedTable();
1446
1447         // Delete the transactions that are now dead
1448         updateLiveTransactionsAndStatus();
1449
1450         // Do speculations
1451         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1452         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1453 }
1454
1455 void Table::updateLiveStateFromLocal() {
1456         // Update all the committed keys
1457         bool didCommitOrSpeculate = updateCommittedTable();
1458
1459         // Delete the transactions that are now dead
1460         updateLiveTransactionsAndStatus();
1461
1462         // Do speculations
1463         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1464         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1465 }
1466
1467 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1468         // if (didFindTableStatus) {
1469         // return;
1470         // }
1471         int64_t prevslots = firstSequenceNumber;
1472
1473
1474         if (didFindTableStatus) {
1475                 // expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : expectedsize;
1476                 // System.out.println("Here2: " + expectedsize + "    " + numberOfSlots + "   " + prevslots);
1477
1478         } else {
1479                 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1480                 // System.out.println("Here: " + expectedsize);
1481         }
1482
1483         // System.out.println(numberOfSlots);
1484
1485         didFindTableStatus = true;
1486         currMaxSize = numberOfSlots;
1487 }
1488
1489 void Table::updateExpectedSize() {
1490         expectedsize++;
1491
1492         if (expectedsize > currMaxSize) {
1493                 expectedsize = currMaxSize;
1494         }
1495 }
1496
1497
1498 /**
1499  * Check the size of the block chain to make sure there are enough slots sent back by the server.
1500  * This is only called when we have a gap between the slots that we have locally and the slots
1501  * sent by the server therefore in the slots sent by the server there will be at least 1 Table
1502  * status message
1503  */
1504 void Table::checkNumSlots(int numberOfSlots) {
1505         if (numberOfSlots != expectedsize) {
1506                 throw new Error("Server Error: Server did not send all slots.  Expected: " + expectedsize + " Received:" + numberOfSlots);
1507         }
1508 }
1509
1510 void Table::updateCurrMaxSize(int newmaxsize) {
1511         currMaxSize = newmaxsize;
1512 }
1513
1514
1515 /**
1516  * Update the size of of the local buffer if it is needed.
1517  */
1518 void Table::commitNewMaxSize() {
1519         didFindTableStatus = false;
1520
1521         // Resize the local slot buffer
1522         if (numberOfSlots != currMaxSize) {
1523                 buffer.resize((int)currMaxSize);
1524         }
1525
1526         // Change the number of local slots to the new size
1527         numberOfSlots = (int)currMaxSize;
1528
1529
1530         // Recalculate the resize threshold since the size of the local buffer has changed
1531         setResizeThreshold();
1532 }
1533
1534 /**
1535  * Process the new transaction parts from this latest round of slots received from the server
1536  */
1537 void Table::processNewTransactionParts() {
1538
1539         if (newTransactionParts.size() == 0) {
1540                 // Nothing new to process
1541                 return;
1542         }
1543
1544         // Iterate through all the machine Ids that we received new parts for
1545         for (Long machineId : newTransactionParts.keySet()) {
1546                 Hashtable<Pair<int64_t int32_t>, TransactionPart> parts = newTransactionParts.get(machineId);
1547
1548                 // Iterate through all the parts for that machine Id
1549                 for (Pair<int64_t int32_t> partId : parts.keySet()) {
1550                         TransactionPart part = parts.get(partId);
1551
1552                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(part.getArbitratorId());
1553                         if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= part.getSequenceNumber())) {
1554                                 // Set dead the transaction part
1555                                 part.setDead();
1556                                 continue;
1557                         }
1558
1559                         // Get the transaction object for that sequence number
1560                         Transaction transaction = liveTransactionBySequenceNumberTable.get(part.getSequenceNumber());
1561
1562                         if (transaction == NULL) {
1563                                 // This is a new transaction that we dont have so make a new one
1564                                 transaction = new Transaction();
1565
1566                                 // Insert this new transaction into the live tables
1567                                 liveTransactionBySequenceNumberTable.put(part.getSequenceNumber(), transaction);
1568                                 liveTransactionByTransactionIdTable.put(part.getTransactionId(), transaction);
1569                         }
1570
1571                         // Add that part to the transaction
1572                         transaction.addPartDecode(part);
1573                 }
1574         }
1575
1576         // Clear all the new transaction parts in preparation for the next time the server sends slots
1577         newTransactionParts.clear();
1578 }
1579
1580
1581 int64_t lastSeqNumArbOn = 0;
1582
1583 void Table::arbitrateFromServer() {
1584
1585         if (liveTransactionBySequenceNumberTable.size() == 0) {
1586                 // Nothing to arbitrate on so move on
1587                 return;
1588         }
1589
1590         // Get the transaction sequence numbers and sort from oldest to newest
1591         Vector<Long> transactionSequenceNumbers = new Vector<Long>(liveTransactionBySequenceNumberTable.keySet());
1592         Collections.sort(transactionSequenceNumbers);
1593
1594         // Collection of key value pairs that are
1595         Hashtable<IoTString, KeyValue> speculativeTableTmp = new Hashtable<IoTString, KeyValue>();
1596
1597         // The last transaction arbitrated on
1598         int64_t lastTransactionCommitted = -1;
1599         Set<Abort> generatedAborts = new HashSet<Abort>();
1600
1601         for (Long transactionSequenceNumber : transactionSequenceNumbers) {
1602                 Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber);
1603
1604
1605
1606                 // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
1607                 if (transaction.getArbitrator() != localMachineId) {
1608                         continue;
1609                 }
1610
1611                 if (transactionSequenceNumber < lastSeqNumArbOn) {
1612                         continue;
1613                 }
1614
1615                 if (offlineTransactionsCommittedAndAtServer.contains(transaction.getId())) {
1616                         // We have seen this already locally so dont commit again
1617                         continue;
1618                 }
1619
1620
1621                 if (!transaction.isComplete()) {
1622                         // Will arbitrate in incorrect order if we continue so just break
1623                         // Most likely this
1624                         break;
1625                 }
1626
1627
1628                 // update the largest transaction seen by arbitrator from server
1629                 if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) == NULL) {
1630                         lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber());
1631                 } else {
1632                         Long lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId());
1633                         if (transaction.getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1634                                 lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber());
1635                         }
1636                 }
1637
1638                 if (transaction.evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1639                         // Guard evaluated as true
1640
1641                         // Update the local changes so we can make the commit
1642                         for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
1643                                 speculativeTableTmp.put(kv.getKey(), kv);
1644                         }
1645
1646                         // Update what the last transaction committed was for use in batch commit
1647                         lastTransactionCommitted = transactionSequenceNumber;
1648                 } else {
1649                         // Guard evaluated was false so create abort
1650
1651                         // Create the abort
1652                         Abort newAbort = new Abort(NULL,
1653                                                                                                                                  transaction.getClientLocalSequenceNumber(),
1654                                                                                                                                  transaction.getSequenceNumber(),
1655                                                                                                                                  transaction.getMachineId(),
1656                                                                                                                                  transaction.getArbitrator(),
1657                                                                                                                                  localArbitrationSequenceNumber);
1658                         localArbitrationSequenceNumber++;
1659
1660                         generatedAborts.add(newAbort);
1661
1662                         // Insert the abort so we can process
1663                         processEntry(newAbort);
1664                 }
1665
1666                 lastSeqNumArbOn = transactionSequenceNumber;
1667
1668                 // liveTransactionBySequenceNumberTable.remove(transactionSequenceNumber);
1669         }
1670
1671         Commit newCommit = NULL;
1672
1673         // If there is something to commit
1674         if (speculativeTableTmp.size() != 0) {
1675
1676                 // Create the commit and increment the commit sequence number
1677                 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1678                 localArbitrationSequenceNumber++;
1679
1680                 // Add all the new keys to the commit
1681                 for (KeyValue kv : speculativeTableTmp.values()) {
1682                         newCommit.addKV(kv);
1683                 }
1684
1685                 // create the commit parts
1686                 newCommit.createCommitParts();
1687
1688                 // Append all the commit parts to the end of the pending queue waiting for sending to the server
1689
1690                 // Insert the commit so we can process it
1691                 for (CommitPart commitPart : newCommit.getParts().values()) {
1692                         processEntry(commitPart);
1693                 }
1694         }
1695
1696         if ((newCommit != NULL) || (generatedAborts.size() > 0)) {
1697                 ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1698                 pendingSendArbitrationRounds.add(arbitrationRound);
1699
1700                 if (compactArbitrationData()) {
1701                         ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1702                         if (newArbitrationRound.getCommit() != NULL) {
1703                                 for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
1704                                         processEntry(commitPart);
1705                                 }
1706                         }
1707                 }
1708         }
1709 }
1710
1711 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction transaction) {
1712
1713         // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
1714         if (transaction.getArbitrator() != localMachineId) {
1715                 return new Pair<bool, bool>(false, false);
1716         }
1717
1718         if (!transaction.isComplete()) {
1719                 // Will arbitrate in incorrect order if we continue so just break
1720                 // Most likely this
1721                 return new Pair<bool, bool>(false, false);
1722         }
1723
1724         if (transaction.getMachineId() != localMachineId) {
1725                 // dont do this check for local transactions
1726                 if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) != NULL) {
1727                         if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) > transaction.getClientLocalSequenceNumber()) {
1728                                 // We've have already seen this from the server
1729                                 return new Pair<bool, bool>(false, false);
1730                         }
1731                 }
1732         }
1733
1734         if (transaction.evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1735                 // Guard evaluated as true
1736
1737                 // Create the commit and increment the commit sequence number
1738                 Commit newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1739                 localArbitrationSequenceNumber++;
1740
1741                 // Update the local changes so we can make the commit
1742                 for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
1743                         newCommit.addKV(kv);
1744                 }
1745
1746                 // create the commit parts
1747                 newCommit.createCommitParts();
1748
1749                 // Append all the commit parts to the end of the pending queue waiting for sending to the server
1750                 ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, new HashSet<Abort>());
1751                 pendingSendArbitrationRounds.add(arbitrationRound);
1752
1753                 if (compactArbitrationData()) {
1754                         ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1755                         for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
1756                                 processEntry(commitPart);
1757                         }
1758                 } else {
1759                         // Insert the commit so we can process it
1760                         for (CommitPart commitPart : newCommit.getParts().values()) {
1761                                 processEntry(commitPart);
1762                         }
1763                 }
1764
1765                 if (transaction.getMachineId() == localMachineId) {
1766                         TransactionStatus status = transaction.getTransactionStatus();
1767                         if (status != NULL) {
1768                                 status.setStatus(TransactionStatus.StatusCommitted);
1769                         }
1770                 }
1771
1772                 updateLiveStateFromLocal();
1773                 return new Pair<bool, bool>(true, true);
1774         } else {
1775
1776                 if (transaction.getMachineId() == localMachineId) {
1777                         // For locally created messages update the status
1778
1779                         // Guard evaluated was false so create abort
1780                         TransactionStatus status = transaction.getTransactionStatus();
1781                         if (status != NULL) {
1782                                 status.setStatus(TransactionStatus.StatusAborted);
1783                         }
1784                 } else {
1785                         Set addAbortSet = new HashSet<Abort>();
1786
1787
1788                         // Create the abort
1789                         Abort newAbort = new Abort(NULL,
1790                                                                                                                                  transaction.getClientLocalSequenceNumber(),
1791                                                                                                                                  -1,
1792                                                                                                                                  transaction.getMachineId(),
1793                                                                                                                                  transaction.getArbitrator(),
1794                                                                                                                                  localArbitrationSequenceNumber);
1795                         localArbitrationSequenceNumber++;
1796
1797                         addAbortSet.add(newAbort);
1798
1799
1800                         // Append all the commit parts to the end of the pending queue waiting for sending to the server
1801                         ArbitrationRound arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1802                         pendingSendArbitrationRounds.add(arbitrationRound);
1803
1804                         if (compactArbitrationData()) {
1805                                 ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1806                                 for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
1807                                         processEntry(commitPart);
1808                                 }
1809                         }
1810                 }
1811
1812                 updateLiveStateFromLocal();
1813                 return new Pair<bool, bool>(true, false);
1814         }
1815 }
1816
1817 /**
1818  * Compacts the arbitration data my merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates
1819  */
1820 bool Table::compactArbitrationData() {
1821
1822         if (pendingSendArbitrationRounds.size() < 2) {
1823                 // Nothing to compact so do nothing
1824                 return false;
1825         }
1826
1827         ArbitrationRound lastRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
1828         if (lastRound.didSendPart()) {
1829                 return false;
1830         }
1831
1832         bool hadCommit = (lastRound.getCommit() == NULL);
1833         bool gotNewCommit = false;
1834
1835         int numberToDelete = 1;
1836         while (numberToDelete < pendingSendArbitrationRounds.size()) {
1837                 ArbitrationRound round = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - numberToDelete - 1);
1838
1839                 if (round.isFull() || round.didSendPart()) {
1840                         // Stop since there is a part that cannot be compacted and we need to compact in order
1841                         break;
1842                 }
1843
1844                 if (round.getCommit() == NULL) {
1845
1846                         // Try compacting aborts only
1847                         int newSize = round.getCurrentSize() + lastRound.getAbortsCount();
1848                         if (newSize > ArbitrationRound.MAX_PARTS) {
1849                                 // Cant compact since it would be too large
1850                                 break;
1851                         }
1852                         lastRound.addAborts(round.getAborts());
1853                 } else {
1854
1855                         // Create a new larger commit
1856                         Commit newCommit = Commit.merge(lastRound.getCommit(), round.getCommit(), localArbitrationSequenceNumber);
1857                         localArbitrationSequenceNumber++;
1858
1859                         // Create the commit parts so that we can count them
1860                         newCommit.createCommitParts();
1861
1862                         // Calculate the new size of the parts
1863                         int newSize = newCommit.getNumberOfParts();
1864                         newSize += lastRound.getAbortsCount();
1865                         newSize += round.getAbortsCount();
1866
1867                         if (newSize > ArbitrationRound.MAX_PARTS) {
1868                                 // Cant compact since it would be too large
1869                                 break;
1870                         }
1871
1872                         // Set the new compacted part
1873                         lastRound.setCommit(newCommit);
1874                         lastRound.addAborts(round.getAborts());
1875                         gotNewCommit = true;
1876                 }
1877
1878                 numberToDelete++;
1879         }
1880
1881         if (numberToDelete != 1) {
1882                 // If there is a compaction
1883
1884                 // Delete the previous pieces that are now in the new compacted piece
1885                 if (numberToDelete == pendingSendArbitrationRounds.size()) {
1886                         pendingSendArbitrationRounds.clear();
1887                 } else {
1888                         for (int i = 0; i < numberToDelete; i++) {
1889                                 pendingSendArbitrationRounds.remove(pendingSendArbitrationRounds.size() - 1);
1890                         }
1891                 }
1892
1893                 // Add the new compacted into the pending to send list
1894                 pendingSendArbitrationRounds.add(lastRound);
1895
1896                 // Should reinsert into the commit processor
1897                 if (hadCommit && gotNewCommit) {
1898                         return true;
1899                 }
1900         }
1901
1902         return false;
1903 }
1904 // bool compactArbitrationData() {
1905 //  return false;
1906 // }
1907
1908 /**
1909  * Update all the commits and the committed tables, sets dead the dead transactions
1910  */
1911 bool Table::updateCommittedTable() {
1912
1913         if (newCommitParts.size() == 0) {
1914                 // Nothing new to process
1915                 return false;
1916         }
1917
1918         // Iterate through all the machine Ids that we received new parts for
1919         for (Long machineId : newCommitParts.keySet()) {
1920                 Hashtable<Pair<int64_t int32_t>, CommitPart> parts = newCommitParts.get(machineId);
1921
1922                 // Iterate through all the parts for that machine Id
1923                 for (Pair<int64_t int32_t> partId : parts.keySet()) {
1924                         CommitPart part = parts.get(partId);
1925
1926                         // Get the transaction object for that sequence number
1927                         Hashtable<int64_t Commit> commitForClientTable = liveCommitsTable.get(part.getMachineId());
1928
1929                         if (commitForClientTable == NULL) {
1930                                 // This is the first commit from this device
1931                                 commitForClientTable = new Hashtable<int64_t Commit>();
1932                                 liveCommitsTable.put(part.getMachineId(), commitForClientTable);
1933                         }
1934
1935                         Commit commit = commitForClientTable.get(part.getSequenceNumber());
1936
1937                         if (commit == NULL) {
1938                                 // This is a new commit that we dont have so make a new one
1939                                 commit = new Commit();
1940
1941                                 // Insert this new commit into the live tables
1942                                 commitForClientTable.put(part.getSequenceNumber(), commit);
1943                         }
1944
1945                         // Add that part to the commit
1946                         commit.addPartDecode(part);
1947                 }
1948         }
1949
1950         // Clear all the new commits parts in preparation for the next time the server sends slots
1951         newCommitParts.clear();
1952
1953         // If we process a new commit keep track of it for future use
1954         bool didProcessANewCommit = false;
1955
1956         // Process the commits one by one
1957         for (Long arbitratorId : liveCommitsTable.keySet()) {
1958
1959                 // Get all the commits for a specific arbitrator
1960                 Hashtable<int64_t Commit> commitForClientTable = liveCommitsTable.get(arbitratorId);
1961
1962                 // Sort the commits in order
1963                 Vector<Long> commitSequenceNumbers = new Vector<Long>(commitForClientTable.keySet());
1964                 Collections.sort(commitSequenceNumbers);
1965
1966                 // Get the last commit seen from this arbitrator
1967                 int64_t lastCommitSeenSequenceNumber = -1;
1968                 if (lastCommitSeenSequenceNumberByArbitratorTable.get(arbitratorId) != NULL) {
1969                         lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable.get(arbitratorId);
1970                 }
1971
1972                 // Go through each new commit one by one
1973                 for (int i = 0; i < commitSequenceNumbers.size(); i++) {
1974                         Long commitSequenceNumber = commitSequenceNumbers.get(i);
1975                         Commit commit = commitForClientTable.get(commitSequenceNumber);
1976
1977                         // Special processing if a commit is not complete
1978                         if (!commit.isComplete()) {
1979                                 if (i == (commitSequenceNumbers.size() - 1)) {
1980                                         // If there is an incomplete commit and this commit is the latest one seen then this commit cannot be processed and there are no other commits
1981                                         break;
1982                                 } else {
1983                                         // This is a commit that was already dead but parts of it are still in the block chain (not flushed out yet).
1984                                         // Delete it and move on
1985                                         commit.setDead();
1986                                         commitForClientTable.remove(commit.getSequenceNumber());
1987                                         continue;
1988                                 }
1989                         }
1990
1991                         // Update the last transaction that was updated if we can
1992                         if (commit.getTransactionSequenceNumber() != -1) {
1993                                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId());
1994
1995                                 // Update the last transaction sequence number that the arbitrator arbitrated on
1996                                 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) {
1997                                         lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber());
1998                                 }
1999                         }
2000
2001                         // Update the last arbitration data that we have seen so far
2002                         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId()) != NULL) {
2003
2004                                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId());
2005                                 if (commit.getSequenceNumber() > lastArbitrationSequenceNumber) {
2006                                         // Is larger
2007                                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(commit.getMachineId(), commit.getSequenceNumber());
2008                                 }
2009                         } else {
2010                                 // Never seen any data from this arbitrator so record the first one
2011                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(commit.getMachineId(), commit.getSequenceNumber());
2012                         }
2013
2014                         // We have already seen this commit before so need to do the full processing on this commit
2015                         if (commit.getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2016
2017                                 // Update the last transaction that was updated if we can
2018                                 if (commit.getTransactionSequenceNumber() != -1) {
2019                                         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId());
2020
2021                                         // Update the last transaction sequence number that the arbitrator arbitrated on
2022                                         if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) {
2023                                                 lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber());
2024                                         }
2025                                 }
2026
2027                                 continue;
2028                         }
2029
2030                         // If we got here then this is a brand new commit and needs full processing
2031
2032                         // Get what commits should be edited, these are the commits that have live values for their keys
2033                         Set<Commit> commitsToEdit = new HashSet<Commit>();
2034                         for (KeyValue kv : commit.getKeyValueUpdateSet()) {
2035                                 commitsToEdit.add(liveCommitsByKeyTable.get(kv.getKey()));
2036                         }
2037                         commitsToEdit.remove(NULL);             // remove NULL since it could be in this set
2038
2039                         // Update each previous commit that needs to be updated
2040                         for (Commit previousCommit : commitsToEdit) {
2041
2042                                 // Only bother with live commits (TODO: Maybe remove this check)
2043                                 if (previousCommit.isLive()) {
2044
2045                                         // Update which keys in the old commits are still live
2046                                         for (KeyValue kv : commit.getKeyValueUpdateSet()) {
2047                                                 previousCommit.invalidateKey(kv.getKey());
2048                                         }
2049
2050                                         // if the commit is now dead then remove it
2051                                         if (!previousCommit.isLive()) {
2052                                                 commitForClientTable.remove(previousCommit);
2053                                         }
2054                                 }
2055                         }
2056
2057                         // Update the last seen sequence number from this arbitrator
2058                         if (lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId()) != NULL) {
2059                                 if (commit.getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId())) {
2060                                         lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber());
2061                                 }
2062                         } else {
2063                                 lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber());
2064                         }
2065
2066                         // We processed a new commit that we havent seen before
2067                         didProcessANewCommit = true;
2068
2069                         // Update the committed table of keys and which commit is using which key
2070                         for (KeyValue kv : commit.getKeyValueUpdateSet()) {
2071                                 committedKeyValueTable.put(kv.getKey(), kv);
2072                                 liveCommitsByKeyTable.put(kv.getKey(), commit);
2073                         }
2074                 }
2075         }
2076
2077         return didProcessANewCommit;
2078 }
2079
2080 /**
2081  * Create the speculative table from transactions that are still live and have come from the cloud
2082  */
2083 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2084         if (liveTransactionBySequenceNumberTable.keySet().size() == 0) {
2085                 // There is nothing to speculate on
2086                 return false;
2087         }
2088
2089         // Create a list of the transaction sequence numbers and sort them from oldest to newest
2090         Vector<Long> transactionSequenceNumbersSorted = new Vector<Long>(liveTransactionBySequenceNumberTable.keySet());
2091         Collections.sort(transactionSequenceNumbersSorted);
2092
2093         bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted.get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2094
2095
2096         if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2097                 // If there is a gap in the transaction sequence numbers then there was a commit or an abort of a transaction
2098                 // OR there was a new commit (Could be from offline commit) so a redo the speculation from scratch
2099
2100                 // Start from scratch
2101                 speculatedKeyValueTable.clear();
2102                 lastTransactionSequenceNumberSpeculatedOn = -1;
2103                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2104
2105         }
2106
2107         // Remember the front of the transaction list
2108         oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted.get(0);
2109
2110         // Find where to start arbitration from
2111         int startIndex = transactionSequenceNumbersSorted.indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1;
2112
2113         if (startIndex >= transactionSequenceNumbersSorted.size()) {
2114                 // Make sure we are not out of bounds
2115                 return false;           // did not speculate
2116         }
2117
2118         Set<Long> incompleteTransactionArbitrator = new HashSet<Long>();
2119         bool didSkip = true;
2120
2121         for (int i = startIndex; i < transactionSequenceNumbersSorted.size(); i++) {
2122                 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted.get(i);
2123                 Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber);
2124
2125                 if (!transaction.isComplete()) {
2126                         // If there is an incomplete transaction then there is nothing we can do
2127                         // add this transactions arbitrator to the list of arbitrators we should ignore
2128                         incompleteTransactionArbitrator.add(transaction.getArbitrator());
2129                         didSkip = true;
2130                         continue;
2131                 }
2132
2133                 if (incompleteTransactionArbitrator.contains(transaction.getArbitrator())) {
2134                         continue;
2135                 }
2136
2137                 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2138
2139                 if (transaction.evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2140                         // Guard evaluated to true so update the speculative table
2141                         for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
2142                                 speculatedKeyValueTable.put(kv.getKey(), kv);
2143                         }
2144                 }
2145         }
2146
2147         if (didSkip) {
2148                 // Since there was a skip we need to redo the speculation next time around
2149                 lastTransactionSequenceNumberSpeculatedOn = -1;
2150                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2151         }
2152
2153         // We did some speculation
2154         return true;
2155 }
2156
2157 /**
2158  * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer
2159  */
2160 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2161         if (pendingTransactionQueue.size() == 0) {
2162                 // There is nothing to speculate on
2163                 return;
2164         }
2165
2166
2167         if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue.get(0))) {
2168                 // need to reset on the pending speculation
2169                 lastPendingTransactionSpeculatedOn = NULL;
2170                 firstPendingTransaction = pendingTransactionQueue.get(0);
2171                 pendingTransactionSpeculatedKeyValueTable.clear();
2172         }
2173
2174         // Find where to start arbitration from
2175         int startIndex = pendingTransactionQueue.indexOf(firstPendingTransaction) + 1;
2176
2177         if (startIndex >= pendingTransactionQueue.size()) {
2178                 // Make sure we are not out of bounds
2179                 return;
2180         }
2181
2182         for (int i = startIndex; i < pendingTransactionQueue.size(); i++) {
2183                 Transaction transaction = pendingTransactionQueue.get(i);
2184
2185                 lastPendingTransactionSpeculatedOn = transaction;
2186
2187                 if (transaction.evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2188                         // Guard evaluated to true so update the speculative table
2189                         for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
2190                                 pendingTransactionSpeculatedKeyValueTable.put(kv.getKey(), kv);
2191                         }
2192                 }
2193         }
2194 }
2195
2196 /**
2197  * Set dead and remove from the live transaction tables the transactions that are dead
2198  */
2199 void Table::updateLiveTransactionsAndStatus() {
2200
2201         // Go through each of the transactions
2202         for (Iterator<Map.Entry<int64_t Transaction> > iter = liveTransactionBySequenceNumberTable.entrySet().iterator(); iter.hasNext();) {
2203                 Transaction transaction = iter.next().getValue();
2204
2205                 // Check if the transaction is dead
2206                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(transaction.getArbitrator());
2207                 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= transaction.getSequenceNumber())) {
2208
2209                         // Set dead the transaction
2210                         transaction.setDead();
2211
2212                         // Remove the transaction from the live table
2213                         iter.remove();
2214                         liveTransactionByTransactionIdTable.remove(transaction.getId());
2215                 }
2216         }
2217
2218         // Go through each of the transactions
2219         for (Iterator<Map.Entry<int64_t TransactionStatus> > iter = outstandingTransactionStatus.entrySet().iterator(); iter.hasNext();) {
2220                 TransactionStatus status = iter.next().getValue();
2221
2222                 // Check if the transaction is dead
2223                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(status.getTransactionArbitrator());
2224                 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= status.getTransactionSequenceNumber())) {
2225
2226                         // Set committed
2227                         status.setStatus(TransactionStatus.StatusCommitted);
2228
2229                         // Remove
2230                         iter.remove();
2231                 }
2232         }
2233 }
2234
2235 /**
2236  * Process this slot, entry by entry.  Also update the latest message sent by slot
2237  */
2238 void Table::processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLocal, HashSet<Long> machineSet) {
2239
2240         // Update the last message seen
2241         updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2242
2243         // Process each entry in the slot
2244         for (Entry entry : slot.getEntries()) {
2245                 switch (entry.getType()) {
2246
2247                 case Entry.TypeCommitPart:
2248                         processEntry((CommitPart)entry);
2249                         break;
2250
2251                 case Entry.TypeAbort:
2252                         processEntry((Abort)entry);
2253                         break;
2254
2255                 case Entry.TypeTransactionPart:
2256                         processEntry((TransactionPart)entry);
2257                         break;
2258
2259                 case Entry.TypeNewKey:
2260                         processEntry((NewKey)entry);
2261                         break;
2262
2263                 case Entry.TypeLastMessage:
2264                         processEntry((LastMessage)entry, machineSet);
2265                         break;
2266
2267                 case Entry.TypeRejectedMessage:
2268                         processEntry((RejectedMessage)entry, indexer);
2269                         break;
2270
2271                 case Entry.TypeTableStatus:
2272                         processEntry((TableStatus)entry, slot.getSequenceNumber());
2273                         break;
2274
2275                 default:
2276                         throw new Error("Unrecognized type: " + entry.getType());
2277                 }
2278         }
2279 }
2280
2281 /**
2282  * Update the last message that was sent for a machine Id
2283  */
2284 void Table::processEntry(LastMessage entry, HashSet<Long> machineSet) {
2285         // Update what the last message received by a machine was
2286         updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
2287 }
2288
2289 /**
2290  * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message)
2291  */
2292 void Table::processEntry(NewKey entry) {
2293
2294         // Update the arbitrator table with the new key information
2295         arbitratorTable.put(entry.getKey(), entry.getMachineID());
2296
2297         // Update what the latest live new key is
2298         NewKey oldNewKey = liveNewKeyTable.put(entry.getKey(), entry);
2299         if (oldNewKey != NULL) {
2300                 // Delete the old new key messages
2301                 oldNewKey.setDead();
2302         }
2303 }
2304
2305 /**
2306  * Process new table status entries and set dead the old ones as new ones come in.
2307  * keeps track of the largest and smallest table status seen in this current round
2308  * of updating the local copy of the block chain
2309  */
2310 void Table::processEntry(TableStatus entry, int64_t seq) {
2311         int newNumSlots = entry.getMaxSlots();
2312         updateCurrMaxSize(newNumSlots);
2313
2314         initExpectedSize(seq, newNumSlots);
2315
2316         if (liveTableStatus != NULL) {
2317                 // We have a larger table status so the old table status is no int64_ter alive
2318                 liveTableStatus.setDead();
2319         }
2320
2321         // Make this new table status the latest alive table status
2322         liveTableStatus = entry;
2323 }
2324
2325 /**
2326  * Check old messages to see if there is a block chain violation. Also
2327  */
2328 void Table::processEntry(RejectedMessage entry, SlotIndexer indexer) {
2329         int64_t oldSeqNum = entry.getOldSeqNum();
2330         int64_t newSeqNum = entry.getNewSeqNum();
2331         bool isequal = entry.getEqual();
2332         int64_t machineId = entry.getMachineID();
2333         int64_t seq = entry.getSequenceNumber();
2334
2335
2336         // Check if we have messages that were supposed to be rejected in our local block chain
2337         for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2338
2339                 // Get the slot
2340                 Slot slot = indexer.getSlot(seqNum);
2341
2342                 if (slot != NULL) {
2343                         // If we have this slot make sure that it was not supposed to be a rejected slot
2344
2345                         int64_t slotMachineId = slot.getMachineID();
2346                         if (isequal != (slotMachineId == machineId)) {
2347                                 throw new Error("Server Error: Trying to insert rejected message for slot " + seqNum);
2348                         }
2349                 }
2350         }
2351
2352
2353         // Create a list of clients to watch until they see this rejected message entry.
2354         HashSet<Long> deviceWatchSet = new HashSet<Long>();
2355         for (Map.Entry<int64_t Pair<int64_t Liveness> > lastMessageEntry : lastMessageTable.entrySet()) {
2356
2357                 // Machine ID for the last message entry
2358                 int64_t lastMessageEntryMachineId = lastMessageEntry.getKey();
2359
2360                 // We've seen it, don't need to continue to watch.  Our next
2361                 // message will implicitly acknowledge it.
2362                 if (lastMessageEntryMachineId == localMachineId) {
2363                         continue;
2364                 }
2365
2366                 Pair<int64_t Liveness> lastMessageValue = lastMessageEntry.getValue();
2367                 int64_t entrySequenceNumber = lastMessageValue.getFirst();
2368
2369                 if (entrySequenceNumber < seq) {
2370
2371                         // Add this rejected message to the set of messages that this machine ID did not see yet
2372                         addWatchVector(lastMessageEntryMachineId, entry);
2373
2374                         // This client did not see this rejected message yet so add it to the watch set to monitor
2375                         deviceWatchSet.add(lastMessageEntryMachineId);
2376                 }
2377         }
2378
2379         if (deviceWatchSet.isEmpty()) {
2380                 // This rejected message has been seen by all the clients so
2381                 entry.setDead();
2382         } else {
2383                 // We need to watch this rejected message
2384                 entry.setWatchSet(deviceWatchSet);
2385         }
2386 }
2387
2388 /**
2389  * Check if this abort is live, if not then save it so we can kill it later.
2390  * update the last transaction number that was arbitrated on.
2391  */
2392 void Table::processEntry(Abort entry) {
2393
2394
2395         if (entry.getTransactionSequenceNumber() != -1) {
2396                 // update the transaction status if it was sent to the server
2397                 TransactionStatus status = outstandingTransactionStatus.remove(entry.getTransactionSequenceNumber());
2398                 if (status != NULL) {
2399                         status.setStatus(TransactionStatus.StatusAborted);
2400                 }
2401         }
2402
2403         // Abort has not been seen by the client it is for yet so we need to keep track of it
2404         Abort previouslySeenAbort = liveAbortTable.put(entry.getAbortId(), entry);
2405         if (previouslySeenAbort != NULL) {
2406                 previouslySeenAbort.setDead();  // Delete old version of the abort since we got a rescued newer version
2407         }
2408
2409         if (entry.getTransactionArbitrator() == localMachineId) {
2410                 liveAbortsGeneratedByLocal.put(entry.getArbitratorLocalSequenceNumber(), entry);
2411         }
2412
2413         if ((entry.getSequenceNumber() != -1) && (lastMessageTable.get(entry.getTransactionMachineId()).getFirst() >= entry.getSequenceNumber())) {
2414
2415                 // The machine already saw this so it is dead
2416                 entry.setDead();
2417                 liveAbortTable.remove(entry.getAbortId());
2418
2419                 if (entry.getTransactionArbitrator() == localMachineId) {
2420                         liveAbortsGeneratedByLocal.remove(entry.getArbitratorLocalSequenceNumber());
2421                 }
2422
2423                 return;
2424         }
2425
2426
2427
2428
2429         // Update the last arbitration data that we have seen so far
2430         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator()) != NULL) {
2431
2432                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator());
2433                 if (entry.getSequenceNumber() > lastArbitrationSequenceNumber) {
2434                         // Is larger
2435                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber());
2436                 }
2437         } else {
2438                 // Never seen any data from this arbitrator so record the first one
2439                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber());
2440         }
2441
2442
2443         // Set dead a transaction if we can
2444         Transaction transactionToSetDead = liveTransactionByTransactionIdTable.remove(new Pair<int64_t, int64_t>(entry.getTransactionMachineId(), entry.getTransactionClientLocalSequenceNumber()));
2445         if (transactionToSetDead != NULL) {
2446                 liveTransactionBySequenceNumberTable.remove(transactionToSetDead.getSequenceNumber());
2447         }
2448
2449         // Update the last transaction sequence number that the arbitrator arbitrated on
2450         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getTransactionArbitrator());
2451         if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry.getTransactionSequenceNumber())) {
2452
2453                 // Is a valid one
2454                 if (entry.getTransactionSequenceNumber() != -1) {
2455                         lastArbitratedTransactionNumberByArbitratorTable.put(entry.getTransactionArbitrator(), entry.getTransactionSequenceNumber());
2456                 }
2457         }
2458 }
2459
2460 /**
2461  * Set dead the transaction part if that transaction is dead and keep track of all new parts
2462  */
2463 void Table::processEntry(TransactionPart entry) {
2464         // Check if we have already seen this transaction and set it dead OR if it is not alive
2465         Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getArbitratorId());
2466         if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= entry.getSequenceNumber())) {
2467                 // This transaction is dead, it was already committed or aborted
2468                 entry.setDead();
2469                 return;
2470         }
2471
2472         // This part is still alive
2473         Hashtable<Pair<int64_t int32_t>, TransactionPart> transactionPart = newTransactionParts.get(entry.getMachineId());
2474
2475         if (transactionPart == NULL) {
2476                 // Dont have a table for this machine Id yet so make one
2477                 transactionPart = new Hashtable<Pair<int64_t int32_t>, TransactionPart>();
2478                 newTransactionParts.put(entry.getMachineId(), transactionPart);
2479         }
2480
2481         // Update the part and set dead ones we have already seen (got a rescued version)
2482         TransactionPart previouslySeenPart = transactionPart.put(entry.getPartId(), entry);
2483         if (previouslySeenPart != NULL) {
2484                 previouslySeenPart.setDead();
2485         }
2486 }
2487
2488 /**
2489  * Process new commit entries and save them for future use.  Delete duplicates
2490  */
2491 void Table::processEntry(CommitPart entry) {
2492
2493
2494         // Update the last transaction that was updated if we can
2495         if (entry.getTransactionSequenceNumber() != -1) {
2496                 Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getMachineId());
2497
2498                 // Update the last transaction sequence number that the arbitrator arbitrated on
2499                 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry.getTransactionSequenceNumber())) {
2500                         lastArbitratedTransactionNumberByArbitratorTable.put(entry.getMachineId(), entry.getTransactionSequenceNumber());
2501                 }
2502         }
2503
2504
2505
2506
2507         Hashtable<Pair<int64_t int32_t>, CommitPart> commitPart = newCommitParts.get(entry.getMachineId());
2508
2509         if (commitPart == NULL) {
2510                 // Don't have a table for this machine Id yet so make one
2511                 commitPart = new Hashtable<Pair<int64_t int32_t>, CommitPart>();
2512                 newCommitParts.put(entry.getMachineId(), commitPart);
2513         }
2514
2515         // Update the part and set dead ones we have already seen (got a rescued version)
2516         CommitPart previouslySeenPart = commitPart.put(entry.getPartId(), entry);
2517         if (previouslySeenPart != NULL) {
2518                 previouslySeenPart.setDead();
2519         }
2520 }
2521
2522 /**
2523  * Update the last message seen table.  Update and set dead the appropriate RejectedMessages as clients see them.
2524  * Updates the live aborts, removes those that are dead and sets them dead.
2525  * Check that the last message seen is correct and that there is no mismatch of our own last message or that
2526  * other clients have not had a rollback on the last message.
2527  */
2528 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness liveness, bool acceptUpdatesToLocal, HashSet<Long> machineSet) {
2529
2530         // We have seen this machine ID
2531         machineSet.remove(machineId);
2532
2533         // Get the set of rejected messages that this machine Id is has not seen yet
2534         HashSet<RejectedMessage> watchset = rejectedMessageWatchVectorTable.get(machineId);
2535
2536         // If there is a rejected message that this machine Id has not seen yet
2537         if (watchset != NULL) {
2538
2539                 // Go through each rejected message that this machine Id has not seen yet
2540                 for (Iterator<RejectedMessage> rmit = watchset.iterator(); rmit.hasNext(); ) {
2541
2542                         RejectedMessage rm = rmit.next();
2543
2544                         // If this machine Id has seen this rejected message...
2545                         if (rm.getSequenceNumber() <= seqNum) {
2546
2547                                 // Remove it from our watchlist
2548                                 rmit.remove();
2549
2550                                 // Decrement machines that need to see this notification
2551                                 rm.removeWatcher(machineId);
2552                         }
2553                 }
2554         }
2555
2556         // Set dead the abort
2557         for (Iterator<Map.Entry<Pair<int64_t, int64_t>, Abort> > i = liveAbortTable.entrySet().iterator(); i.hasNext();) {
2558                 Abort abort = i.next().getValue();
2559
2560                 if ((abort.getTransactionMachineId() == machineId) && (abort.getSequenceNumber() <= seqNum)) {
2561                         abort.setDead();
2562                         i.remove();
2563
2564                         if (abort.getTransactionArbitrator() == localMachineId) {
2565                                 liveAbortsGeneratedByLocal.remove(abort.getArbitratorLocalSequenceNumber());
2566                         }
2567                 }
2568         }
2569
2570
2571
2572         if (machineId == localMachineId) {
2573                 // Our own messages are immediately dead.
2574                 if (liveness instanceof LastMessage) {
2575                         ((LastMessage)liveness).setDead();
2576                 } else if (liveness instanceof Slot) {
2577                         ((Slot)liveness).setDead();
2578                 } else {
2579                         throw new Error("Unrecognized type");
2580                 }
2581         }
2582
2583         // Get the old last message for this device
2584         Pair<int64_t Liveness> lastMessageEntry = lastMessageTable.put(machineId, new Pair<int64_t Liveness>(seqNum, liveness));
2585         if (lastMessageEntry == NULL) {
2586                 // If no last message then there is nothing else to process
2587                 return;
2588         }
2589
2590         int64_t lastMessageSeqNum = lastMessageEntry.getFirst();
2591         Liveness lastEntry = lastMessageEntry.getSecond();
2592
2593         // If it is not our machine Id since we already set ours to dead
2594         if (machineId != localMachineId) {
2595                 if (lastEntry instanceof LastMessage) {
2596                         ((LastMessage)lastEntry).setDead();
2597                 } else if (lastEntry instanceof Slot) {
2598                         ((Slot)lastEntry).setDead();
2599                 } else {
2600                         throw new Error("Unrecognized type");
2601                 }
2602         }
2603
2604         // Make sure the server is not playing any games
2605         if (machineId == localMachineId) {
2606
2607                 if (hadPartialSendToServer) {
2608                         // We were not making any updates and we had a machine mismatch
2609                         if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2610                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: " +  lastMessageSeqNum  + " got: " + seqNum);
2611                         }
2612
2613                 } else {
2614                         // We were not making any updates and we had a machine mismatch
2615                         if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2616                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed: " +  lastMessageSeqNum + " got: " + seqNum);
2617                         }
2618                 }
2619         } else {
2620                 if (lastMessageSeqNum > seqNum) {
2621                         throw new Error("Server Error: Rollback on remote machine sequence number");
2622                 }
2623         }
2624 }
2625
2626 /**
2627  * Add a rejected message entry to the watch set to keep track of which clients have seen that
2628  * rejected message entry and which have not.
2629  */
2630 void Table::addWatchVector(int64_t machineId, RejectedMessage entry) {
2631         HashSet<RejectedMessage> entries = rejectedMessageWatchVectorTable.get(machineId);
2632         if (entries == NULL) {
2633                 // There is no set for this machine ID yet so create one
2634                 entries = new HashSet<RejectedMessage>();
2635                 rejectedMessageWatchVectorTable.put(machineId, entries);
2636         }
2637         entries.add(entry);
2638 }
2639
2640 /**
2641  * Check if the HMAC chain is not violated
2642  */
2643 void Table::checkHMACChain(SlotIndexer indexer, Slot[] newSlots) {
2644         for (int i = 0; i < newSlots.length; i++) {
2645                 Slot currSlot = newSlots[i];
2646                 Slot prevSlot = indexer.getSlot(currSlot.getSequenceNumber() - 1);
2647                 if (prevSlot != NULL &&
2648                                 !Arrays.equals(prevSlot.getHMAC(), currSlot.getPrevHMAC()))
2649                         throw new Error("Server Error: Invalid HMAC Chain" + currSlot + " " + prevSlot);
2650         }
2651 }
2652