edits
[iotcloud.git] / version2 / src / C / Table.cc
1 #include "Table.h"
2 #include "CloudComm.h"
3 #include "SlotBuffer.h"
4 #include "NewKey.h"
5 #include "Slot.h"
6 #include "KeyValue.h"
7 #include "Error.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
13 #include "Random.h"
14 #include "ByteBuffer.h"
15 #include "Abort.h"
16 #include "CommitPart.h"
17 #include "ArbitrationRound.h"
18 #include "TransactionPart.h"
19 #include "Commit.h"
20 #include "RejectedMessage.h"
21 #include "SlotIndexer.h"
22 #include <stdlib.h>
23
/**
 * Three-way comparison of two int64_t values passed by address.
 * The signature matches the comparator type taken by qsort()/bsearch()
 * from <stdlib.h>.
 *
 * @param a address of the left-hand int64_t
 * @param b address of the right-hand int64_t
 * @return -1 if *a < *b, 1 if *a > *b, 0 if they are equal
 */
int compareInt64(const void * a, const void *b) {
        const int64_t lhs = *static_cast<const int64_t *>(a);
        const int64_t rhs = *static_cast<const int64_t *>(b);
        // Each comparison yields 0 or 1, so the difference is -1, 0 or 1
        // without any risk of the overflow a plain subtraction would have.
        return (lhs > rhs) - (lhs < rhs);
}
34
35 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
36         buffer(NULL),
37         cloud(new CloudComm(this, baseurl, password, listeningPort)),
38         random(NULL),
39         liveTableStatus(NULL),
40         pendingTransactionBuilder(NULL),
41         lastPendingTransactionSpeculatedOn(NULL),
42         firstPendingTransaction(NULL),
43         numberOfSlots(0),
44         bufferResizeThreshold(0),
45         liveSlotCount(0),
46         oldestLiveSlotSequenceNumver(1),
47         localMachineId(_localMachineId),
48         sequenceNumber(0),
49         localTransactionSequenceNumber(0),
50         lastTransactionSequenceNumberSpeculatedOn(0),
51         oldestTransactionSequenceNumberSpeculatedOn(0),
52         localArbitrationSequenceNumber(0),
53         hadPartialSendToServer(false),
54         attemptedToSendToServer(false),
55         expectedsize(0),
56         didFindTableStatus(false),
57         currMaxSize(0),
58         lastSlotAttemptedToSend(NULL),
59         lastIsNewKey(false),
60         lastNewSize(0),
61         lastTransactionPartsSent(NULL),
62         lastPendingSendArbitrationEntriesToDelete(NULL),
63         lastNewKey(NULL),
64         committedKeyValueTable(NULL),
65         speculatedKeyValueTable(NULL),
66         pendingTransactionSpeculatedKeyValueTable(NULL),
67         liveNewKeyTable(NULL),
68         lastMessageTable(NULL),
69         rejectedMessageWatchVectorTable(NULL),
70         arbitratorTable(NULL),
71         liveAbortTable(NULL),
72         newTransactionParts(NULL),
73         newCommitParts(NULL),
74         lastArbitratedTransactionNumberByArbitratorTable(NULL),
75         liveTransactionBySequenceNumberTable(NULL),
76         liveTransactionByTransactionIdTable(NULL),
77         liveCommitsTable(NULL),
78         liveCommitsByKeyTable(NULL),
79         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
80         rejectedSlotVector(NULL),
81         pendingTransactionQueue(NULL),
82         pendingSendArbitrationRounds(NULL),
83         pendingSendArbitrationEntriesToDelete(NULL),
84         transactionPartsSent(NULL),
85         outstandingTransactionStatus(NULL),
86         liveAbortsGeneratedByLocal(NULL),
87         offlineTransactionsCommittedAndAtServer(NULL),
88         localCommunicationTable(NULL),
89         lastTransactionSeenFromMachineFromServer(NULL),
90         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
91         lastInsertedNewKey(false),
92         lastSeqNumArbOn(0)
93 {
94         init();
95 }
96
97 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
98         buffer(NULL),
99         cloud(_cloud),
100         random(NULL),
101         liveTableStatus(NULL),
102         pendingTransactionBuilder(NULL),
103         lastPendingTransactionSpeculatedOn(NULL),
104         firstPendingTransaction(NULL),
105         numberOfSlots(0),
106         bufferResizeThreshold(0),
107         liveSlotCount(0),
108         oldestLiveSlotSequenceNumver(1),
109         localMachineId(_localMachineId),
110         sequenceNumber(0),
111         localTransactionSequenceNumber(0),
112         lastTransactionSequenceNumberSpeculatedOn(0),
113         oldestTransactionSequenceNumberSpeculatedOn(0),
114         localArbitrationSequenceNumber(0),
115         hadPartialSendToServer(false),
116         attemptedToSendToServer(false),
117         expectedsize(0),
118         didFindTableStatus(false),
119         currMaxSize(0),
120         lastSlotAttemptedToSend(NULL),
121         lastIsNewKey(false),
122         lastNewSize(0),
123         lastTransactionPartsSent(NULL),
124         lastPendingSendArbitrationEntriesToDelete(NULL),
125         lastNewKey(NULL),
126         committedKeyValueTable(NULL),
127         speculatedKeyValueTable(NULL),
128         pendingTransactionSpeculatedKeyValueTable(NULL),
129         liveNewKeyTable(NULL),
130         lastMessageTable(NULL),
131         rejectedMessageWatchVectorTable(NULL),
132         arbitratorTable(NULL),
133         liveAbortTable(NULL),
134         newTransactionParts(NULL),
135         newCommitParts(NULL),
136         lastArbitratedTransactionNumberByArbitratorTable(NULL),
137         liveTransactionBySequenceNumberTable(NULL),
138         liveTransactionByTransactionIdTable(NULL),
139         liveCommitsTable(NULL),
140         liveCommitsByKeyTable(NULL),
141         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
142         rejectedSlotVector(NULL),
143         pendingTransactionQueue(NULL),
144         pendingSendArbitrationRounds(NULL),
145         pendingSendArbitrationEntriesToDelete(NULL),
146         transactionPartsSent(NULL),
147         outstandingTransactionStatus(NULL),
148         liveAbortsGeneratedByLocal(NULL),
149         offlineTransactionsCommittedAndAtServer(NULL),
150         localCommunicationTable(NULL),
151         lastTransactionSeenFromMachineFromServer(NULL),
152         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
153         lastInsertedNewKey(false),
154         lastSeqNumArbOn(0)
155 {
156         init();
157 }
158
159 /**
160  * Init all the stuff needed for for table usage
161  */
162 void Table::init() {
163         // Init helper objects
164         random = new Random();
165         buffer = new SlotBuffer();
166
167         // init data structs
168         committedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
169         speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
170         pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
171         liveNewKeyTable = new Hashtable<IoTString *, NewKey *>();
172         lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> * >();
173         rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
174         arbitratorTable = new Hashtable<IoTString *, int64_t>();
175         liveAbortTable = new Hashtable<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>();
176         newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
177         newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
178         lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
179         liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
180         liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t> *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>();
181         liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> * >();
182         liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *>();
183         lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
184         rejectedSlotVector = new Vector<int64_t>();
185         pendingTransactionQueue = new Vector<Transaction *>();
186         pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
187         transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
188         outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
189         liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
190         offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> *, uintptr_t, 0, pairHashFunction, pairEquals>();
191         localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> *>();
192         lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
193         pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
194         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
195
196         // Other init stuff
197         numberOfSlots = buffer->capacity();
198         setResizeThreshold();
199 }
200
201 /**
202  * Initialize the table by inserting a table status as the first entry
203  * into the table status also initialize the crypto stuff.
204  */
205 void Table::initTable() {
206         cloud->initSecurity();
207
208         // Create the first insertion into the block chain which is the table status
209         Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
210         localSequenceNumber++;
211         TableStatus *status = new TableStatus(s, numberOfSlots);
212         s->addEntry(status);
213         Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
214
215         if (array == NULL) {
216                 array = new Array<Slot *>(1);
217                 array->set(0, s);
218                 // update local block chain
219                 validateAndUpdate(array, true);
220         } else if (array->length() == 1) {
221                 // in case we did push the slot BUT we failed to init it
222                 validateAndUpdate(array, true);
223         } else {
224                 throw new Error("Error on initialization");
225         }
226 }
227
228 /**
229  * Rebuild the table from scratch by pulling the latest block chain
230  * from the server.
231  */
232 void Table::rebuild() {
233         // Just pull the latest slots from the server
234         Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
235         validateAndUpdate(newslots, true);
236         sendToServer(NULL);
237         updateLiveTransactionsAndStatus();
238 }
239
240 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
241         localCommunicationTable->put(arbitrator, new Pair<IoTString *, int32_t>(hostName, portNumber));
242 }
243
244 int64_t Table::getArbitrator(IoTString *key) {
245         return arbitratorTable->get(key);
246 }
247
248 void Table::close() {
249         cloud->close();
250 }
251
252 IoTString *Table::getCommitted(IoTString *key)  {
253         KeyValue *kv = committedKeyValueTable->get(key);
254
255         if (kv != NULL) {
256                 return kv->getValue();
257         } else {
258                 return NULL;
259         }
260 }
261
262 IoTString *Table::getSpeculative(IoTString *key) {
263         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
264
265         if (kv == NULL) {
266                 kv = speculatedKeyValueTable->get(key);
267         }
268
269         if (kv == NULL) {
270                 kv = committedKeyValueTable->get(key);
271         }
272
273         if (kv != NULL) {
274                 return kv->getValue();
275         } else {
276                 return NULL;
277         }
278 }
279
280 IoTString *Table::getCommittedAtomic(IoTString *key) {
281         KeyValue *kv = committedKeyValueTable->get(key);
282
283         if (!arbitratorTable->contains(key)) {
284                 throw new Error("Key not Found.");
285         }
286
287         // Make sure new key value pair matches the current arbitrator
288         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
289                 // TODO: Maybe not throw en error
290                 throw new Error("Not all Key Values Match Arbitrator.");
291         }
292
293         if (kv != NULL) {
294                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
295                 return kv->getValue();
296         } else {
297                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
298                 return NULL;
299         }
300 }
301
302 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
303         if (!arbitratorTable->contains(key)) {
304                 throw new Error("Key not Found.");
305         }
306
307         // Make sure new key value pair matches the current arbitrator
308         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
309                 // TODO: Maybe not throw en error
310                 throw new Error("Not all Key Values Match Arbitrator.");
311         }
312
313         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
314
315         if (kv == NULL) {
316                 kv = speculatedKeyValueTable->get(key);
317         }
318
319         if (kv == NULL) {
320                 kv = committedKeyValueTable->get(key);
321         }
322
323         if (kv != NULL) {
324                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
325                 return kv->getValue();
326         } else {
327                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
328                 return NULL;
329         }
330 }
331
332 bool Table::update()  {
333         try {
334                 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
335                 validateAndUpdate(newSlots, false);
336                 sendToServer(NULL);
337                 updateLiveTransactionsAndStatus();
338                 return true;
339         } catch (Exception *e) {
340                 SetIterator<int64_t, Pair<IoTString *, int32_t> *> *kit = getKeyIterator(localCommunicationTable);
341                 while (kit->hasNext()) {
342                         int64_t m = kit->next();
343                         updateFromLocal(m);
344                 }
345                 delete kit;
346         }
347
348         return false;
349 }
350
351 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
352         while (true) {
353                 if (!arbitratorTable->contains(keyName)) {
354                         // There is already an arbitrator
355                         return false;
356                 }
357                 NewKey *newKey = new NewKey(NULL, keyName, machineId);
358
359                 if (sendToServer(newKey)) {
360                         // If successfully inserted
361                         return true;
362                 }
363         }
364 }
365
366 void Table::startTransaction() {
367         // Create a new transaction, invalidates any old pending transactions.
368         pendingTransactionBuilder = new PendingTransaction(localMachineId);
369 }
370
371 void Table::addKV(IoTString *key, IoTString *value) {
372
373         // Make sure it is a valid key
374         if (!arbitratorTable->contains(key)) {
375                 throw new Error("Key not Found.");
376         }
377
378         // Make sure new key value pair matches the current arbitrator
379         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
380                 // TODO: Maybe not throw en error
381                 throw new Error("Not all Key Values Match Arbitrator.");
382         }
383
384         // Add the key value to this transaction
385         KeyValue *kv = new KeyValue(key, value);
386         pendingTransactionBuilder->addKV(kv);
387 }
388
389 TransactionStatus *Table::commitTransaction() {
390         if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
391                 // transaction with no updates will have no effect on the system
392                 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
393         }
394
395         // Set the local transaction sequence number and increment
396         pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
397         localTransactionSequenceNumber++;
398
399         // Create the transaction status
400         TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
401
402         // Create the new transaction
403         Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
404         newTransaction->setTransactionStatus(transactionStatus);
405
406         if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
407                 // Add it to the queue and invalidate the builder for safety
408                 pendingTransactionQueue->add(newTransaction);
409         } else {
410                 arbitrateOnLocalTransaction(newTransaction);
411                 updateLiveStateFromLocal();
412         }
413
414         pendingTransactionBuilder = new PendingTransaction(localMachineId);
415
416         try {
417                 sendToServer(NULL);
418         } catch (ServerException *e) {
419
420                 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
421                 uint size = pendingTransactionQueue->size();
422                 uint oldindex = 0;
423                 for (int iter = 0; iter < size; iter++) {
424                         Transaction *transaction = pendingTransactionQueue->get(iter);
425                         pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter));
426
427                         if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
428                                 // Already contacted this client so ignore all attempts to contact this client
429                                 // to preserve ordering for arbitrator
430                                 continue;
431                         }
432
433                         Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
434
435                         if (sendReturn.getFirst()) {
436                                 // Failed to contact over local
437                                 arbitratorTriedAndFailed->add(transaction->getArbitrator());
438                         } else {
439                                 // Successful contact or should not contact
440
441                                 if (sendReturn.getSecond()) {
442                                         // did arbitrate
443                                         oldindex--;
444                                 }
445                         }
446                 }
447                 pendingTransactionQueue->setSize(oldindex);
448         }
449
450         updateLiveStateFromLocal();
451
452         return transactionStatus;
453 }
454
455 /**
456  * Recalculate the new resize threshold
457  */
458 void Table::setResizeThreshold() {
459         int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
460         bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
461 }
462
463 int64_t Table::getLocalSequenceNumber() {
464         return localSequenceNumber;
465 }
466
467 bool Table::sendToServer(NewKey *newKey) {
468         bool fromRetry = false;
469         try {
470                 if (hadPartialSendToServer) {
471                         Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
472                         if (newSlots->length() == 0) {
473                                 fromRetry = true;
474                                 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
475
476                                 if (sendSlotsReturn.getFirst()) {
477                                         if (newKey != NULL) {
478                                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
479                                                         newKey = NULL;
480                                                 }
481                                         }
482
483                                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
484                                         while (trit->hasNext()) {
485                                                 Transaction *transaction = trit->next();
486                                                 transaction->resetServerFailure();
487                                                 // Update which transactions parts still need to be sent
488                                                 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
489                                                 // Add the transaction status to the outstanding list
490                                                 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
491
492                                                 // Update the transaction status
493                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
494
495                                                 // Check if all the transaction parts were successfully
496                                                 // sent and if so then remove it from pending
497                                                 if (transaction->didSendAllParts()) {
498                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
499                                                         pendingTransactionQueue->remove(transaction);
500                                                 }
501                                         }
502                                         delete trit;
503                                 } else {
504                                         newSlots = sendSlotsReturn.getThird();
505                                         bool isInserted = false;
506                                         for (uint si = 0; si < newSlots->length(); si++) {
507                                                 Slot *s = newSlots->get(si);
508                                                 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
509                                                         isInserted = true;
510                                                         break;
511                                                 }
512                                         }
513
514                                         for (uint si = 0; si < newSlots->length(); si++) {
515                                                 Slot *s = newSlots->get(si);
516                                                 if (isInserted) {
517                                                         break;
518                                                 }
519
520                                                 // Process each entry in the slot
521                                                 Vector<Entry *> *ventries = s->getEntries();
522                                                 uint vesize = ventries->size();
523                                                 for (uint vei = 0; vei < vesize; vei++) {
524                                                         Entry *entry = ventries->get(vei);
525                                                         if (entry->getType() == TypeLastMessage) {
526                                                                 LastMessage *lastMessage = (LastMessage *)entry;
527                                                                 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
528                                                                         isInserted = true;
529                                                                         break;
530                                                                 }
531                                                         }
532                                                 }
533                                         }
534
535                                         if (isInserted) {
536                                                 if (newKey != NULL) {
537                                                         if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
538                                                                 newKey = NULL;
539                                                         }
540                                                 }
541
542                                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
543                                                 while (trit->hasNext()) {
544                                                         Transaction *transaction = trit->next();
545                                                         transaction->resetServerFailure();
546
547                                                         // Update which transactions parts still need to be sent
548                                                         transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
549
550                                                         // Add the transaction status to the outstanding list
551                                                         outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
552
553                                                         // Update the transaction status
554                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
555
556                                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
557                                                         if (transaction->didSendAllParts()) {
558                                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
559                                                                 pendingTransactionQueue->remove(transaction);
560                                                         } else {
561                                                                 transaction->resetServerFailure();
562                                                                 // Set the transaction sequence number back to nothing
563                                                                 if (!transaction->didSendAPartToServer()) {
564                                                                         transaction->setSequenceNumber(-1);
565                                                                 }
566                                                         }
567                                                 }
568                                                 delete trit;
569                                         }
570                                 }
571
572                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
573                                 while (trit->hasNext()) {
574                                         Transaction *transaction = trit->next();
575                                         transaction->resetServerFailure();
576                                         // Set the transaction sequence number back to nothing
577                                         if (!transaction->didSendAPartToServer()) {
578                                                 transaction->setSequenceNumber(-1);
579                                         }
580                                 }
581                                 delete trit;
582
583                                 if (sendSlotsReturn.getThird()->length() != 0) {
584                                         // insert into the local block chain
585                                         validateAndUpdate(sendSlotsReturn.getThird(), true);
586                                 }
587                                 // continue;
588                         } else {
589                                 bool isInserted = false;
590                                 for (uint si = 0; si < newSlots->length(); si++) {
591                                         Slot *s = newSlots->get(si);
592                                         if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
593                                                 isInserted = true;
594                                                 break;
595                                         }
596                                 }
597
598                                 for (uint si = 0; si < newSlots->length(); si++) {
599                                         Slot *s = newSlots->get(si);
600                                         if (isInserted) {
601                                                 break;
602                                         }
603
604                                         // Process each entry in the slot
605                                         Vector<Entry *> *entries = s->getEntries();
606                                         uint eSize = entries->size();
607                                         for(uint ei=0; ei < eSize; ei++) {
608                                                 Entry * entry = entries->get(ei);
609                                                 
610                                                 if (entry->getType() == TypeLastMessage) {
611                                                         LastMessage *lastMessage = (LastMessage *)entry;
612                                                         if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
613                                                                 isInserted = true;
614                                                                 break;
615                                                         }
616                                                 }
617                                         }
618                                 }
619
620                                 if (isInserted) {
621                                         if (newKey != NULL) {
622                                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
623                                                         newKey = NULL;
624                                                 }
625                                         }
626
627                                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
628                                         while (trit->hasNext()) {
629                                                 Transaction *transaction = trit->next();
630                                                 transaction->resetServerFailure();
631
632                                                 // Update which transactions parts still need to be sent
633                                                 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
634
635                                                 // Add the transaction status to the outstanding list
636                                                 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
637
638                                                 // Update the transaction status
639                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
640
641                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
642                                                 if (transaction->didSendAllParts()) {
643                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
644                                                         pendingTransactionQueue->remove(transaction);
645                                                 } else {
646                                                         transaction->resetServerFailure();
647                                                         // Set the transaction sequence number back to nothing
648                                                         if (!transaction->didSendAPartToServer()) {
649                                                                 transaction->setSequenceNumber(-1);
650                                                         }
651                                                 }
652                                         }
653                                         delete trit;
654                                 } else {
655                                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
656                                         while (trit->hasNext()) {
657                                                 Transaction *transaction = trit->next();
658                                                 transaction->resetServerFailure();
659                                                 // Set the transaction sequence number back to nothing
660                                                 if (!transaction->didSendAPartToServer()) {
661                                                         transaction->setSequenceNumber(-1);
662                                                 }
663                                         }
664                                         delete trit;
665                                 }
666
667                                 // insert into the local block chain
668                                 validateAndUpdate(newSlots, true);
669                         }
670                 }
671         } catch (ServerException *e) {
672                 throw e;
673         }
674
675
676
677         try {
678                 // While we have stuff that needs inserting into the block chain
679                 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
680
681                         fromRetry = false;
682
683                         if (hadPartialSendToServer) {
684                                 throw new Error("Should Be error free");
685                         }
686
687
688
689                         // If there is a new key with same name then end
690                         if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
691                                 return false;
692                         }
693
694                         // Create the slot
695                         Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber);
696                         localSequenceNumber++;
697
698                         // Try to fill the slot with data
699                         ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
700                         bool needsResize = fillSlotsReturn.getFirst();
701                         int newSize = fillSlotsReturn.getSecond();
702                         bool insertedNewKey = fillSlotsReturn.getThird();
703
704                         if (needsResize) {
705                                 // Reset which transaction to send
706                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
707                                 while (trit->hasNext()) {
708                                         Transaction *transaction = trit->next();
709                                         transaction->resetNextPartToSend();
710
711                                         // Set the transaction sequence number back to nothing
712                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
713                                                 transaction->setSequenceNumber(-1);
714                                         }
715                                 }
716                                 delete trit;
717
718                                 // Clear the sent data since we are trying again
719                                 pendingSendArbitrationEntriesToDelete->clear();
720                                 transactionPartsSent->clear();
721
722                                 // We needed a resize so try again
723                                 fillSlot(slot, true, newKey);
724                         }
725
726                         lastSlotAttemptedToSend = slot;
727                         lastIsNewKey = (newKey != NULL);
728                         lastInsertedNewKey = insertedNewKey;
729                         lastNewSize = newSize;
730                         lastNewKey = newKey;
731                         lastTransactionPartsSent = transactionPartsSent->clone();
732                         lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
733
734                         ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
735
736                         if (sendSlotsReturn.getFirst()) {
737
738                                 // Did insert into the block chain
739
740                                 if (insertedNewKey) {
741                                         // This slot was what was inserted not a previous slot
742
743                                         // New Key was successfully inserted into the block chain so dont want to insert it again
744                                         newKey = NULL;
745                                 }
746
747                                 // Remove the aborts and commit parts that were sent from the pending to send queue
748                                 uint size = pendingSendArbitrationRounds->size();
749                                 uint oldcount = 0;
750                                 for (uint i = 0; i < size; i++) {
751                                         ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
752                                         round->removeParts(pendingSendArbitrationEntriesToDelete);
753
754                                         if (!round->isDoneSending()) {
755                                                 // Sent all the parts
756                                                 pendingSendArbitrationRounds->set(oldcount++,
757                                                                                                                                                                                         pendingSendArbitrationRounds->get(i));
758                                         }
759                                 }
760                                 pendingSendArbitrationRounds->setSize(oldcount);
761
762                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
763                                 while (trit->hasNext()) {
764                                         Transaction *transaction = trit->next();
765                                         transaction->resetServerFailure();
766
767                                         // Update which transactions parts still need to be sent
768                                         transaction->removeSentParts(transactionPartsSent->get(transaction));
769
770                                         // Add the transaction status to the outstanding list
771                                         outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
772
773                                         // Update the transaction status
774                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
775
776                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
777                                         if (transaction->didSendAllParts()) {
778                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
779                                                 pendingTransactionQueue->remove(transaction);
780                                         }
781                                 }
782                                 delete trit;
783                         } else {
784                                 // Reset which transaction to send
785                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
786                                 while (trit->hasNext()) {
787                                         Transaction *transaction = trit->next();
788                                         transaction->resetNextPartToSend();
789
790                                         // Set the transaction sequence number back to nothing
791                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
792                                                 transaction->setSequenceNumber(-1);
793                                         }
794                                 }
795                                 delete trit;
796                         }
797
798                         // Clear the sent data in preparation for next send
799                         pendingSendArbitrationEntriesToDelete->clear();
800                         transactionPartsSent->clear();
801
802                         if (sendSlotsReturn.getThird()->length() != 0) {
803                                 // insert into the local block chain
804                                 validateAndUpdate(sendSlotsReturn.getThird(), true);
805                         }
806                 }
807
808         } catch (ServerException *e) {
809                 if (e->getType() != ServerException_TypeInputTimeout) {
810                         // Nothing was able to be sent to the server so just clear these data structures
811                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
812                         while (trit->hasNext()) {
813                                 Transaction *transaction = trit->next();
814                                 transaction->resetNextPartToSend();
815
816                                 // Set the transaction sequence number back to nothing
817                                 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
818                                         transaction->setSequenceNumber(-1);
819                                 }
820                         }
821                         delete trit;
822                 } else {
823                         // There was a partial send to the server
824                         hadPartialSendToServer = true;
825
826                         // Nothing was able to be sent to the server so just clear these data structures
827                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
828                         while (trit->hasNext()) {
829                                 Transaction *transaction = trit->next();
830                                 transaction->resetNextPartToSend();
831                                 transaction->setServerFailure();
832                         }
833                         delete trit;
834                 }
835
836                 pendingSendArbitrationEntriesToDelete->clear();
837                 transactionPartsSent->clear();
838
839                 throw e;
840         }
841
842         return newKey == NULL;
843 }
844
845 bool Table::updateFromLocal(int64_t machineId) {
846         if (!localCommunicationTable->contains(machineId))
847                 return false;
848
849         Pair<IoTString *, int32_t> * localCommunicationInformation = localCommunicationTable->get(machineId);
850
851         // Get the size of the send data
852         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
853
854         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
855         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(machineId)) {
856                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
857         }
858
859         Array<char> *sendData = new Array<char>(sendDataSize);
860         ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
861
862         // Encode the data
863         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
864         bbEncode->putInt(0);
865
866         // Send by local
867         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
868         localSequenceNumber++;
869
870         if (returnData == NULL) {
871                 // Could not contact server
872                 return false;
873         }
874
875         // Decode the data
876         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
877         int numberOfEntries = bbDecode->getInt();
878
879         for (int i = 0; i < numberOfEntries; i++) {
880                 char type = bbDecode->get();
881                 if (type == TypeAbort) {
882                         Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
883                         processEntry(abort);
884                 } else if (type == TypeCommitPart) {
885                         CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
886                         processEntry(commitPart);
887                 }
888         }
889
890         updateLiveStateFromLocal();
891
892         return true;
893 }
894
895 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
896
897         // Get the devices local communications
898         if (!localCommunicationTable->contains(transaction->getArbitrator()))
899                 return Pair<bool, bool>(true, false);
900         
901         Pair<IoTString *, int32_t> * localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
902
903         // Get the size of the send data
904         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
905         {
906                 Vector<TransactionPart *> * tParts = transaction->getParts();
907                 uint tPartsSize = tParts->size();
908                 for (uint i = 0; i < tPartsSize; i++) {
909                         TransactionPart * part = tParts->get(i);
910                         sendDataSize += part->getSize();
911                 }
912         }
913
914         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
915         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(transaction->getArbitrator())) {
916                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
917         }
918
919         // Make the send data size
920         Array<char> *sendData = new Array<char>(sendDataSize);
921         ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
922
923         // Encode the data
924         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
925         bbEncode->putInt(transaction->getParts()->size());
926         {
927                 Vector<TransactionPart *> * tParts = transaction->getParts();
928                 uint tPartsSize = tParts->size();
929                 for (uint i = 0; i < tPartsSize; i++) {
930                         TransactionPart * part = tParts->get(i);
931                         part->encode(bbEncode);
932                 }
933         }
934
935         // Send by local
936         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
937         localSequenceNumber++;
938
939         if (returnData == NULL) {
940                 // Could not contact server
941                 return Pair<bool, bool>(true, false);
942         }
943
944         // Decode the data
945         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
946         bool didCommit = bbDecode->get() == 1;
947         bool couldArbitrate = bbDecode->get() == 1;
948         int numberOfEntries = bbDecode->getInt();
949         bool foundAbort = false;
950
951         for (int i = 0; i < numberOfEntries; i++) {
952                 char type = bbDecode->get();
953                 if (type == TypeAbort) {
954                         Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
955
956                         if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
957                                 foundAbort = true;
958                         }
959
960                         processEntry(abort);
961                 } else if (type == TypeCommitPart) {
962                         CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
963                         processEntry(commitPart);
964                 }
965         }
966
967         updateLiveStateFromLocal();
968
969         if (couldArbitrate) {
970                 TransactionStatus * status =  transaction->getTransactionStatus();
971                 if (didCommit) {
972                         status->setStatus(TransactionStatus_StatusCommitted);
973                 } else {
974                         status->setStatus(TransactionStatus_StatusAborted);
975                 }
976         } else {
977                 TransactionStatus * status =  transaction->getTransactionStatus();
978                 if (foundAbort) {
979                         status->setStatus(TransactionStatus_StatusAborted);
980                 } else {
981                         status->setStatus(TransactionStatus_StatusCommitted);
982                 }
983         }
984
985         return Pair<bool, bool>(false, true);
986 }
987
988 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
989         // Decode the data
990         ByteBuffer *bbDecode = ByteBuffer_wrap(data);
991         int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
992         int numberOfParts = bbDecode->getInt();
993
994         // If we did commit a transaction or not
995         bool didCommit = false;
996         bool couldArbitrate = false;
997
998         if (numberOfParts != 0) {
999
1000                 // decode the transaction
1001                 Transaction *transaction = new Transaction();
1002                 for (int i = 0; i < numberOfParts; i++) {
1003                         bbDecode->get();
1004                         TransactionPart *newPart = (TransactionPart *)TransactionPart_decode(NULL, bbDecode);
1005                         transaction->addPartDecode(newPart);
1006                 }
1007
1008                 // Arbitrate on transaction and pull relevant return data
1009                 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
1010                 couldArbitrate = localArbitrateReturn.getFirst();
1011                 didCommit = localArbitrateReturn.getSecond();
1012
1013                 updateLiveStateFromLocal();
1014
1015                 // Transaction was sent to the server so keep track of it to prevent double commit
1016                 if (transaction->getSequenceNumber() != -1) {
1017                         offlineTransactionsCommittedAndAtServer->add(new Pair<int64_t, int64_t>(transaction->getId()));
1018                 }
1019         }
1020
1021         // The data to send back
1022         int returnDataSize = 0;
1023         Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
1024
1025         // Get the aborts to send back
1026         Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>();
1027         {
1028                 SetIterator<int64_t, Abort *> *abortit = getKeyIterator(liveAbortsGeneratedByLocal);
1029                 while(abortit->hasNext())
1030                         abortLocalSequenceNumbers->add(abortit->next());
1031                 delete abortit;
1032         }
1033         
1034         qsort(abortLocalSequenceNumbers->expose(), abortLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1035
1036         uint asize = abortLocalSequenceNumbers->size();
1037         for(uint i=0; i<asize; i++) {
1038                 int64_t localSequenceNumber = abortLocalSequenceNumbers->get(i);
1039                 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1040                         continue;
1041                 }
1042                 
1043                 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
1044                 unseenArbitrations->add(abort);
1045                 returnDataSize += abort->getSize();
1046         }
1047
1048         // Get the commits to send back
1049         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
1050         if (commitForClientTable != NULL) {
1051                 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>();
1052                 {
1053                         SetIterator<int64_t, Commit *> *commitit = getKeyIterator(commitForClientTable);
1054                         while(commitit->hasNext())
1055                                 commitLocalSequenceNumbers->add(commitit->next());
1056                         delete commitit;
1057                 }
1058                 qsort(commitLocalSequenceNumbers->expose(), commitLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1059
1060                 uint clsSize = commitLocalSequenceNumbers->size();
1061                 for(uint clsi = 0; clsi < clsSize; clsi++) {
1062                         int64_t localSequenceNumber = commitLocalSequenceNumbers->get(clsi);
1063                         Commit *commit = commitForClientTable->get(localSequenceNumber);
1064
1065                         if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1066                                 continue;
1067                         }
1068
1069                         {
1070                                 Vector<CommitPart *> * parts = commit->getParts();
1071                                 uint nParts = parts->size();
1072                                 for(uint i=0; i<nParts; i++) {
1073                                         CommitPart * commitPart = parts->get(i);
1074                                         unseenArbitrations->add(commitPart);
1075                                         returnDataSize += commitPart->getSize();
1076                                 }
1077                         }
1078                 }
1079         }
1080
1081         // Number of arbitration entries to decode
1082         returnDataSize += 2 * sizeof(int32_t);
1083
1084         // bool of did commit or not
1085         if (numberOfParts != 0) {
1086                 returnDataSize += sizeof(char);
1087         }
1088
1089         // Data to send Back
1090         Array<char> *returnData = new Array<char>(returnDataSize);
1091         ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
1092
1093         if (numberOfParts != 0) {
1094                 if (didCommit) {
1095                         bbEncode->put((char)1);
1096                 } else {
1097                         bbEncode->put((char)0);
1098                 }
1099                 if (couldArbitrate) {
1100                         bbEncode->put((char)1);
1101                 } else {
1102                         bbEncode->put((char)0);
1103                 }
1104         }
1105
1106         bbEncode->putInt(unseenArbitrations->size());
1107         uint size = unseenArbitrations->size();
1108         for (uint i = 0; i < size; i++) {
1109                 Entry *entry = unseenArbitrations->get(i);
1110                 entry->encode(bbEncode);
1111         }
1112
1113         localSequenceNumber++;
1114         return returnData;
1115 }
1116
1117 ThreeTuple<bool, bool, Array<Slot *> *> Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey) {
1118         bool attemptedToSendToServerTmp = attemptedToSendToServer;
1119         attemptedToSendToServer = true;
1120
1121         bool inserted = false;
1122         bool lastTryInserted = false;
1123
1124         Array<Slot *> *array = cloud->putSlot(slot, newSize);
1125         if (array == NULL) {
1126                 array = new Array<Slot *>();
1127                 array->set(0, slot);
1128                 rejectedSlotVector->clear();
1129                 inserted = true;
1130         } else {
1131                 if (array->length() == 0) {
1132                         throw new Error("Server Error: Did not send any slots");
1133                 }
1134
1135                 // if (attemptedToSendToServerTmp) {
1136                 if (hadPartialSendToServer) {
1137
1138                         bool isInserted = false;
1139                         uint size = array->length();
1140                         for (uint i = 0; i < size; i++) {
1141                                 Slot *s = array->get(i);
1142                                 if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1143                                         isInserted = true;
1144                                         break;
1145                                 }
1146                         }
1147
1148                         for (uint i = 0; i < size; i++) {
1149                                 Slot *s = array->get(i);
1150                                 if (isInserted) {
1151                                         break;
1152                                 }
1153
1154                                 // Process each entry in the slot
1155                                 Vector<Entry *> *entries = s->getEntries();
1156                                 uint eSize = entries->size();
1157                                 for(uint ei=0; ei < eSize; ei++) {
1158                                         Entry * entry = entries->get(ei);
1159
1160                                         if (entry->getType() == TypeLastMessage) {
1161                                                 LastMessage *lastMessage = (LastMessage *)entry;
1162
1163                                                 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) {
1164                                                         isInserted = true;
1165                                                         break;
1166                                                 }
1167                                         }
1168                                 }
1169                         }
1170
1171                         if (!isInserted) {
1172                                 rejectedSlotVector->add(slot->getSequenceNumber());
1173                                 lastTryInserted = false;
1174                         } else {
1175                                 lastTryInserted = true;
1176                         }
1177                 } else {
1178                         rejectedSlotVector->add(slot->getSequenceNumber());
1179                         lastTryInserted = false;
1180                 }
1181         }
1182
1183         return ThreeTuple<bool, bool, Array<Slot *> *>(inserted, lastTryInserted, array);
1184 }
1185
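/**
 * Fills the slot with pending data. The first element of the returned tuple
 * is true when a resize was needed but not requested; the caller in this file
 * reads the tuple as (needsResize, newSize, insertedNewKey).
 */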
1189 ThreeTuple<bool, int32_t, bool> Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry) {
1190         int newSize = 0;
1191         if (liveSlotCount > bufferResizeThreshold) {
1192                 resize = true;//Resize is forced
1193         }
1194
1195         if (resize) {
1196                 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1197                 TableStatus *status = new TableStatus(slot, newSize);
1198                 slot->addEntry(status);
1199         }
1200
1201         // Fill with rejected slots first before doing anything else
1202         doRejectedMessages(slot);
1203
1204         // Do mandatory rescue of entries
1205         ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1206
1207         // Extract working variables
1208         bool needsResize = mandatoryRescueReturn.getFirst();
1209         bool seenLiveSlot = mandatoryRescueReturn.getSecond();
1210         int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1211
1212         if (needsResize && !resize) {
1213                 // We need to resize but we are not resizing so return false
1214                 return ThreeTuple<bool, int32_t, bool>(true, NULL, NULL);
1215         }
1216
1217         bool inserted = false;
1218         if (newKeyEntry != NULL) {
1219                 newKeyEntry->setSlot(slot);
1220                 if (slot->hasSpace(newKeyEntry)) {
1221                         slot->addEntry(newKeyEntry);
1222                         inserted = true;
1223                 }
1224         }
1225
1226         // Clear the transactions, aborts and commits that were sent previously
1227         transactionPartsSent->clear();
1228         pendingSendArbitrationEntriesToDelete->clear();
1229         uint size = pendingSendArbitrationRounds->size();
1230         for (uint i = 0; i < size; i++) {
1231                 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
1232                 bool isFull = false;
1233                 round->generateParts();
1234                 Vector<Entry *> *parts = round->getParts();
1235
1236                 // Insert pending arbitration data
1237                 uint vsize = parts->size();
1238                 for (uint vi = 0; vi < vsize; vi++) {
1239                         Entry *arbitrationData = parts->get(vi);
1240
1241                         // If it is an abort then we need to set some information
1242                         if (arbitrationData->getType() == TypeAbort) {
1243                                 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1244                         }
1245
1246                         if (!slot->hasSpace(arbitrationData)) {
1247                                 // No space so cant do anything else with these data entries
1248                                 isFull = true;
1249                                 break;
1250                         }
1251
1252                         // Add to this current slot and add it to entries to delete
1253                         slot->addEntry(arbitrationData);
1254                         pendingSendArbitrationEntriesToDelete->add(arbitrationData);
1255                 }
1256
1257                 if (isFull) {
1258                         break;
1259                 }
1260         }
1261
1262         if (pendingTransactionQueue->size() > 0) {
1263                 Transaction *transaction = pendingTransactionQueue->get(0);
1264                 // Set the transaction sequence number if it has yet to be inserted into the block chain
1265                 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1266                         transaction->setSequenceNumber(slot->getSequenceNumber());
1267                 }
1268
1269                 while (true) {
1270                         TransactionPart *part = transaction->getNextPartToSend();
1271                         if (part == NULL) {
1272                                 // Ran out of parts to send for this transaction so move on
1273                                 break;
1274                         }
1275
1276                         if (slot->hasSpace(part)) {
1277                                 slot->addEntry(part);
1278                                 Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1279                                 if (partsSent == NULL) {
1280                                         partsSent = new Vector<int32_t>();
1281                                         transactionPartsSent->put(transaction, partsSent);
1282                                 }
1283                                 partsSent->add(part->getPartNumber());
1284                                 transactionPartsSent->put(transaction, partsSent);
1285                         } else {
1286                                 break;
1287                         }
1288                 }
1289         }
1290
1291         // Fill the remainder of the slot with rescue data
1292         doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1293
1294         return ThreeTuple<bool, int32_t, bool>(false, newSize, inserted);
1295 }
1296
1297 void Table::doRejectedMessages(Slot *s) {
1298         if (!rejectedSlotVector->isEmpty()) {
1299                 /* TODO: We should avoid generating a rejected message entry if
1300                  * there is already a sufficient entry in the queue (e->g->,
1301                  * equalsto value of true and same sequence number)->  */
1302
1303                 int64_t old_seqn = rejectedSlotVector->get(0);
1304                 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1305                         int64_t new_seqn = rejectedSlotVector->lastElement();
1306                         RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1307                         s->addEntry(rm);
1308                 } else {
1309                         int64_t prev_seqn = -1;
1310                         int i = 0;
1311                         /* Go through list of missing messages */
1312                         for (; i < rejectedSlotVector->size(); i++) {
1313                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1314                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1315                                 if (s_msg != NULL)
1316                                         break;
1317                                 prev_seqn = curr_seqn;
1318                         }
1319                         /* Generate rejected message entry for missing messages */
1320                         if (prev_seqn != -1) {
1321                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1322                                 s->addEntry(rm);
1323                         }
1324                         /* Generate rejected message entries for present messages */
1325                         for (; i < rejectedSlotVector->size(); i++) {
1326                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1327                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1328                                 int64_t machineid = s_msg->getMachineID();
1329                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1330                                 s->addEntry(rm);
1331                         }
1332                 }
1333         }
1334 }
1335
1336 ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize) {
1337         int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1338         int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
1339         if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1340                 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1341         }
1342
1343         int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1344         bool seenLiveSlot = false;
1345         int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots;         // smallest seq number in the buffer if it is full
1346         int64_t threshold = firstIfFull + Table_FREE_SLOTS;             // we want the buffer to be clear of live entries up to this point
1347
1348
1349         // Mandatory Rescue
1350         for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1351                 Slot * previousSlot = buffer->getSlot(currentSequenceNumber);
1352                 // Push slot number forward
1353                 if (!seenLiveSlot) {
1354                         oldestLiveSlotSequenceNumver = currentSequenceNumber;
1355                 }
1356
1357                 if (!previousSlot->isLive()) {
1358                         continue;
1359                 }
1360
1361                 // We have seen a live slot
1362                 seenLiveSlot = true;
1363
1364                 // Get all the live entries for a slot
1365                 Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1366
1367                 // Iterate over all the live entries and try to rescue them
1368                 uint lESize = liveEntries->size();
1369                 for (uint i=0; i< lESize; i++) {
1370                         Entry * liveEntry = liveEntries->get(i);
1371                         if (slot->hasSpace(liveEntry)) {
1372                                 // Enough space to rescue the entry
1373                                 slot->addEntry(liveEntry);
1374                         } else if (currentSequenceNumber == firstIfFull) {
1375                                 //if there's no space but the entry is about to fall off the queue
1376                                 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
1377                         }
1378                 }
1379         }
1380
1381         // Did not resize
1382         return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
1383 }
1384
1385 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1386         /* now go through live entries from least to greatest sequence number until
1387          * either all live slots added, or the slot doesn't have enough room
1388          * for SKIP_THRESHOLD consecutive entries*/
1389         int skipcount = 0;
1390         int64_t newestseqnum = buffer->getNewestSeqNum();
1391 search:
1392         for (; seqn <= newestseqnum; seqn++) {
1393                 Slot *prevslot = buffer->getSlot(seqn);
1394                 //Push slot number forward
1395                 if (!seenliveslot)
1396                         oldestLiveSlotSequenceNumver = seqn;
1397
1398                 if (!prevslot->isLive())
1399                         continue;
1400                 seenliveslot = true;
1401                 Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
1402                 uint lESize = liveentries->size();
1403                 for (uint i=0; i< lESize; i++) {
1404                         Entry * liveentry = liveentries->get(i);
1405                         if (s->hasSpace(liveentry))
1406                                 s->addEntry(liveentry);
1407                         else {
1408                                 skipcount++;
1409                                 if (skipcount > Table_SKIP_THRESHOLD)
1410                                         goto donesearch;
1411                         }
1412                 }
1413         }
1414  donesearch:
1415         ;
1416 }
1417
1418 /**
1419  * Checks for malicious activity and updates the local copy of the block chain->
1420  */
1421 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1422         // The cloud communication layer has checked slot HMACs already
1423         // before decoding
1424         if (newSlots->length() == 0) {
1425                 return;
1426         }
1427
1428         // Make sure all slots are newer than the last largest slot this
1429         // client has seen
1430         int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
1431         if (firstSeqNum <= sequenceNumber) {
1432                 throw new Error("Server Error: Sent older slots!");
1433         }
1434
1435         // Create an object that can access both new slots and slots in our
1436         // local chain without committing slots to our local chain
1437         SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1438
1439         // Check that the HMAC chain is not broken
1440         checkHMACChain(indexer, newSlots);
1441
1442         // Set to keep track of messages from clients
1443         Hashset<int64_t> *machineSet = new Hashset<int64_t>();
1444         {
1445                 SetIterator<int64_t, Pair<int64_t, Liveness *> *> * lmit=getKeyIterator(lastMessageTable);
1446                 while(lmit->hasNext())
1447                         machineSet->add(lmit->next());
1448                 delete lmit;
1449         }
1450
1451         // Process each slots data
1452         {
1453                 uint numSlots = newSlots->length();
1454                 for(uint i=0; i<numSlots; i++) {
1455                         Slot *slot = newSlots->get(i);
1456                         processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1457                         updateExpectedSize();
1458                 }
1459         }
1460
1461         // If there is a gap, check to see if the server sent us
1462         // everything->
1463         if (firstSeqNum != (sequenceNumber + 1)) {
1464
1465                 // Check the size of the slots that were sent down by the server->
1466                 // Can only check the size if there was a gap
1467                 checkNumSlots(newSlots->length());
1468
1469                 // Since there was a gap every machine must have pushed a slot or
1470                 // must have a last message message-> If not then the server is
1471                 // hiding slots
1472                 if (!machineSet->isEmpty()) {
1473                         throw new Error("Missing record for machines: ");
1474                 }
1475         }
1476
1477         // Update the size of our local block chain->
1478         commitNewMaxSize();
1479
1480         // Commit new to slots to the local block chain->
1481         {
1482                 uint numSlots = newSlots->length();
1483                 for(uint i=0; i<numSlots; i++) {
1484                         Slot *slot = newSlots->get(i);
1485                         
1486                         // Insert this slot into our local block chain copy->
1487                         buffer->putSlot(slot);
1488                         
1489                         // Keep track of how many slots are currently live (have live data
1490                         // in them)->
1491                         liveSlotCount++;
1492                 }
1493         }
1494         // Get the sequence number of the latest slot in the system
1495         sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
1496         updateLiveStateFromServer();
1497
1498         // No Need to remember after we pulled from the server
1499         offlineTransactionsCommittedAndAtServer->clear();
1500
1501         // This is invalidated now
1502         hadPartialSendToServer = false;
1503 }
1504
1505 void Table::updateLiveStateFromServer() {
1506         // Process the new transaction parts
1507         processNewTransactionParts();
1508
1509         // Do arbitration on new transactions that were received
1510         arbitrateFromServer();
1511
1512         // Update all the committed keys
1513         bool didCommitOrSpeculate = updateCommittedTable();
1514
1515         // Delete the transactions that are now dead
1516         updateLiveTransactionsAndStatus();
1517
1518         // Do speculations
1519         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1520         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1521 }
1522
1523 void Table::updateLiveStateFromLocal() {
1524         // Update all the committed keys
1525         bool didCommitOrSpeculate = updateCommittedTable();
1526
1527         // Delete the transactions that are now dead
1528         updateLiveTransactionsAndStatus();
1529
1530         // Do speculations
1531         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1532         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1533 }
1534
1535 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1536         int64_t prevslots = firstSequenceNumber;
1537
1538         if (didFindTableStatus) {
1539         } else {
1540                 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1541         }
1542
1543         didFindTableStatus = true;
1544         currMaxSize = numberOfSlots;
1545 }
1546
1547 void Table::updateExpectedSize() {
1548         expectedsize++;
1549
1550         if (expectedsize > currMaxSize) {
1551                 expectedsize = currMaxSize;
1552         }
1553 }
1554
1555
1556 /**
1557  * Check the size of the block chain to make sure there are enough
1558  * slots sent back by the server-> This is only called when we have a
1559  * gap between the slots that we have locally and the slots sent by
1560  * the server therefore in the slots sent by the server there will be
1561  * at least 1 Table status message
1562  */
1563 void Table::checkNumSlots(int numberOfSlots) {
1564         if (numberOfSlots != expectedsize) {
1565                 throw new Error("Server Error: Server did not send all slots->  Expected: ");
1566         }
1567 }
1568
1569 /**
1570  * Update the size of of the local buffer if it is needed->
1571  */
1572 void Table::commitNewMaxSize() {
1573         didFindTableStatus = false;
1574
1575         // Resize the local slot buffer
1576         if (numberOfSlots != currMaxSize) {
1577                 buffer->resize((int32_t)currMaxSize);
1578         }
1579
1580         // Change the number of local slots to the new size
1581         numberOfSlots = (int32_t)currMaxSize;
1582
1583         // Recalculate the resize threshold since the size of the local
1584         // buffer has changed
1585         setResizeThreshold();
1586 }
1587
1588 /**
1589  * Process the new transaction parts from this latest round of slots
1590  * received from the server
1591  */
1592 void Table::processNewTransactionParts() {
1593
1594         if (newTransactionParts->size() == 0) {
1595                 // Nothing new to process
1596                 return;
1597         }
1598
1599         // Iterate through all the machine Ids that we received new parts
1600         // for
1601         SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> * tpit= getKeyIterator(newTransactionParts);
1602         while(tpit->hasNext()) {
1603                 int64_t machineId = tpit->next();
1604                 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
1605
1606                 SetIterator<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *ptit = getKeyIterator(parts);
1607                 // Iterate through all the parts for that machine Id
1608                 while(ptit->hasNext()) {
1609                         Pair<int64_t, int32_t> * partId = ptit->next();
1610                         TransactionPart *part = parts->get(partId);
1611
1612                         if (lastArbitratedTransactionNumberByArbitratorTable->contains(part->getArbitratorId())) {
1613                                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1614                                 if (lastTransactionNumber >= part->getSequenceNumber()) {
1615                                         // Set dead the transaction part
1616                                         part->setDead();
1617                                         continue;
1618                                 }
1619                         }
1620                         
1621                         // Get the transaction object for that sequence number
1622                         Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1623
1624                         if (transaction == NULL) {
1625                                 // This is a new transaction that we dont have so make a new one
1626                                 transaction = new Transaction();
1627
1628                                 // Insert this new transaction into the live tables
1629                                 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1630                                 liveTransactionByTransactionIdTable->put(new Pair<int64_t, int64_t>(part->getTransactionId()), transaction);
1631                         }
1632
1633                         // Add that part to the transaction
1634                         transaction->addPartDecode(part);
1635                 }
1636                 delete ptit;
1637         }
1638         delete tpit;
1639         // Clear all the new transaction parts in preparation for the next
1640         // time the server sends slots
1641         newTransactionParts->clear();
1642 }
1643
1644 void Table::arbitrateFromServer() {
1645
1646         if (liveTransactionBySequenceNumberTable->size() == 0) {
1647                 // Nothing to arbitrate on so move on
1648                 return;
1649         }
1650
1651         // Get the transaction sequence numbers and sort from oldest to newest
1652         Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>();
1653         {
1654                 SetIterator<int64_t, Transaction *> * trit = getKeyIterator(liveTransactionBySequenceNumberTable);
1655                 while(trit->hasNext())
1656                         transactionSequenceNumbers->add(trit->next());
1657                 delete trit;
1658         }
1659         qsort(transactionSequenceNumbers->expose(), transactionSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1660
1661         // Collection of key value pairs that are
1662         Hashtable<IoTString *, KeyValue *> * speculativeTableTmp = new Hashtable<IoTString *, KeyValue *>();
1663
1664         // The last transaction arbitrated on
1665         int64_t lastTransactionCommitted = -1;
1666         Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1667         uint tsnSize = transactionSequenceNumbers->size();
1668         for(uint i=0; i<tsnSize; i++) {
1669                 int64_t transactionSequenceNumber = transactionSequenceNumbers->get(i);
1670                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1671
1672                 // Check if this machine arbitrates for this transaction if not
1673                 // then we cant arbitrate this transaction
1674                 if (transaction->getArbitrator() != localMachineId) {
1675                         continue;
1676                 }
1677
1678                 if (transactionSequenceNumber < lastSeqNumArbOn) {
1679                         continue;
1680                 }
1681
1682                 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1683                         // We have seen this already locally so dont commit again
1684                         continue;
1685                 }
1686
1687
1688                 if (!transaction->isComplete()) {
1689                         // Will arbitrate in incorrect order if we continue so just break
1690                         // Most likely this
1691                         break;
1692                 }
1693
1694
1695                 // update the largest transaction seen by arbitrator from server
1696                 if (!lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1697                         lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1698                 } else {
1699                         int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1700                         if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1701                                 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1702                         }
1703                 }
1704
1705                 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1706                         // Guard evaluated as true
1707
1708                         // Update the local changes so we can make the commit
1709                         SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1710                         while (kvit->hasNext()) {
1711                                 KeyValue *kv = kvit->next();
1712                                 speculativeTableTmp->put(kv->getKey(), kv);
1713                         }
1714                         delete kvit;
1715                         
1716                         // Update what the last transaction committed was for use in batch commit
1717                         lastTransactionCommitted = transactionSequenceNumber;
1718                 } else {
1719                         // Guard evaluated was false so create abort
1720                         // Create the abort
1721                         Abort *newAbort = new Abort(NULL,
1722                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1723                                                                                                                                         transaction->getSequenceNumber(),
1724                                                                                                                                         transaction->getMachineId(),
1725                                                                                                                                         transaction->getArbitrator(),
1726                                                                                                                                         localArbitrationSequenceNumber);
1727                         localArbitrationSequenceNumber++;
1728                         generatedAborts->add(newAbort);
1729
1730                         // Insert the abort so we can process
1731                         processEntry(newAbort);
1732                 }
1733
1734                 lastSeqNumArbOn = transactionSequenceNumber;
1735         }
1736
1737         Commit *newCommit = NULL;
1738
1739         // If there is something to commit
1740         if (speculativeTableTmp->size() != 0) {
1741                 // Create the commit and increment the commit sequence number
1742                 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1743                 localArbitrationSequenceNumber++;
1744
1745                 // Add all the new keys to the commit
1746                 for (KeyValue *kv : speculativeTableTmp->values()) {
1747                         newCommit->addKV(kv);
1748                 }
1749
1750                 // create the commit parts
1751                 newCommit->createCommitParts();
1752
1753                 // Append all the commit parts to the end of the pending queue
1754                 // waiting for sending to the server
1755                 // Insert the commit so we can process it
1756                 for (CommitPart *commitPart : newCommit->getParts()->values()) {
1757                         processEntry(commitPart);
1758                 }
1759         }
1760
1761         if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1762                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1763                 pendingSendArbitrationRounds->add(arbitrationRound);
1764
1765                 if (compactArbitrationData()) {
1766                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1767                         if (newArbitrationRound->getCommit() != NULL) {
1768                                 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1769                                         processEntry(commitPart);
1770                                 }
1771                         }
1772                 }
1773         }
1774 }
1775
1776 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1777
1778         // Check if this machine arbitrates for this transaction if not then
1779         // we cant arbitrate this transaction
1780         if (transaction->getArbitrator() != localMachineId) {
1781                 return Pair<bool, bool>(false, false);
1782         }
1783
1784         if (!transaction->isComplete()) {
1785                 // Will arbitrate in incorrect order if we continue so just break
1786                 // Most likely this
1787                 return Pair<bool, bool>(false, false);
1788         }
1789
1790         if (transaction->getMachineId() != localMachineId) {
1791                 // dont do this check for local transactions
1792                 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) != NULL) {
1793                         if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1794                                 // We've have already seen this from the server
1795                                 return Pair<bool, bool>(false, false);
1796                         }
1797                 }
1798         }
1799
1800         if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1801                 // Guard evaluated as true Create the commit and increment the
1802                 // commit sequence number
1803                 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1804                 localArbitrationSequenceNumber++;
1805
1806                 // Update the local changes so we can make the commit
1807                 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1808                 while (kvit->hasNext()) {
1809                         KeyValue *kv = kvit->next();
1810                         newCommit->addKV(kv);
1811                 }
1812                 delete kvit;
1813                 
1814                 // create the commit parts
1815                 newCommit->createCommitParts();
1816
1817                 // Append all the commit parts to the end of the pending queue
1818                 // waiting for sending to the server
1819                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1820                 pendingSendArbitrationRounds->add(arbitrationRound);
1821
1822                 if (compactArbitrationData()) {
1823                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1824                         for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1825                                 processEntry(commitPart);
1826                         }
1827                 } else {
1828                         // Insert the commit so we can process it
1829                         for (CommitPart *commitPart : newCommit->getParts()->values()) {
1830                                 processEntry(commitPart);
1831                         }
1832                 }
1833
1834                 if (transaction->getMachineId() == localMachineId) {
1835                         TransactionStatus *status = transaction->getTransactionStatus();
1836                         if (status != NULL) {
1837                                 status->setStatus(TransactionStatus_StatusCommitted);
1838                         }
1839                 }
1840
1841                 updateLiveStateFromLocal();
1842                 return Pair<bool, bool>(true, true);
1843         } else {
1844                 if (transaction->getMachineId() == localMachineId) {
1845                         // For locally created messages update the status
1846                         // Guard evaluated was false so create abort
1847                         TransactionStatus * status = transaction->getTransactionStatus();
1848                         if (status != NULL) {
1849                                 status->setStatus(TransactionStatus_StatusAborted);
1850                         }
1851                 } else {
1852                         Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
1853
1854                         // Create the abort
1855                         Abort *newAbort = new Abort(NULL,
1856                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1857                                                                                                                                         -1,
1858                                                                                                                                         transaction->getMachineId(),
1859                                                                                                                                         transaction->getArbitrator(),
1860                                                                                                                                         localArbitrationSequenceNumber);
1861                         localArbitrationSequenceNumber++;
1862                         addAbortSet->add(newAbort);
1863
1864                         // Append all the commit parts to the end of the pending queue
1865                         // waiting for sending to the server
1866                         ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1867                         pendingSendArbitrationRounds->add(arbitrationRound);
1868
1869                         if (compactArbitrationData()) {
1870                                 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1871                                 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1872                                         processEntry(commitPart);
1873                                 }
1874                         }
1875                 }
1876
1877                 updateLiveStateFromLocal();
1878                 return Pair<bool, bool>(true, false);
1879         }
1880 }
1881
1882 /**
1883  * Compacts the arbitration data my merging commits and aggregating
1884  * aborts so that a single large push of commits can be done instead
1885  * of many small updates
1886  */
1887 bool Table::compactArbitrationData() {
1888         if (pendingSendArbitrationRounds->size() < 2) {
1889                 // Nothing to compact so do nothing
1890                 return false;
1891         }
1892
1893         ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1894         if (lastRound->getDidSendPart()) {
1895                 return false;
1896         }
1897
1898         bool hadCommit = (lastRound->getCommit() == NULL);
1899         bool gotNewCommit = false;
1900
1901         int numberToDelete = 1;
1902         while (numberToDelete < pendingSendArbitrationRounds->size()) {
1903                 ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1904
1905                 if (round->isFull() || round->getDidSendPart()) {
1906                         // Stop since there is a part that cannot be compacted and we
1907                         // need to compact in order
1908                         break;
1909                 }
1910
1911                 if (round->getCommit() == NULL) {
1912                         // Try compacting aborts only
1913                         int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
1914                         if (newSize > ArbitrationRound_MAX_PARTS) {
1915                                 // Cant compact since it would be too large
1916                                 break;
1917                         }
1918                         lastRound->addAborts(round->getAborts());
1919                 } else {
1920                         // Create a new larger commit
1921                         Commit * newCommit = Commit_merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
1922                         localArbitrationSequenceNumber++;
1923
1924                         // Create the commit parts so that we can count them
1925                         newCommit->createCommitParts();
1926
1927                         // Calculate the new size of the parts
1928                         int newSize = newCommit->getNumberOfParts();
1929                         newSize += lastRound->getAbortsCount();
1930                         newSize += round->getAbortsCount();
1931
1932                         if (newSize > ArbitrationRound_MAX_PARTS) {
1933                                 // Cant compact since it would be too large
1934                                 break;
1935                         }
1936
1937                         // Set the new compacted part
1938                         lastRound->setCommit(newCommit);
1939                         lastRound->addAborts(round->getAborts());
1940                         gotNewCommit = true;
1941                 }
1942
1943                 numberToDelete++;
1944         }
1945
1946         if (numberToDelete != 1) {
1947                 // If there is a compaction
1948                 // Delete the previous pieces that are now in the new compacted piece
1949                 if (numberToDelete == pendingSendArbitrationRounds->size()) {
1950                         pendingSendArbitrationRounds->clear();
1951                 } else {
1952                         for (int i = 0; i < numberToDelete; i++) {
1953                                 pendingSendArbitrationRounds->removeIndex(pendingSendArbitrationRounds->size() - 1);
1954                         }
1955                 }
1956
1957                 // Add the new compacted into the pending to send list
1958                 pendingSendArbitrationRounds->add(lastRound);
1959
1960                 // Should reinsert into the commit processor
1961                 if (hadCommit && gotNewCommit) {
1962                         return true;
1963                 }
1964         }
1965
1966         return false;
1967 }
1968
1969 /**
1970  * Update all the commits and the committed tables, sets dead the dead
1971  * transactions
1972  */
1973 bool Table::updateCommittedTable() {
1974
1975         if (newCommitParts->size() == 0) {
1976                 // Nothing new to process
1977                 return false;
1978         }
1979
1980         // Iterate through all the machine Ids that we received new parts for
1981         for (int64_t machineId : newCommitParts->keySet()) {
1982                 Hashtable<Pair<int64_t, int32_t> *, CommitPart *> *parts = newCommitParts->get(machineId);
1983
1984                 // Iterate through all the parts for that machine Id
1985                 for (Pair<int64_t, int32_t> partId : parts->keySet()) {
1986                         CommitPart *part = parts->get(partId);
1987
1988                         // Get the transaction object for that sequence number
1989                         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
1990
1991                         if (commitForClientTable == NULL) {
1992                                 // This is the first commit from this device
1993                                 commitForClientTable = new Hashtable<int64_t, Commit *>();
1994                                 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
1995                         }
1996
1997                         Commit *commit = commitForClientTable->get(part->getSequenceNumber());
1998
1999                         if (commit == NULL) {
2000                                 // This is a new commit that we dont have so make a new one
2001                                 commit = new Commit();
2002
2003                                 // Insert this new commit into the live tables
2004                                 commitForClientTable->put(part->getSequenceNumber(), commit);
2005                         }
2006
2007                         // Add that part to the commit
2008                         commit->addPartDecode(part);
2009                 }
2010         }
2011
2012         // Clear all the new commits parts in preparation for the next time
2013         // the server sends slots
2014         newCommitParts->clear();
2015
2016         // If we process a new commit keep track of it for future use
2017         bool didProcessANewCommit = false;
2018
2019         // Process the commits one by one
2020         for (int64_T arbitratorId : liveCommitsTable->keySet()) {
2021
2022                 // Get all the commits for a specific arbitrator
2023                 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
2024
2025                 // Sort the commits in order
2026                 Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
2027                 qsort(commitSequenceNumbers->expose(), commitSequenceNumbers->size(), sizeof(int64_t), compareInt64);
2028
2029                 // Get the last commit seen from this arbitrator
2030                 int64_t lastCommitSeenSequenceNumber = -1;
2031                 if (lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId) != NULL) {
2032                         lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
2033                 }
2034
2035                 // Go through each new commit one by one
2036                 for (int i = 0; i < commitSequenceNumbers->size(); i++) {
2037                         int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
2038                         Commit *commit = commitForClientTable->get(commitSequenceNumber);
2039
2040                         // Special processing if a commit is not complete
2041                         if (!commit->isComplete()) {
2042                                 if (i == (commitSequenceNumbers->size() - 1)) {
2043                                         // If there is an incomplete commit and this commit is the
2044                                         // latest one seen then this commit cannot be processed and
2045                                         // there are no other commits
2046                                         break;
2047                                 } else {
2048                                         // This is a commit that was already dead but parts of it
2049                                         // are still in the block chain (not flushed out yet)->
2050                                         // Delete it and move on
2051                                         commit->setDead();
2052                                         commitForClientTable->remove(commit->getSequenceNumber());
2053                                         continue;
2054                                 }
2055                         }
2056
2057                         // Update the last transaction that was updated if we can
2058                         if (commit->getTransactionSequenceNumber() != -1) {
2059                                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
2060
2061                                 // Update the last transaction sequence number that the arbitrator arbitrated on
2062                                 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
2063                                         lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2064                                 }
2065                         }
2066
2067                         // Update the last arbitration data that we have seen so far
2068                         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(commit->getMachineId())) {
2069                                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
2070                                 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
2071                                         // Is larger
2072                                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2073                                 }
2074                         } else {
2075                                 // Never seen any data from this arbitrator so record the first one
2076                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2077                         }
2078
2079                         // We have already seen this commit before so need to do the
2080                         // full processing on this commit
2081                         if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2082
2083                                 // Update the last transaction that was updated if we can
2084                                 if (commit->getTransactionSequenceNumber() != -1) {
2085                                         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
2086
2087                                         // Update the last transaction sequence number that the arbitrator arbitrated on
2088                                         if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
2089                                                 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2090                                         }
2091                                 }
2092
2093                                 continue;
2094                         }
2095
2096                         // If we got here then this is a brand new commit and needs full
2097                         // processing
2098                         // Get what commits should be edited, these are the commits that
2099                         // have live values for their keys
2100                         Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
2101                         {
2102                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = commit->getKeyValueUpdateSet()->iterator();
2103                                 while (kvit->hasNext()) {
2104                                         KeyValue *kv = kvit->next();
2105                                         commitsToEdit->add(liveCommitsByKeyTable->get(kv->getKey()));
2106                                 }
2107                                 delete kvit;
2108                         }
2109                         commitsToEdit->remove(NULL);            // remove NULL since it could be in this set
2110
2111                         // Update each previous commit that needs to be updated
2112                         for (Commit *previousCommit : commitsToEdit) {
2113
2114                                 // Only bother with live commits (TODO: Maybe remove this check)
2115                                 if (previousCommit->isLive()) {
2116
2117                                         // Update which keys in the old commits are still live
2118                                         {
2119                                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = commit->getKeyValueUpdateSet()->iterator();
2120                                                 while (kvit->hasNext()) {
2121                                                         KeyValue *kv = kvit->next();
2122                                                         previousCommit->invalidateKey(kv->getKey());
2123                                                 }
2124                                                 delete kvit;
2125                                         }
2126                                         
2127                                         // if the commit is now dead then remove it
2128                                         if (!previousCommit->isLive()) {
2129                                                 commitForClientTable->remove(previousCommit);
2130                                         }
2131                                 }
2132                         }
2133
2134                         // Update the last seen sequence number from this arbitrator
2135                         if (lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId()) != NULL) {
2136                                 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2137                                         lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2138                                 }
2139                         } else {
2140                                 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2141                         }
2142
2143                         // We processed a new commit that we havent seen before
2144                         didProcessANewCommit = true;
2145
2146                         // Update the committed table of keys and which commit is using which key
2147                         {
2148                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = commit->getKeyValueUpdateSet()->iterator();
2149                                 while (kvit->hasNext()) {
2150                                         KeyValue *kv = kvit->next();
2151                                         committedKeyValueTable->put(kv->getKey(), kv);
2152                                         liveCommitsByKeyTable->put(kv->getKey(), commit);
2153                                 }
2154                                 delete kvit;
2155                         }
2156                 }
2157         }
2158
2159         return didProcessANewCommit;
2160 }
2161
2162 /**
2163  * Create the speculative table from transactions that are still live
2164  * and have come from the cloud
2165  */
2166 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2167         if (liveTransactionBySequenceNumberTable->keySet()->size() == 0) {
2168                 // There is nothing to speculate on
2169                 return false;
2170         }
2171
2172         // Create a list of the transaction sequence numbers and sort them
2173         // from oldest to newest
2174         Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
2175         qsort(transactionSequenceNumberSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64);
2176
2177         bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2178
2179
2180         if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2181                 // If there is a gap in the transaction sequence numbers then
2182                 // there was a commit or an abort of a transaction OR there was a
2183                 // new commit (Could be from offline commit) so a redo the
2184                 // speculation from scratch
2185
2186                 // Start from scratch
2187                 speculatedKeyValueTable->clear();
2188                 lastTransactionSequenceNumberSpeculatedOn = -1;
2189                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2190
2191         }
2192
2193         // Remember the front of the transaction list
2194         oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2195
2196         // Find where to start arbitration from
2197         int startIndex = transactionSequenceNumbersSorted->indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1;
2198
2199         if (startIndex >= transactionSequenceNumbersSorted->size()) {
2200                 // Make sure we are not out of bounds
2201                 return false;           // did not speculate
2202         }
2203
2204         Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
2205         bool didSkip = true;
2206
2207         for (int i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2208                 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2209                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2210
2211                 if (!transaction->isComplete()) {
2212                         // If there is an incomplete transaction then there is nothing
2213                         // we can do add this transactions arbitrator to the list of
2214                         // arbitrators we should ignore
2215                         incompleteTransactionArbitrator->add(transaction->getArbitrator());
2216                         didSkip = true;
2217                         continue;
2218                 }
2219
2220                 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2221                         continue;
2222                 }
2223
2224                 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2225
2226                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2227                         // Guard evaluated to true so update the speculative table
2228                         {
2229                                 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2230                                 while (kvit->hasNext()) {
2231                                         KeyValue *kv = kvit->next();
2232                                         speculatedKeyValueTable->put(kv->getKey(), kv);
2233                                 }
2234                                 delete kvit;
2235                         }
2236                 }
2237         }
2238
2239         if (didSkip) {
2240                 // Since there was a skip we need to redo the speculation next time around
2241                 lastTransactionSequenceNumberSpeculatedOn = -1;
2242                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2243         }
2244
2245         // We did some speculation
2246         return true;
2247 }
2248
2249 /**
2250  * Create the pending transaction speculative table from transactions
2251  * that are still in the pending transaction buffer
2252  */
2253 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2254         if (pendingTransactionQueue->size() == 0) {
2255                 // There is nothing to speculate on
2256                 return;
2257         }
2258
2259         if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2260                 // need to reset on the pending speculation
2261                 lastPendingTransactionSpeculatedOn = NULL;
2262                 firstPendingTransaction = pendingTransactionQueue->get(0);
2263                 pendingTransactionSpeculatedKeyValueTable->clear();
2264         }
2265
2266         // Find where to start arbitration from
2267         int startIndex = pendingTransactionQueue->indexOf(firstPendingTransaction) + 1;
2268
2269         if (startIndex >= pendingTransactionQueue->size()) {
2270                 // Make sure we are not out of bounds
2271                 return;
2272         }
2273
2274         for (int i = startIndex; i < pendingTransactionQueue->size(); i++) {
2275                 Transaction *transaction = pendingTransactionQueue->get(i);
2276
2277                 lastPendingTransactionSpeculatedOn = transaction;
2278
2279                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2280                         // Guard evaluated to true so update the speculative table
2281                         SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2282                         while (kvit->hasNext()) {
2283                                 KeyValue *kv = kvit->next();
2284                                 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2285                         }
2286                         delete kvit;
2287                 }
2288         }
2289 }
2290
2291 /**
2292  * Set dead and remove from the live transaction tables the
2293  * transactions that are dead
2294  */
2295 void Table::updateLiveTransactionsAndStatus() {
2296
2297         // Go through each of the transactions
2298         for (Iterator<Map->Entry<int64_t, Transaction> > *iter = liveTransactionBySequenceNumberTable->entrySet()->iterator(); iter->hasNext();) {
2299                 Transaction *transaction = iter->next()->getValue();
2300
2301                 // Check if the transaction is dead
2302                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator());
2303                 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= transaction->getSequenceNumber())) {
2304
2305                         // Set dead the transaction
2306                         transaction->setDead();
2307
2308                         // Remove the transaction from the live table
2309                         iter->remove();
2310                         liveTransactionByTransactionIdTable->remove(transaction->getId());
2311                 }
2312         }
2313
2314         // Go through each of the transactions
2315         for (Iterator<Map->Entry<int64_t, TransactionStatus *> > *iter = outstandingTransactionStatus->entrySet()->iterator(); iter->hasNext();) {
2316                 TransactionStatus *status = iter->next()->getValue();
2317
2318                 // Check if the transaction is dead
2319                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator());
2320                 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= status->getTransactionSequenceNumber())) {
2321
2322                         // Set committed
2323                         status->setStatus(TransactionStatus_StatusCommitted);
2324
2325                         // Remove
2326                         iter->remove();
2327                 }
2328         }
2329 }
2330
2331 /**
2332  * Process this slot, entry by entry->  Also update the latest message sent by slot
2333  */
2334 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2335
2336         // Update the last message seen
2337         updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2338
2339         // Process each entry in the slot
2340         Vector<Entry *> *entries = slot->getEntries();
2341         uint eSize = entries->size();
2342         for(uint ei=0; ei < eSize; ei++) {
2343                 Entry * entry = entries->get(ei);
2344                 switch (entry->getType()) {
2345                 case TypeCommitPart:
2346                         processEntry((CommitPart *)entry);
2347                         break;
2348                 case TypeAbort:
2349                         processEntry((Abort *)entry);
2350                         break;
2351                 case TypeTransactionPart:
2352                         processEntry((TransactionPart *)entry);
2353                         break;
2354                 case TypeNewKey:
2355                         processEntry((NewKey *)entry);
2356                         break;
2357                 case TypeLastMessage:
2358                         processEntry((LastMessage *)entry, machineSet);
2359                         break;
2360                 case TypeRejectedMessage:
2361                         processEntry((RejectedMessage *)entry, indexer);
2362                         break;
2363                 case TypeTableStatus:
2364                         processEntry((TableStatus *)entry, slot->getSequenceNumber());
2365                         break;
2366                 default:
2367                         throw new Error("Unrecognized type: ");
2368                 }
2369         }
2370 }
2371
2372 /**
2373  * Update the last message that was sent for a machine Id
2374  */
2375 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2376         // Update what the last message received by a machine was
2377         updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2378 }
2379
2380 /**
2381  * Add the new key to the arbitrators table and update the set of live
2382  * new keys (in case of a rescued new key message)
2383  */
2384 void Table::processEntry(NewKey *entry) {
2385         // Update the arbitrator table with the new key information
2386         arbitratorTable->put(entry->getKey(), entry->getMachineID());
2387
2388         // Update what the latest live new key is
2389         NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2390         if (oldNewKey != NULL) {
2391                 // Delete the old new key messages
2392                 oldNewKey->setDead();
2393         }
2394 }
2395
2396 /**
2397  * Process new table status entries and set dead the old ones as new
2398  * ones come in-> keeps track of the largest and smallest table status
2399  * seen in this current round of updating the local copy of the block
2400  * chain
2401  */
2402 void Table::processEntry(TableStatus * entry, int64_t seq) {
2403         int newNumSlots = entry->getMaxSlots();
2404         updateCurrMaxSize(newNumSlots);
2405         initExpectedSize(seq, newNumSlots);
2406
2407         if (liveTableStatus != NULL) {
2408                 // We have a larger table status so the old table status is no
2409                 // int64_ter alive
2410                 liveTableStatus->setDead();
2411         }
2412
2413         // Make this new table status the latest alive table status
2414         liveTableStatus = entry;
2415 }
2416
2417 /**
2418  * Check old messages to see if there is a block chain violation->
2419  * Also
2420  */
2421 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
2422         int64_t oldSeqNum = entry->getOldSeqNum();
2423         int64_t newSeqNum = entry->getNewSeqNum();
2424         bool isequal = entry->getEqual();
2425         int64_t machineId = entry->getMachineID();
2426         int64_t seq = entry->getSequenceNumber();
2427
2428         // Check if we have messages that were supposed to be rejected in
2429         // our local block chain
2430         for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2431                 // Get the slot
2432                 Slot *slot = indexer->getSlot(seqNum);
2433
2434                 if (slot != NULL) {
2435                         // If we have this slot make sure that it was not supposed to be
2436                         // a rejected slot
2437                         int64_t slotMachineId = slot->getMachineID();
2438                         if (isequal != (slotMachineId == machineId)) {
2439                                 throw new Error("Server Error: Trying to insert rejected message for slot ");
2440                         }
2441                 }
2442         }
2443
2444         // Create a list of clients to watch until they see this rejected
2445         // message entry->
2446         Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2447         for (Map->Entry<int64_t, Pair<int64_t, Liveness *> > *lastMessageEntry : lastMessageTable->entrySet()) {
2448                 // Machine ID for the last message entry
2449                 int64_t lastMessageEntryMachineId = lastMessageEntry->getKey();
2450
2451                 // We've seen it, don't need to continue to watch->  Our next
2452                 // message will implicitly acknowledge it->
2453                 if (lastMessageEntryMachineId == localMachineId) {
2454                         continue;
2455                 }
2456
2457                 Pair<int64_t, Liveness *> lastMessageValue = lastMessageEntry->getValue();
2458                 int64_t entrySequenceNumber = lastMessageValue.getFirst();
2459
2460                 if (entrySequenceNumber < seq) {
2461                         // Add this rejected message to the set of messages that this
2462                         // machine ID did not see yet
2463                         addWatchVector(lastMessageEntryMachineId, entry);
2464                         // This client did not see this rejected message yet so add it
2465                         // to the watch set to monitor
2466                         deviceWatchSet->add(lastMessageEntryMachineId);
2467                 }
2468         }
2469         if (deviceWatchSet->isEmpty()) {
2470                 // This rejected message has been seen by all the clients so
2471                 entry->setDead();
2472         } else {
2473                 // We need to watch this rejected message
2474                 entry->setWatchSet(deviceWatchSet);
2475         }
2476 }
2477
2478 /**
2479  * Check if this abort is live, if not then save it so we can kill it
2480  * later-> update the last transaction number that was arbitrated on->
2481  */
2482 void Table::processEntry(Abort *entry) {
2483         if (entry->getTransactionSequenceNumber() != -1) {
2484                 // update the transaction status if it was sent to the server
2485                 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2486                 if (status != NULL) {
2487                         status->setStatus(TransactionStatus_StatusAborted);
2488                 }
2489         }
2490
2491         // Abort has not been seen by the client it is for yet so we need to
2492         // keep track of it
2493         Abort *previouslySeenAbort = liveAbortTable->put(entry->getAbortId(), entry);
2494         if (previouslySeenAbort != NULL) {
2495                 previouslySeenAbort->setDead();         // Delete old version of the abort since we got a rescued newer version
2496         }
2497
2498         if (entry->getTransactionArbitrator() == localMachineId) {
2499                 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
2500         }
2501
2502         if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId()).getFirst() >= entry->getSequenceNumber())) {
2503                 // The machine already saw this so it is dead
2504                 entry->setDead();
2505                 liveAbortTable->remove(&entry->getAbortId());
2506
2507                 if (entry->getTransactionArbitrator() == localMachineId) {
2508                         liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2509                 }
2510                 return;
2511         }
2512
2513         // Update the last arbitration data that we have seen so far
2514         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(entry->getTransactionArbitrator())) {
2515                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2516                 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2517                         // Is larger
2518                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2519                 }
2520         } else {
2521                 // Never seen any data from this arbitrator so record the first one
2522                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2523         }
2524
2525         // Set dead a transaction if we can
2526         Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber()));
2527         if (transactionToSetDead != NULL) {
2528                 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2529         }
2530
2531         // Update the last transaction sequence number that the arbitrator
2532         // arbitrated on
2533         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator());
2534         if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2535                 // Is a valid one
2536                 if (entry->getTransactionSequenceNumber() != -1) {
2537                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2538                 }
2539         }
2540 }
2541
2542 /**
2543  * Set dead the transaction part if that transaction is dead and keep
2544  * track of all new parts
2545  */
2546 void Table::processEntry(TransactionPart *entry) {
2547         // Check if we have already seen this transaction and set it dead OR
2548         // if it is not alive
2549         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId());
2550         if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= entry->getSequenceNumber())) {
2551                 // This transaction is dead, it was already committed or aborted
2552                 entry->setDead();
2553                 return;
2554         }
2555
2556         // This part is still alive
2557         Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId());
2558
2559         if (transactionPart == NULL) {
2560                 // Dont have a table for this machine Id yet so make one
2561                 transactionPart = new Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2562                 newTransactionParts->put(entry->getMachineId(), transactionPart);
2563         }
2564
2565         // Update the part and set dead ones we have already seen (got a
2566         // rescued version)
2567         TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry);
2568         if (previouslySeenPart != NULL) {
2569                 previouslySeenPart->setDead();
2570         }
2571 }
2572
2573 /**
2574  * Process new commit entries and save them for future use->  Delete duplicates
2575  */
2576 void Table::processEntry(CommitPart *entry) {
2577         // Update the last transaction that was updated if we can
2578         if (entry->getTransactionSequenceNumber() != -1) {
2579                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId());
2580                 // Update the last transaction sequence number that the arbitrator
2581                 // arbitrated on
2582                 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2583                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2584                 }
2585         }
2586
2587         Hashtable<Pair<int64_t, int32_t> *, CommitPart *> *commitPart = newCommitParts->get(entry->getMachineId());
2588         if (commitPart == NULL) {
2589                 // Don't have a table for this machine Id yet so make one
2590                 commitPart = new Hashtable<Pair<int64_t, int32_t> *, CommitPart *>();
2591                 newCommitParts->put(entry->getMachineId(), commitPart);
2592         }
2593         // Update the part and set dead ones we have already seen (got a
2594         // rescued version)
2595         CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry);
2596         if (previouslySeenPart != NULL) {
2597                 previouslySeenPart->setDead();
2598         }
2599 }
2600
2601 /**
2602  * Update the last message seen table-> Update and set dead the
2603  * appropriate RejectedMessages as clients see them-> Updates the live
2604  * aborts, removes those that are dead and sets them dead-> Check that
2605  * the last message seen is correct and that there is no mismatch of
2606  * our own last message or that other clients have not had a rollback
2607  * on the last message->
2608  */
2609 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2610         // We have seen this machine ID
2611         machineSet->remove(machineId);
2612
2613         // Get the set of rejected messages that this machine Id is has not seen yet
2614         Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2615         // If there is a rejected message that this machine Id has not seen yet
2616         if (watchset != NULL) {
2617                 // Go through each rejected message that this machine Id has not
2618                 // seen yet
2619
2620                 SetIterator<RejectedMessage *, RejectedMessage *> *rmit = watchset->iterator();
2621                 while(rmit->hasNext()) {
2622                         RejectedMessage *rm = rmit->next();
2623                         // If this machine Id has seen this rejected message->->->
2624                         if (rm->getSequenceNumber() <= seqNum) {
2625                                 // Remove it from our watchlist
2626                                 rmit->remove();
2627                                 // Decrement machines that need to see this notification
2628                                 rm->removeWatcher(machineId);
2629                         }
2630                 }
2631                 delete rmit;
2632         }
2633
2634         // Set dead the abort
2635         for (Iterator<Map->Entry<Pair<int64_t, int64_t>, Abort *> > i = liveAbortTable->entrySet()->iterator(); i->hasNext();) {
2636                 Abort *abort = i->next()->getValue();
2637                 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2638                         abort->setDead();
2639                         i->remove();
2640                         if (abort->getTransactionArbitrator() == localMachineId) {
2641                                 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2642                         }
2643                 }
2644         }
2645         if (machineId == localMachineId) {
2646                 // Our own messages are immediately dead->
2647                 char livenessType = liveness->getType();
2648                 if (livenessType==TypeLastMessage) {
2649                         ((LastMessage *)liveness)->setDead();
2650                 } else if (livenessType == TypeSlot) {
2651                         ((Slot *)liveness)->setDead();
2652                 } else {
2653                         throw new Error("Unrecognized type");
2654                 }
2655         }
2656         // Get the old last message for this device
2657         Pair<int64_t, Liveness *> * lastMessageEntry = lastMessageTable->put(machineId, new Pair<int64_t, Liveness *>(seqNum, liveness));
2658         if (lastMessageEntry == NULL) {
2659                 // If no last message then there is nothing else to process
2660                 return;
2661         }
2662
2663         int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
2664         Liveness *lastEntry = lastMessageEntry->getSecond();
2665         delete lastMessageEntry;
2666         
2667         // If it is not our machine Id since we already set ours to dead
2668         if (machineId != localMachineId) {
2669                 char lastEntryType = lastEntry->getType();
2670                 
2671                 if (lastEntryType == TypeLastMessage) {
2672                         ((LastMessage *)lastEntry)->setDead();
2673                 } else if (lastEntryType == TypeSlot) {
2674                         ((Slot *)lastEntry)->setDead();
2675                 } else {
2676                         throw new Error("Unrecognized type");
2677                 }
2678         }
2679         // Make sure the server is not playing any games
2680         if (machineId == localMachineId) {
2681                 if (hadPartialSendToServer) {
2682                         // We were not making any updates and we had a machine mismatch
2683                         if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2684                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: ");
2685                         }
2686                 } else {
2687                         // We were not making any updates and we had a machine mismatch
2688                         if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2689                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed: ");
2690                         }
2691                 }
2692         } else {
2693                 if (lastMessageSeqNum > seqNum) {
2694                         throw new Error("Server Error: Rollback on remote machine sequence number");
2695                 }
2696         }
2697 }
2698
2699 /**
2700  * Add a rejected message entry to the watch set to keep track of
2701  * which clients have seen that rejected message entry and which have
2702  * not.
2703  */
2704 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
2705         Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2706         if (entries == NULL) {
2707                 // There is no set for this machine ID yet so create one
2708                 entries = new Hashset<RejectedMessage *>();
2709                 rejectedMessageWatchVectorTable->put(machineId, entries);
2710         }
2711         entries->add(entry);
2712 }
2713
2714 /**
2715  * Check if the HMAC chain is not violated
2716  */
2717 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
2718         for (int i = 0; i < newSlots->length(); i++) {
2719                 Slot *currSlot = newSlots->get(i);
2720                 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
2721                 if (prevSlot != NULL &&
2722                                 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2723                         throw new Error("Server Error: Invalid HMAC Chain");
2724         }
2725 }