edits
[iotcloud.git] / version2 / src / C / Table.cc
1 #include "Table.h"
2 #include "CloudComm.h"
3 #include "SlotBuffer.h"
4 #include "NewKey.h"
5 #include "Slot.h"
6 #include "KeyValue.h"
7 #include "Error.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
13 #include "Random.h"
14 #include "ByteBuffer.h"
15 #include "Abort.h"
16 #include "CommitPart.h"
17 #include "ArbitrationRound.h"
18 #include "TransactionPart.h"
19 #include "Commit.h"
20 #include "RejectedMessage.h"
21 #include "SlotIndexer.h"
22 #include <stdlib.h>
23
24 int compareInt64(const void * a, const void *b) {
25         const int64_t * pa = (const int64_t *) a;
26         const int64_t * pb = (const int64_t *) b;
27         if (*pa < *pb)
28                 return -1;
29         else if (*pa > *pb)
30                 return 1;
31         else
32                 return 0;
33 }
34
35 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
36         buffer(NULL),
37         cloud(new CloudComm(this, baseurl, password, listeningPort)),
38         random(NULL),
39         liveTableStatus(NULL),
40         pendingTransactionBuilder(NULL),
41         lastPendingTransactionSpeculatedOn(NULL),
42         firstPendingTransaction(NULL),
43         numberOfSlots(0),
44         bufferResizeThreshold(0),
45         liveSlotCount(0),
46         oldestLiveSlotSequenceNumver(1),
47         localMachineId(_localMachineId),
48         sequenceNumber(0),
49         localTransactionSequenceNumber(0),
50         lastTransactionSequenceNumberSpeculatedOn(0),
51         oldestTransactionSequenceNumberSpeculatedOn(0),
52         localArbitrationSequenceNumber(0),
53         hadPartialSendToServer(false),
54         attemptedToSendToServer(false),
55         expectedsize(0),
56         didFindTableStatus(false),
57         currMaxSize(0),
58         lastSlotAttemptedToSend(NULL),
59         lastIsNewKey(false),
60         lastNewSize(0),
61         lastTransactionPartsSent(NULL),
62         lastPendingSendArbitrationEntriesToDelete(NULL),
63         lastNewKey(NULL),
64         committedKeyValueTable(NULL),
65         speculatedKeyValueTable(NULL),
66         pendingTransactionSpeculatedKeyValueTable(NULL),
67         liveNewKeyTable(NULL),
68         lastMessageTable(NULL),
69         rejectedMessageWatchVectorTable(NULL),
70         arbitratorTable(NULL),
71         liveAbortTable(NULL),
72         newTransactionParts(NULL),
73         newCommitParts(NULL),
74         lastArbitratedTransactionNumberByArbitratorTable(NULL),
75         liveTransactionBySequenceNumberTable(NULL),
76         liveTransactionByTransactionIdTable(NULL),
77         liveCommitsTable(NULL),
78         liveCommitsByKeyTable(NULL),
79         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
80         rejectedSlotVector(NULL),
81         pendingTransactionQueue(NULL),
82         pendingSendArbitrationRounds(NULL),
83         pendingSendArbitrationEntriesToDelete(NULL),
84         transactionPartsSent(NULL),
85         outstandingTransactionStatus(NULL),
86         liveAbortsGeneratedByLocal(NULL),
87         offlineTransactionsCommittedAndAtServer(NULL),
88         localCommunicationTable(NULL),
89         lastTransactionSeenFromMachineFromServer(NULL),
90         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
91         lastInsertedNewKey(false),
92         lastSeqNumArbOn(0)
93 {
94         init();
95 }
96
97 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
98         buffer(NULL),
99         cloud(_cloud),
100         random(NULL),
101         liveTableStatus(NULL),
102         pendingTransactionBuilder(NULL),
103         lastPendingTransactionSpeculatedOn(NULL),
104         firstPendingTransaction(NULL),
105         numberOfSlots(0),
106         bufferResizeThreshold(0),
107         liveSlotCount(0),
108         oldestLiveSlotSequenceNumver(1),
109         localMachineId(_localMachineId),
110         sequenceNumber(0),
111         localTransactionSequenceNumber(0),
112         lastTransactionSequenceNumberSpeculatedOn(0),
113         oldestTransactionSequenceNumberSpeculatedOn(0),
114         localArbitrationSequenceNumber(0),
115         hadPartialSendToServer(false),
116         attemptedToSendToServer(false),
117         expectedsize(0),
118         didFindTableStatus(false),
119         currMaxSize(0),
120         lastSlotAttemptedToSend(NULL),
121         lastIsNewKey(false),
122         lastNewSize(0),
123         lastTransactionPartsSent(NULL),
124         lastPendingSendArbitrationEntriesToDelete(NULL),
125         lastNewKey(NULL),
126         committedKeyValueTable(NULL),
127         speculatedKeyValueTable(NULL),
128         pendingTransactionSpeculatedKeyValueTable(NULL),
129         liveNewKeyTable(NULL),
130         lastMessageTable(NULL),
131         rejectedMessageWatchVectorTable(NULL),
132         arbitratorTable(NULL),
133         liveAbortTable(NULL),
134         newTransactionParts(NULL),
135         newCommitParts(NULL),
136         lastArbitratedTransactionNumberByArbitratorTable(NULL),
137         liveTransactionBySequenceNumberTable(NULL),
138         liveTransactionByTransactionIdTable(NULL),
139         liveCommitsTable(NULL),
140         liveCommitsByKeyTable(NULL),
141         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
142         rejectedSlotVector(NULL),
143         pendingTransactionQueue(NULL),
144         pendingSendArbitrationRounds(NULL),
145         pendingSendArbitrationEntriesToDelete(NULL),
146         transactionPartsSent(NULL),
147         outstandingTransactionStatus(NULL),
148         liveAbortsGeneratedByLocal(NULL),
149         offlineTransactionsCommittedAndAtServer(NULL),
150         localCommunicationTable(NULL),
151         lastTransactionSeenFromMachineFromServer(NULL),
152         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
153         lastInsertedNewKey(false),
154         lastSeqNumArbOn(0)
155 {
156         init();
157 }
158
159 /**
160  * Init all the stuff needed for for table usage
161  */
162 void Table::init() {
163         // Init helper objects
164         random = new Random();
165         buffer = new SlotBuffer();
166
167         // init data structs
168         committedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
169         speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
170         pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
171         liveNewKeyTable = new Hashtable<IoTString *, NewKey *>();
172         lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> * >();
173         rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
174         arbitratorTable = new Hashtable<IoTString *, int64_t>();
175         liveAbortTable = new Hashtable<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>();
176         newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
177         newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
178         lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
179         liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
180         liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t> *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>();
181         liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> * >();
182         liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *>();
183         lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
184         rejectedSlotVector = new Vector<int64_t>();
185         pendingTransactionQueue = new Vector<Transaction *>();
186         pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
187         transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
188         outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
189         liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
190         offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> *, uintptr_t, 0, pairHashFunction, pairEquals>();
191         localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> *>();
192         lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
193         pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
194         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
195
196         // Other init stuff
197         numberOfSlots = buffer->capacity();
198         setResizeThreshold();
199 }
200
201 /**
202  * Initialize the table by inserting a table status as the first entry
203  * into the table status also initialize the crypto stuff.
204  */
205 void Table::initTable() {
206         cloud->initSecurity();
207
208         // Create the first insertion into the block chain which is the table status
209         Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
210         localSequenceNumber++;
211         TableStatus *status = new TableStatus(s, numberOfSlots);
212         s->addEntry(status);
213         Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
214
215         if (array == NULL) {
216                 array = new Array<Slot *>(1);
217                 array->set(0, s);
218                 // update local block chain
219                 validateAndUpdate(array, true);
220         } else if (array->length() == 1) {
221                 // in case we did push the slot BUT we failed to init it
222                 validateAndUpdate(array, true);
223         } else {
224                 throw new Error("Error on initialization");
225         }
226 }
227
228 /**
229  * Rebuild the table from scratch by pulling the latest block chain
230  * from the server.
231  */
232 void Table::rebuild() {
233         // Just pull the latest slots from the server
234         Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
235         validateAndUpdate(newslots, true);
236         sendToServer(NULL);
237         updateLiveTransactionsAndStatus();
238 }
239
240 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
241         localCommunicationTable->put(arbitrator, new Pair<IoTString *, int32_t>(hostName, portNumber));
242 }
243
244 int64_t Table::getArbitrator(IoTString *key) {
245         return arbitratorTable->get(key);
246 }
247
248 void Table::close() {
249         cloud->close();
250 }
251
252 IoTString *Table::getCommitted(IoTString *key)  {
253         KeyValue *kv = committedKeyValueTable->get(key);
254
255         if (kv != NULL) {
256                 return kv->getValue();
257         } else {
258                 return NULL;
259         }
260 }
261
262 IoTString *Table::getSpeculative(IoTString *key) {
263         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
264
265         if (kv == NULL) {
266                 kv = speculatedKeyValueTable->get(key);
267         }
268
269         if (kv == NULL) {
270                 kv = committedKeyValueTable->get(key);
271         }
272
273         if (kv != NULL) {
274                 return kv->getValue();
275         } else {
276                 return NULL;
277         }
278 }
279
280 IoTString *Table::getCommittedAtomic(IoTString *key) {
281         KeyValue *kv = committedKeyValueTable->get(key);
282
283         if (!arbitratorTable->contains(key)) {
284                 throw new Error("Key not Found.");
285         }
286
287         // Make sure new key value pair matches the current arbitrator
288         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
289                 // TODO: Maybe not throw en error
290                 throw new Error("Not all Key Values Match Arbitrator.");
291         }
292
293         if (kv != NULL) {
294                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
295                 return kv->getValue();
296         } else {
297                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
298                 return NULL;
299         }
300 }
301
302 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
303         if (!arbitratorTable->contains(key)) {
304                 throw new Error("Key not Found.");
305         }
306
307         // Make sure new key value pair matches the current arbitrator
308         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
309                 // TODO: Maybe not throw en error
310                 throw new Error("Not all Key Values Match Arbitrator.");
311         }
312
313         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
314
315         if (kv == NULL) {
316                 kv = speculatedKeyValueTable->get(key);
317         }
318
319         if (kv == NULL) {
320                 kv = committedKeyValueTable->get(key);
321         }
322
323         if (kv != NULL) {
324                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
325                 return kv->getValue();
326         } else {
327                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
328                 return NULL;
329         }
330 }
331
332 bool Table::update()  {
333         try {
334                 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
335                 validateAndUpdate(newSlots, false);
336                 sendToServer(NULL);
337                 updateLiveTransactionsAndStatus();
338                 return true;
339         } catch (Exception *e) {
340                 SetIterator<int64_t, Pair<IoTString *, int32_t> *> *kit = getKeyIterator(localCommunicationTable);
341                 while (kit->hasNext()) {
342                         int64_t m = kit->next();
343                         updateFromLocal(m);
344                 }
345                 delete kit;
346         }
347
348         return false;
349 }
350
351 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
352         while (true) {
353                 if (!arbitratorTable->contains(keyName)) {
354                         // There is already an arbitrator
355                         return false;
356                 }
357                 NewKey *newKey = new NewKey(NULL, keyName, machineId);
358
359                 if (sendToServer(newKey)) {
360                         // If successfully inserted
361                         return true;
362                 }
363         }
364 }
365
366 void Table::startTransaction() {
367         // Create a new transaction, invalidates any old pending transactions.
368         pendingTransactionBuilder = new PendingTransaction(localMachineId);
369 }
370
371 void Table::addKV(IoTString *key, IoTString *value) {
372
373         // Make sure it is a valid key
374         if (!arbitratorTable->contains(key)) {
375                 throw new Error("Key not Found.");
376         }
377
378         // Make sure new key value pair matches the current arbitrator
379         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
380                 // TODO: Maybe not throw en error
381                 throw new Error("Not all Key Values Match Arbitrator.");
382         }
383
384         // Add the key value to this transaction
385         KeyValue *kv = new KeyValue(key, value);
386         pendingTransactionBuilder->addKV(kv);
387 }
388
389 TransactionStatus *Table::commitTransaction() {
390         if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
391                 // transaction with no updates will have no effect on the system
392                 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
393         }
394
395         // Set the local transaction sequence number and increment
396         pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
397         localTransactionSequenceNumber++;
398
399         // Create the transaction status
400         TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
401
402         // Create the new transaction
403         Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
404         newTransaction->setTransactionStatus(transactionStatus);
405
406         if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
407                 // Add it to the queue and invalidate the builder for safety
408                 pendingTransactionQueue->add(newTransaction);
409         } else {
410                 arbitrateOnLocalTransaction(newTransaction);
411                 updateLiveStateFromLocal();
412         }
413
414         pendingTransactionBuilder = new PendingTransaction(localMachineId);
415
416         try {
417                 sendToServer(NULL);
418         } catch (ServerException *e) {
419
420                 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
421                 uint size = pendingTransactionQueue->size();
422                 uint oldindex = 0;
423                 for (int iter = 0; iter < size; iter++) {
424                         Transaction *transaction = pendingTransactionQueue->get(iter);
425                         pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter));
426
427                         if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
428                                 // Already contacted this client so ignore all attempts to contact this client
429                                 // to preserve ordering for arbitrator
430                                 continue;
431                         }
432
433                         Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
434
435                         if (sendReturn.getFirst()) {
436                                 // Failed to contact over local
437                                 arbitratorTriedAndFailed->add(transaction->getArbitrator());
438                         } else {
439                                 // Successful contact or should not contact
440
441                                 if (sendReturn.getSecond()) {
442                                         // did arbitrate
443                                         oldindex--;
444                                 }
445                         }
446                 }
447                 pendingTransactionQueue->setSize(oldindex);
448         }
449
450         updateLiveStateFromLocal();
451
452         return transactionStatus;
453 }
454
455 /**
456  * Recalculate the new resize threshold
457  */
458 void Table::setResizeThreshold() {
459         int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
460         bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
461 }
462
463 int64_t Table::getLocalSequenceNumber() {
464         return localSequenceNumber;
465 }
466
467 bool Table::sendToServer(NewKey *newKey) {
468         bool fromRetry = false;
469         try {
470                 if (hadPartialSendToServer) {
471                         Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
472                         if (newSlots->length() == 0) {
473                                 fromRetry = true;
474                                 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
475
476                                 if (sendSlotsReturn.getFirst()) {
477                                         if (newKey != NULL) {
478                                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
479                                                         newKey = NULL;
480                                                 }
481                                         }
482
483                                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
484                                         while (trit->hasNext()) {
485                                                 Transaction *transaction = trit->next();
486                                                 transaction->resetServerFailure();
487                                                 // Update which transactions parts still need to be sent
488                                                 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
489                                                 // Add the transaction status to the outstanding list
490                                                 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
491
492                                                 // Update the transaction status
493                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
494
495                                                 // Check if all the transaction parts were successfully
496                                                 // sent and if so then remove it from pending
497                                                 if (transaction->didSendAllParts()) {
498                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
499                                                         pendingTransactionQueue->remove(transaction);
500                                                 }
501                                         }
502                                         delete trit;
503                                 } else {
504                                         newSlots = sendSlotsReturn.getThird();
505                                         bool isInserted = false;
506                                         for (uint si = 0; si < newSlots->length(); si++) {
507                                                 Slot *s = newSlots->get(si);
508                                                 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
509                                                         isInserted = true;
510                                                         break;
511                                                 }
512                                         }
513
514                                         for (uint si = 0; si < newSlots->length(); si++) {
515                                                 Slot *s = newSlots->get(si);
516                                                 if (isInserted) {
517                                                         break;
518                                                 }
519
520                                                 // Process each entry in the slot
521                                                 Vector<Entry *> *ventries = s->getEntries();
522                                                 uint vesize = ventries->size();
523                                                 for (uint vei = 0; vei < vesize; vei++) {
524                                                         Entry *entry = ventries->get(vei);
525                                                         if (entry->getType() == TypeLastMessage) {
526                                                                 LastMessage *lastMessage = (LastMessage *)entry;
527                                                                 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
528                                                                         isInserted = true;
529                                                                         break;
530                                                                 }
531                                                         }
532                                                 }
533                                         }
534
535                                         if (isInserted) {
536                                                 if (newKey != NULL) {
537                                                         if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
538                                                                 newKey = NULL;
539                                                         }
540                                                 }
541
542                                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
543                                                 while (trit->hasNext()) {
544                                                         Transaction *transaction = trit->next();
545                                                         transaction->resetServerFailure();
546
547                                                         // Update which transactions parts still need to be sent
548                                                         transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
549
550                                                         // Add the transaction status to the outstanding list
551                                                         outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
552
553                                                         // Update the transaction status
554                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
555
556                                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
557                                                         if (transaction->didSendAllParts()) {
558                                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
559                                                                 pendingTransactionQueue->remove(transaction);
560                                                         } else {
561                                                                 transaction->resetServerFailure();
562                                                                 // Set the transaction sequence number back to nothing
563                                                                 if (!transaction->didSendAPartToServer()) {
564                                                                         transaction->setSequenceNumber(-1);
565                                                                 }
566                                                         }
567                                                 }
568                                                 delete trit;
569                                         }
570                                 }
571
572                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
573                                 while (trit->hasNext()) {
574                                         Transaction *transaction = trit->next();
575                                         transaction->resetServerFailure();
576                                         // Set the transaction sequence number back to nothing
577                                         if (!transaction->didSendAPartToServer()) {
578                                                 transaction->setSequenceNumber(-1);
579                                         }
580                                 }
581                                 delete trit;
582
583                                 if (sendSlotsReturn.getThird()->length() != 0) {
584                                         // insert into the local block chain
585                                         validateAndUpdate(sendSlotsReturn.getThird(), true);
586                                 }
587                                 // continue;
588                         } else {
589                                 bool isInserted = false;
590                                 for (uint si = 0; si < newSlots->length(); si++) {
591                                         Slot *s = newSlots->get(si);
592                                         if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
593                                                 isInserted = true;
594                                                 break;
595                                         }
596                                 }
597
598                                 for (uint si = 0; si < newSlots->length(); si++) {
599                                         Slot *s = newSlots->get(si);
600                                         if (isInserted) {
601                                                 break;
602                                         }
603
604                                         // Process each entry in the slot
605                                         Vector<Entry *> *entries = s->getEntries();
606                                         uint eSize = entries->size();
607                                         for(uint ei=0; ei < eSize; ei++) {
608                                                 Entry * entry = entries->get(ei);
609                                                 
610                                                 if (entry->getType() == TypeLastMessage) {
611                                                         LastMessage *lastMessage = (LastMessage *)entry;
612                                                         if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
613                                                                 isInserted = true;
614                                                                 break;
615                                                         }
616                                                 }
617                                         }
618                                 }
619
620                                 if (isInserted) {
621                                         if (newKey != NULL) {
622                                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
623                                                         newKey = NULL;
624                                                 }
625                                         }
626
627                                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
628                                         while (trit->hasNext()) {
629                                                 Transaction *transaction = trit->next();
630                                                 transaction->resetServerFailure();
631
632                                                 // Update which transactions parts still need to be sent
633                                                 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
634
635                                                 // Add the transaction status to the outstanding list
636                                                 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
637
638                                                 // Update the transaction status
639                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
640
641                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
642                                                 if (transaction->didSendAllParts()) {
643                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
644                                                         pendingTransactionQueue->remove(transaction);
645                                                 } else {
646                                                         transaction->resetServerFailure();
647                                                         // Set the transaction sequence number back to nothing
648                                                         if (!transaction->didSendAPartToServer()) {
649                                                                 transaction->setSequenceNumber(-1);
650                                                         }
651                                                 }
652                                         }
653                                         delete trit;
654                                 } else {
655                                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
656                                         while (trit->hasNext()) {
657                                                 Transaction *transaction = trit->next();
658                                                 transaction->resetServerFailure();
659                                                 // Set the transaction sequence number back to nothing
660                                                 if (!transaction->didSendAPartToServer()) {
661                                                         transaction->setSequenceNumber(-1);
662                                                 }
663                                         }
664                                         delete trit;
665                                 }
666
667                                 // insert into the local block chain
668                                 validateAndUpdate(newSlots, true);
669                         }
670                 }
671         } catch (ServerException *e) {
672                 throw e;
673         }
674
675
676
677         try {
678                 // While we have stuff that needs inserting into the block chain
679                 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
680
681                         fromRetry = false;
682
683                         if (hadPartialSendToServer) {
684                                 throw new Error("Should Be error free");
685                         }
686
687
688
689                         // If there is a new key with same name then end
690                         if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
691                                 return false;
692                         }
693
694                         // Create the slot
695                         Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber);
696                         localSequenceNumber++;
697
698                         // Try to fill the slot with data
699                         ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
700                         bool needsResize = fillSlotsReturn.getFirst();
701                         int newSize = fillSlotsReturn.getSecond();
702                         bool insertedNewKey = fillSlotsReturn.getThird();
703
704                         if (needsResize) {
705                                 // Reset which transaction to send
706                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
707                                 while (trit->hasNext()) {
708                                         Transaction *transaction = trit->next();
709                                         transaction->resetNextPartToSend();
710
711                                         // Set the transaction sequence number back to nothing
712                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
713                                                 transaction->setSequenceNumber(-1);
714                                         }
715                                 }
716                                 delete trit;
717
718                                 // Clear the sent data since we are trying again
719                                 pendingSendArbitrationEntriesToDelete->clear();
720                                 transactionPartsSent->clear();
721
722                                 // We needed a resize so try again
723                                 fillSlot(slot, true, newKey);
724                         }
725
726                         lastSlotAttemptedToSend = slot;
727                         lastIsNewKey = (newKey != NULL);
728                         lastInsertedNewKey = insertedNewKey;
729                         lastNewSize = newSize;
730                         lastNewKey = newKey;
731                         lastTransactionPartsSent = transactionPartsSent->clone();
732                         lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
733
734                         ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
735
736                         if (sendSlotsReturn.getFirst()) {
737
738                                 // Did insert into the block chain
739
740                                 if (insertedNewKey) {
741                                         // This slot was what was inserted not a previous slot
742
743                                         // New Key was successfully inserted into the block chain so dont want to insert it again
744                                         newKey = NULL;
745                                 }
746
747                                 // Remove the aborts and commit parts that were sent from the pending to send queue
748                                 uint size = pendingSendArbitrationRounds->size();
749                                 uint oldcount = 0;
750                                 for (uint i = 0; i < size; i++) {
751                                         ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
752                                         round->removeParts(pendingSendArbitrationEntriesToDelete);
753
754                                         if (!round->isDoneSending()) {
755                                                 // Sent all the parts
756                                                 pendingSendArbitrationRounds->set(oldcount++,
757                                                                                                                                                                                         pendingSendArbitrationRounds->get(i));
758                                         }
759                                 }
760                                 pendingSendArbitrationRounds->setSize(oldcount);
761
762                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
763                                 while (trit->hasNext()) {
764                                         Transaction *transaction = trit->next();
765                                         transaction->resetServerFailure();
766
767                                         // Update which transactions parts still need to be sent
768                                         transaction->removeSentParts(transactionPartsSent->get(transaction));
769
770                                         // Add the transaction status to the outstanding list
771                                         outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
772
773                                         // Update the transaction status
774                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
775
776                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
777                                         if (transaction->didSendAllParts()) {
778                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
779                                                 pendingTransactionQueue->remove(transaction);
780                                         }
781                                 }
782                                 delete trit;
783                         } else {
784                                 // Reset which transaction to send
785                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
786                                 while (trit->hasNext()) {
787                                         Transaction *transaction = trit->next();
788                                         transaction->resetNextPartToSend();
789
790                                         // Set the transaction sequence number back to nothing
791                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
792                                                 transaction->setSequenceNumber(-1);
793                                         }
794                                 }
795                                 delete trit;
796                         }
797
798                         // Clear the sent data in preparation for next send
799                         pendingSendArbitrationEntriesToDelete->clear();
800                         transactionPartsSent->clear();
801
802                         if (sendSlotsReturn.getThird()->length() != 0) {
803                                 // insert into the local block chain
804                                 validateAndUpdate(sendSlotsReturn.getThird(), true);
805                         }
806                 }
807
808         } catch (ServerException *e) {
809                 if (e->getType() != ServerException_TypeInputTimeout) {
810                         // Nothing was able to be sent to the server so just clear these data structures
811                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
812                         while (trit->hasNext()) {
813                                 Transaction *transaction = trit->next();
814                                 transaction->resetNextPartToSend();
815
816                                 // Set the transaction sequence number back to nothing
817                                 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
818                                         transaction->setSequenceNumber(-1);
819                                 }
820                         }
821                         delete trit;
822                 } else {
823                         // There was a partial send to the server
824                         hadPartialSendToServer = true;
825
826                         // Nothing was able to be sent to the server so just clear these data structures
827                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
828                         while (trit->hasNext()) {
829                                 Transaction *transaction = trit->next();
830                                 transaction->resetNextPartToSend();
831                                 transaction->setServerFailure();
832                         }
833                         delete trit;
834                 }
835
836                 pendingSendArbitrationEntriesToDelete->clear();
837                 transactionPartsSent->clear();
838
839                 throw e;
840         }
841
842         return newKey == NULL;
843 }
844
845 bool Table::updateFromLocal(int64_t machineId) {
846         if (!localCommunicationTable->contains(machineId))
847                 return false;
848
849         Pair<IoTString *, int32_t> * localCommunicationInformation = localCommunicationTable->get(machineId);
850
851         // Get the size of the send data
852         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
853
854         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
855         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(machineId)) {
856                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
857         }
858
859         Array<char> *sendData = new Array<char>(sendDataSize);
860         ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
861
862         // Encode the data
863         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
864         bbEncode->putInt(0);
865
866         // Send by local
867         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
868         localSequenceNumber++;
869
870         if (returnData == NULL) {
871                 // Could not contact server
872                 return false;
873         }
874
875         // Decode the data
876         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
877         int numberOfEntries = bbDecode->getInt();
878
879         for (int i = 0; i < numberOfEntries; i++) {
880                 char type = bbDecode->get();
881                 if (type == TypeAbort) {
882                         Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
883                         processEntry(abort);
884                 } else if (type == TypeCommitPart) {
885                         CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
886                         processEntry(commitPart);
887                 }
888         }
889
890         updateLiveStateFromLocal();
891
892         return true;
893 }
894
895 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
896
897         // Get the devices local communications
898         if (!localCommunicationTable->contains(transaction->getArbitrator()))
899                 return Pair<bool, bool>(true, false);
900         
901         Pair<IoTString *, int32_t> * localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
902
903         // Get the size of the send data
904         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
905         {
906                 Vector<TransactionPart *> * tParts = transaction->getParts();
907                 uint tPartsSize = tParts->size();
908                 for (uint i = 0; i < tPartsSize; i++) {
909                         TransactionPart * part = tParts->get(i);
910                         sendDataSize += part->getSize();
911                 }
912         }
913
914         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
915         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(transaction->getArbitrator())) {
916                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
917         }
918
919         // Make the send data size
920         Array<char> *sendData = new Array<char>(sendDataSize);
921         ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
922
923         // Encode the data
924         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
925         bbEncode->putInt(transaction->getParts()->size());
926         {
927                 Vector<TransactionPart *> * tParts = transaction->getParts();
928                 uint tPartsSize = tParts->size();
929                 for (uint i = 0; i < tPartsSize; i++) {
930                         TransactionPart * part = tParts->get(i);
931                         part->encode(bbEncode);
932                 }
933         }
934
935         // Send by local
936         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
937         localSequenceNumber++;
938
939         if (returnData == NULL) {
940                 // Could not contact server
941                 return Pair<bool, bool>(true, false);
942         }
943
944         // Decode the data
945         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
946         bool didCommit = bbDecode->get() == 1;
947         bool couldArbitrate = bbDecode->get() == 1;
948         int numberOfEntries = bbDecode->getInt();
949         bool foundAbort = false;
950
951         for (int i = 0; i < numberOfEntries; i++) {
952                 char type = bbDecode->get();
953                 if (type == TypeAbort) {
954                         Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
955
956                         if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
957                                 foundAbort = true;
958                         }
959
960                         processEntry(abort);
961                 } else if (type == TypeCommitPart) {
962                         CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
963                         processEntry(commitPart);
964                 }
965         }
966
967         updateLiveStateFromLocal();
968
969         if (couldArbitrate) {
970                 TransactionStatus * status =  transaction->getTransactionStatus();
971                 if (didCommit) {
972                         status->setStatus(TransactionStatus_StatusCommitted);
973                 } else {
974                         status->setStatus(TransactionStatus_StatusAborted);
975                 }
976         } else {
977                 TransactionStatus * status =  transaction->getTransactionStatus();
978                 if (foundAbort) {
979                         status->setStatus(TransactionStatus_StatusAborted);
980                 } else {
981                         status->setStatus(TransactionStatus_StatusCommitted);
982                 }
983         }
984
985         return Pair<bool, bool>(false, true);
986 }
987
988 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
989         // Decode the data
990         ByteBuffer *bbDecode = ByteBuffer_wrap(data);
991         int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
992         int numberOfParts = bbDecode->getInt();
993
994         // If we did commit a transaction or not
995         bool didCommit = false;
996         bool couldArbitrate = false;
997
998         if (numberOfParts != 0) {
999
1000                 // decode the transaction
1001                 Transaction *transaction = new Transaction();
1002                 for (int i = 0; i < numberOfParts; i++) {
1003                         bbDecode->get();
1004                         TransactionPart *newPart = (TransactionPart *)TransactionPart_decode(NULL, bbDecode);
1005                         transaction->addPartDecode(newPart);
1006                 }
1007
1008                 // Arbitrate on transaction and pull relevant return data
1009                 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
1010                 couldArbitrate = localArbitrateReturn.getFirst();
1011                 didCommit = localArbitrateReturn.getSecond();
1012
1013                 updateLiveStateFromLocal();
1014
1015                 // Transaction was sent to the server so keep track of it to prevent double commit
1016                 if (transaction->getSequenceNumber() != -1) {
1017                         offlineTransactionsCommittedAndAtServer->add(new Pair<int64_t, int64_t>(transaction->getId()));
1018                 }
1019         }
1020
1021         // The data to send back
1022         int returnDataSize = 0;
1023         Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
1024
1025         // Get the aborts to send back
1026         Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>();
1027         {
1028                 SetIterator<int64_t, Abort *> *abortit = getKeyIterator(liveAbortsGeneratedByLocal);
1029                 while(abortit->hasNext())
1030                         abortLocalSequenceNumbers->add(abortit->next());
1031                 delete abortit;
1032         }
1033         
1034         qsort(abortLocalSequenceNumbers->expose(), abortLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1035
1036         uint asize = abortLocalSequenceNumbers->size();
1037         for(uint i=0; i<asize; i++) {
1038                 int64_t localSequenceNumber = abortLocalSequenceNumbers->get(i);
1039                 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1040                         continue;
1041                 }
1042                 
1043                 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
1044                 unseenArbitrations->add(abort);
1045                 returnDataSize += abort->getSize();
1046         }
1047
1048         // Get the commits to send back
1049         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
1050         if (commitForClientTable != NULL) {
1051                 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>();
1052                 {
1053                         SetIterator<int64_t, Commit *> *commitit = getKeyIterator(commitForClientTable);
1054                         while(commitit->hasNext())
1055                                 commitLocalSequenceNumbers->add(commitit->next());
1056                         delete commitit;
1057                 }
1058                 qsort(commitLocalSequenceNumbers->expose(), commitLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1059
1060                 uint clsSize = commitLocalSequenceNumbers->size();
1061                 for(uint clsi = 0; clsi < clsSize; clsi++) {
1062                         int64_t localSequenceNumber = commitLocalSequenceNumbers->get(clsi);
1063                         Commit *commit = commitForClientTable->get(localSequenceNumber);
1064
1065                         if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1066                                 continue;
1067                         }
1068
1069                         {
1070                                 Vector<CommitPart *> * parts = commit->getParts();
1071                                 uint nParts = parts->size();
1072                                 for(uint i=0; i<nParts; i++) {
1073                                         CommitPart * commitPart = parts->get(i);
1074                                         unseenArbitrations->add(commitPart);
1075                                         returnDataSize += commitPart->getSize();
1076                                 }
1077                         }
1078                 }
1079         }
1080
1081         // Number of arbitration entries to decode
1082         returnDataSize += 2 * sizeof(int32_t);
1083
1084         // bool of did commit or not
1085         if (numberOfParts != 0) {
1086                 returnDataSize += sizeof(char);
1087         }
1088
1089         // Data to send Back
1090         Array<char> *returnData = new Array<char>(returnDataSize);
1091         ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
1092
1093         if (numberOfParts != 0) {
1094                 if (didCommit) {
1095                         bbEncode->put((char)1);
1096                 } else {
1097                         bbEncode->put((char)0);
1098                 }
1099                 if (couldArbitrate) {
1100                         bbEncode->put((char)1);
1101                 } else {
1102                         bbEncode->put((char)0);
1103                 }
1104         }
1105
1106         bbEncode->putInt(unseenArbitrations->size());
1107         uint size = unseenArbitrations->size();
1108         for (uint i = 0; i < size; i++) {
1109                 Entry *entry = unseenArbitrations->get(i);
1110                 entry->encode(bbEncode);
1111         }
1112
1113         localSequenceNumber++;
1114         return returnData;
1115 }
1116
1117 ThreeTuple<bool, bool, Array<Slot *> *> Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey) {
1118         bool attemptedToSendToServerTmp = attemptedToSendToServer;
1119         attemptedToSendToServer = true;
1120
1121         bool inserted = false;
1122         bool lastTryInserted = false;
1123
1124         Array<Slot *> *array = cloud->putSlot(slot, newSize);
1125         if (array == NULL) {
1126                 array = new Array<Slot *>();
1127                 array->set(0, slot);
1128                 rejectedSlotVector->clear();
1129                 inserted = true;
1130         } else {
1131                 if (array->length() == 0) {
1132                         throw new Error("Server Error: Did not send any slots");
1133                 }
1134
1135                 // if (attemptedToSendToServerTmp) {
1136                 if (hadPartialSendToServer) {
1137
1138                         bool isInserted = false;
1139                         uint size = array->length();
1140                         for (uint i = 0; i < size; i++) {
1141                                 Slot *s = array->get(i);
1142                                 if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1143                                         isInserted = true;
1144                                         break;
1145                                 }
1146                         }
1147
1148                         for (uint i = 0; i < size; i++) {
1149                                 Slot *s = array->get(i);
1150                                 if (isInserted) {
1151                                         break;
1152                                 }
1153
1154                                 // Process each entry in the slot
1155                                 Vector<Entry *> *entries = s->getEntries();
1156                                 uint eSize = entries->size();
1157                                 for(uint ei=0; ei < eSize; ei++) {
1158                                         Entry * entry = entries->get(ei);
1159
1160                                         if (entry->getType() == TypeLastMessage) {
1161                                                 LastMessage *lastMessage = (LastMessage *)entry;
1162
1163                                                 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) {
1164                                                         isInserted = true;
1165                                                         break;
1166                                                 }
1167                                         }
1168                                 }
1169                         }
1170
1171                         if (!isInserted) {
1172                                 rejectedSlotVector->add(slot->getSequenceNumber());
1173                                 lastTryInserted = false;
1174                         } else {
1175                                 lastTryInserted = true;
1176                         }
1177                 } else {
1178                         rejectedSlotVector->add(slot->getSequenceNumber());
1179                         lastTryInserted = false;
1180                 }
1181         }
1182
1183         return ThreeTuple<bool, bool, Array<Slot *> *>(inserted, lastTryInserted, array);
1184 }
1185
1186 /**
1187  * Returns false if a resize was needed
1188  */
1189 ThreeTuple<bool, int32_t, bool> Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry) {
1190         int newSize = 0;
1191         if (liveSlotCount > bufferResizeThreshold) {
1192                 resize = true;//Resize is forced
1193         }
1194
1195         if (resize) {
1196                 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1197                 TableStatus *status = new TableStatus(slot, newSize);
1198                 slot->addEntry(status);
1199         }
1200
1201         // Fill with rejected slots first before doing anything else
1202         doRejectedMessages(slot);
1203
1204         // Do mandatory rescue of entries
1205         ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1206
1207         // Extract working variables
1208         bool needsResize = mandatoryRescueReturn.getFirst();
1209         bool seenLiveSlot = mandatoryRescueReturn.getSecond();
1210         int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1211
1212         if (needsResize && !resize) {
1213                 // We need to resize but we are not resizing so return false
1214                 return ThreeTuple<bool, int32_t, bool>(true, NULL, NULL);
1215         }
1216
1217         bool inserted = false;
1218         if (newKeyEntry != NULL) {
1219                 newKeyEntry->setSlot(slot);
1220                 if (slot->hasSpace(newKeyEntry)) {
1221                         slot->addEntry(newKeyEntry);
1222                         inserted = true;
1223                 }
1224         }
1225
1226         // Clear the transactions, aborts and commits that were sent previously
1227         transactionPartsSent->clear();
1228         pendingSendArbitrationEntriesToDelete->clear();
1229         uint size = pendingSendArbitrationRounds->size();
1230         for (uint i = 0; i < size; i++) {
1231                 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
1232                 bool isFull = false;
1233                 round->generateParts();
1234                 Vector<Entry *> *parts = round->getParts();
1235
1236                 // Insert pending arbitration data
1237                 uint vsize = parts->size();
1238                 for (uint vi = 0; vi < vsize; vi++) {
1239                         Entry *arbitrationData = parts->get(vi);
1240
1241                         // If it is an abort then we need to set some information
1242                         if (arbitrationData->getType() == TypeAbort) {
1243                                 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1244                         }
1245
1246                         if (!slot->hasSpace(arbitrationData)) {
1247                                 // No space so cant do anything else with these data entries
1248                                 isFull = true;
1249                                 break;
1250                         }
1251
1252                         // Add to this current slot and add it to entries to delete
1253                         slot->addEntry(arbitrationData);
1254                         pendingSendArbitrationEntriesToDelete->add(arbitrationData);
1255                 }
1256
1257                 if (isFull) {
1258                         break;
1259                 }
1260         }
1261
1262         if (pendingTransactionQueue->size() > 0) {
1263                 Transaction *transaction = pendingTransactionQueue->get(0);
1264                 // Set the transaction sequence number if it has yet to be inserted into the block chain
1265                 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1266                         transaction->setSequenceNumber(slot->getSequenceNumber());
1267                 }
1268
1269                 while (true) {
1270                         TransactionPart *part = transaction->getNextPartToSend();
1271                         if (part == NULL) {
1272                                 // Ran out of parts to send for this transaction so move on
1273                                 break;
1274                         }
1275
1276                         if (slot->hasSpace(part)) {
1277                                 slot->addEntry(part);
1278                                 Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1279                                 if (partsSent == NULL) {
1280                                         partsSent = new Vector<int32_t>();
1281                                         transactionPartsSent->put(transaction, partsSent);
1282                                 }
1283                                 partsSent->add(part->getPartNumber());
1284                                 transactionPartsSent->put(transaction, partsSent);
1285                         } else {
1286                                 break;
1287                         }
1288                 }
1289         }
1290
1291         // Fill the remainder of the slot with rescue data
1292         doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1293
1294         return ThreeTuple<bool, int32_t, bool>(false, newSize, inserted);
1295 }
1296
1297 void Table::doRejectedMessages(Slot *s) {
1298         if (!rejectedSlotVector->isEmpty()) {
1299                 /* TODO: We should avoid generating a rejected message entry if
1300                  * there is already a sufficient entry in the queue (e->g->,
1301                  * equalsto value of true and same sequence number)->  */
1302
1303                 int64_t old_seqn = rejectedSlotVector->get(0);
1304                 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1305                         int64_t new_seqn = rejectedSlotVector->lastElement();
1306                         RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1307                         s->addEntry(rm);
1308                 } else {
1309                         int64_t prev_seqn = -1;
1310                         int i = 0;
1311                         /* Go through list of missing messages */
1312                         for (; i < rejectedSlotVector->size(); i++) {
1313                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1314                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1315                                 if (s_msg != NULL)
1316                                         break;
1317                                 prev_seqn = curr_seqn;
1318                         }
1319                         /* Generate rejected message entry for missing messages */
1320                         if (prev_seqn != -1) {
1321                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1322                                 s->addEntry(rm);
1323                         }
1324                         /* Generate rejected message entries for present messages */
1325                         for (; i < rejectedSlotVector->size(); i++) {
1326                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1327                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1328                                 int64_t machineid = s_msg->getMachineID();
1329                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1330                                 s->addEntry(rm);
1331                         }
1332                 }
1333         }
1334 }
1335
1336 ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize) {
1337         int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1338         int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
1339         if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1340                 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1341         }
1342
1343         int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1344         bool seenLiveSlot = false;
1345         int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots;         // smallest seq number in the buffer if it is full
1346         int64_t threshold = firstIfFull + Table_FREE_SLOTS;             // we want the buffer to be clear of live entries up to this point
1347
1348
1349         // Mandatory Rescue
1350         for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1351                 Slot * previousSlot = buffer->getSlot(currentSequenceNumber);
1352                 // Push slot number forward
1353                 if (!seenLiveSlot) {
1354                         oldestLiveSlotSequenceNumver = currentSequenceNumber;
1355                 }
1356
1357                 if (!previousSlot->isLive()) {
1358                         continue;
1359                 }
1360
1361                 // We have seen a live slot
1362                 seenLiveSlot = true;
1363
1364                 // Get all the live entries for a slot
1365                 Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1366
1367                 // Iterate over all the live entries and try to rescue them
1368                 uint lESize = liveEntries->size();
1369                 for (uint i=0; i< lESize; i++) {
1370                         Entry * liveEntry = liveEntries->get(i);
1371                         if (slot->hasSpace(liveEntry)) {
1372                                 // Enough space to rescue the entry
1373                                 slot->addEntry(liveEntry);
1374                         } else if (currentSequenceNumber == firstIfFull) {
1375                                 //if there's no space but the entry is about to fall off the queue
1376                                 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
1377                         }
1378                 }
1379         }
1380
1381         // Did not resize
1382         return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
1383 }
1384
1385 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1386         /* now go through live entries from least to greatest sequence number until
1387          * either all live slots added, or the slot doesn't have enough room
1388          * for SKIP_THRESHOLD consecutive entries*/
1389         int skipcount = 0;
1390         int64_t newestseqnum = buffer->getNewestSeqNum();
1391 search:
1392         for (; seqn <= newestseqnum; seqn++) {
1393                 Slot *prevslot = buffer->getSlot(seqn);
1394                 //Push slot number forward
1395                 if (!seenliveslot)
1396                         oldestLiveSlotSequenceNumver = seqn;
1397
1398                 if (!prevslot->isLive())
1399                         continue;
1400                 seenliveslot = true;
1401                 Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
1402                 uint lESize = liveentries->size();
1403                 for (uint i=0; i< lESize; i++) {
1404                         Entry * liveentry = liveentries->get(i);
1405                         if (s->hasSpace(liveentry))
1406                                 s->addEntry(liveentry);
1407                         else {
1408                                 skipcount++;
1409                                 if (skipcount > Table_SKIP_THRESHOLD)
1410                                         goto donesearch;
1411                         }
1412                 }
1413         }
1414  donesearch:
1415         ;
1416 }
1417
1418 /**
1419  * Checks for malicious activity and updates the local copy of the block chain->
1420  */
1421 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1422         // The cloud communication layer has checked slot HMACs already
1423         // before decoding
1424         if (newSlots->length() == 0) {
1425                 return;
1426         }
1427
1428         // Make sure all slots are newer than the last largest slot this
1429         // client has seen
1430         int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
1431         if (firstSeqNum <= sequenceNumber) {
1432                 throw new Error("Server Error: Sent older slots!");
1433         }
1434
1435         // Create an object that can access both new slots and slots in our
1436         // local chain without committing slots to our local chain
1437         SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1438
1439         // Check that the HMAC chain is not broken
1440         checkHMACChain(indexer, newSlots);
1441
1442         // Set to keep track of messages from clients
1443         Hashset<int64_t> *machineSet = new Hashset<int64_t>();
1444         {
1445                 SetIterator<int64_t, Pair<int64_t, Liveness *> *> * lmit=getKeyIterator(lastMessageTable);
1446                 while(lmit->hasNext())
1447                         machineSet->add(lmit->next());
1448                 delete lmit;
1449         }
1450
1451         // Process each slots data
1452         {
1453                 uint numSlots = newSlots->length();
1454                 for(uint i=0; i<numSlots; i++) {
1455                         Slot *slot = newSlots->get(i);
1456                         processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1457                         updateExpectedSize();
1458                 }
1459         }
1460
1461         // If there is a gap, check to see if the server sent us
1462         // everything->
1463         if (firstSeqNum != (sequenceNumber + 1)) {
1464
1465                 // Check the size of the slots that were sent down by the server->
1466                 // Can only check the size if there was a gap
1467                 checkNumSlots(newSlots->length());
1468
1469                 // Since there was a gap every machine must have pushed a slot or
1470                 // must have a last message message-> If not then the server is
1471                 // hiding slots
1472                 if (!machineSet->isEmpty()) {
1473                         throw new Error("Missing record for machines: ");
1474                 }
1475         }
1476
1477         // Update the size of our local block chain->
1478         commitNewMaxSize();
1479
1480         // Commit new to slots to the local block chain->
1481         {
1482                 uint numSlots = newSlots->length();
1483                 for(uint i=0; i<numSlots; i++) {
1484                         Slot *slot = newSlots->get(i);
1485                         
1486                         // Insert this slot into our local block chain copy->
1487                         buffer->putSlot(slot);
1488                         
1489                         // Keep track of how many slots are currently live (have live data
1490                         // in them)->
1491                         liveSlotCount++;
1492                 }
1493         }
1494         // Get the sequence number of the latest slot in the system
1495         sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
1496         updateLiveStateFromServer();
1497
1498         // No Need to remember after we pulled from the server
1499         offlineTransactionsCommittedAndAtServer->clear();
1500
1501         // This is invalidated now
1502         hadPartialSendToServer = false;
1503 }
1504
1505 void Table::updateLiveStateFromServer() {
1506         // Process the new transaction parts
1507         processNewTransactionParts();
1508
1509         // Do arbitration on new transactions that were received
1510         arbitrateFromServer();
1511
1512         // Update all the committed keys
1513         bool didCommitOrSpeculate = updateCommittedTable();
1514
1515         // Delete the transactions that are now dead
1516         updateLiveTransactionsAndStatus();
1517
1518         // Do speculations
1519         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1520         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1521 }
1522
1523 void Table::updateLiveStateFromLocal() {
1524         // Update all the committed keys
1525         bool didCommitOrSpeculate = updateCommittedTable();
1526
1527         // Delete the transactions that are now dead
1528         updateLiveTransactionsAndStatus();
1529
1530         // Do speculations
1531         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1532         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1533 }
1534
1535 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1536         int64_t prevslots = firstSequenceNumber;
1537
1538         if (didFindTableStatus) {
1539         } else {
1540                 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1541         }
1542
1543         didFindTableStatus = true;
1544         currMaxSize = numberOfSlots;
1545 }
1546
1547 void Table::updateExpectedSize() {
1548         expectedsize++;
1549
1550         if (expectedsize > currMaxSize) {
1551                 expectedsize = currMaxSize;
1552         }
1553 }
1554
1555
1556 /**
1557  * Check the size of the block chain to make sure there are enough
1558  * slots sent back by the server-> This is only called when we have a
1559  * gap between the slots that we have locally and the slots sent by
1560  * the server therefore in the slots sent by the server there will be
1561  * at least 1 Table status message
1562  */
1563 void Table::checkNumSlots(int numberOfSlots) {
1564         if (numberOfSlots != expectedsize) {
1565                 throw new Error("Server Error: Server did not send all slots->  Expected: ");
1566         }
1567 }
1568
1569 /**
1570  * Update the size of of the local buffer if it is needed->
1571  */
1572 void Table::commitNewMaxSize() {
1573         didFindTableStatus = false;
1574
1575         // Resize the local slot buffer
1576         if (numberOfSlots != currMaxSize) {
1577                 buffer->resize((int32_t)currMaxSize);
1578         }
1579
1580         // Change the number of local slots to the new size
1581         numberOfSlots = (int32_t)currMaxSize;
1582
1583         // Recalculate the resize threshold since the size of the local
1584         // buffer has changed
1585         setResizeThreshold();
1586 }
1587
1588 /**
1589  * Process the new transaction parts from this latest round of slots
1590  * received from the server
1591  */
1592 void Table::processNewTransactionParts() {
1593
1594         if (newTransactionParts->size() == 0) {
1595                 // Nothing new to process
1596                 return;
1597         }
1598
1599         // Iterate through all the machine Ids that we received new parts
1600         // for
1601         SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> * tpit= getKeyIterator(newTransactionParts);
1602         while(tpit->hasNext()) {
1603                 int64_t machineId = tpit->next();
1604                 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
1605
1606                 SetIterator<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *ptit = getKeyIterator(parts);
1607                 // Iterate through all the parts for that machine Id
1608                 while(ptit->hasNext()) {
1609                         Pair<int64_t, int32_t> * partId = ptit->next();
1610                         TransactionPart *part = parts->get(partId);
1611
1612                         if (lastArbitratedTransactionNumberByArbitratorTable->contains(part->getArbitratorId())) {
1613                                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1614                                 if (lastTransactionNumber >= part->getSequenceNumber()) {
1615                                         // Set dead the transaction part
1616                                         part->setDead();
1617                                         continue;
1618                                 }
1619                         }
1620                         
1621                         // Get the transaction object for that sequence number
1622                         Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1623
1624                         if (transaction == NULL) {
1625                                 // This is a new transaction that we dont have so make a new one
1626                                 transaction = new Transaction();
1627
1628                                 // Insert this new transaction into the live tables
1629                                 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1630                                 liveTransactionByTransactionIdTable->put(new Pair<int64_t, int64_t>(part->getTransactionId()), transaction);
1631                         }
1632
1633                         // Add that part to the transaction
1634                         transaction->addPartDecode(part);
1635                 }
1636                 delete ptit;
1637         }
1638         delete tpit;
1639         // Clear all the new transaction parts in preparation for the next
1640         // time the server sends slots
1641         newTransactionParts->clear();
1642 }
1643
1644 void Table::arbitrateFromServer() {
1645
1646         if (liveTransactionBySequenceNumberTable->size() == 0) {
1647                 // Nothing to arbitrate on so move on
1648                 return;
1649         }
1650
1651         // Get the transaction sequence numbers and sort from oldest to newest
1652         Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
1653         qsort(transactionSequenceNumbers->expose(), transactionSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1654
1655         // Collection of key value pairs that are
1656         Hashtable<IoTString *, KeyValue *> * speculativeTableTmp = new Hashtable<IoTString *, KeyValue *>();
1657
1658         // The last transaction arbitrated on
1659         int64_t lastTransactionCommitted = -1;
1660         Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1661
1662         for (int64_t transactionSequenceNumber : transactionSequenceNumbers) {
1663                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1664
1665
1666
1667                 // Check if this machine arbitrates for this transaction if not
1668                 // then we cant arbitrate this transaction
1669                 if (transaction->getArbitrator() != localMachineId) {
1670                         continue;
1671                 }
1672
1673                 if (transactionSequenceNumber < lastSeqNumArbOn) {
1674                         continue;
1675                 }
1676
1677                 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1678                         // We have seen this already locally so dont commit again
1679                         continue;
1680                 }
1681
1682
1683                 if (!transaction->isComplete()) {
1684                         // Will arbitrate in incorrect order if we continue so just break
1685                         // Most likely this
1686                         break;
1687                 }
1688
1689
1690                 // update the largest transaction seen by arbitrator from server
1691                 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) == NULL) {
1692                         lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1693                 } else {
1694                         int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1695                         if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1696                                 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1697                         }
1698                 }
1699
1700                 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1701                         // Guard evaluated as true
1702
1703                         // Update the local changes so we can make the commit
1704                         SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1705                         while (kvit->hasNext()) {
1706                                 KeyValue *kv = kvit->next();
1707                                 speculativeTableTmp->put(kv->getKey(), kv);
1708                         }
1709                         delete kvit;
1710                         
1711                         // Update what the last transaction committed was for use in batch commit
1712                         lastTransactionCommitted = transactionSequenceNumber;
1713                 } else {
1714                         // Guard evaluated was false so create abort
1715                         // Create the abort
1716                         Abort *newAbort = new Abort(NULL,
1717                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1718                                                                                                                                         transaction->getSequenceNumber(),
1719                                                                                                                                         transaction->getMachineId(),
1720                                                                                                                                         transaction->getArbitrator(),
1721                                                                                                                                         localArbitrationSequenceNumber);
1722                         localArbitrationSequenceNumber++;
1723                         generatedAborts->add(newAbort);
1724
1725                         // Insert the abort so we can process
1726                         processEntry(newAbort);
1727                 }
1728
1729                 lastSeqNumArbOn = transactionSequenceNumber;
1730         }
1731
1732         Commit *newCommit = NULL;
1733
1734         // If there is something to commit
1735         if (speculativeTableTmp->size() != 0) {
1736                 // Create the commit and increment the commit sequence number
1737                 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1738                 localArbitrationSequenceNumber++;
1739
1740                 // Add all the new keys to the commit
1741                 for (KeyValue *kv : speculativeTableTmp->values()) {
1742                         newCommit->addKV(kv);
1743                 }
1744
1745                 // create the commit parts
1746                 newCommit->createCommitParts();
1747
1748                 // Append all the commit parts to the end of the pending queue
1749                 // waiting for sending to the server
1750                 // Insert the commit so we can process it
1751                 for (CommitPart *commitPart : newCommit->getParts()->values()) {
1752                         processEntry(commitPart);
1753                 }
1754         }
1755
1756         if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1757                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1758                 pendingSendArbitrationRounds->add(arbitrationRound);
1759
1760                 if (compactArbitrationData()) {
1761                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1762                         if (newArbitrationRound->getCommit() != NULL) {
1763                                 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1764                                         processEntry(commitPart);
1765                                 }
1766                         }
1767                 }
1768         }
1769 }
1770
1771 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1772
1773         // Check if this machine arbitrates for this transaction if not then
1774         // we cant arbitrate this transaction
1775         if (transaction->getArbitrator() != localMachineId) {
1776                 return Pair<bool, bool>(false, false);
1777         }
1778
1779         if (!transaction->isComplete()) {
1780                 // Will arbitrate in incorrect order if we continue so just break
1781                 // Most likely this
1782                 return Pair<bool, bool>(false, false);
1783         }
1784
1785         if (transaction->getMachineId() != localMachineId) {
1786                 // dont do this check for local transactions
1787                 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) != NULL) {
1788                         if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1789                                 // We've have already seen this from the server
1790                                 return Pair<bool, bool>(false, false);
1791                         }
1792                 }
1793         }
1794
1795         if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1796                 // Guard evaluated as true Create the commit and increment the
1797                 // commit sequence number
1798                 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1799                 localArbitrationSequenceNumber++;
1800
1801                 // Update the local changes so we can make the commit
1802                 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1803                 while (kvit->hasNext()) {
1804                         KeyValue *kv = kvit->next();
1805                         newCommit->addKV(kv);
1806                 }
1807                 delete kvit;
1808                 
1809                 // create the commit parts
1810                 newCommit->createCommitParts();
1811
1812                 // Append all the commit parts to the end of the pending queue
1813                 // waiting for sending to the server
1814                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1815                 pendingSendArbitrationRounds->add(arbitrationRound);
1816
1817                 if (compactArbitrationData()) {
1818                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1819                         for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1820                                 processEntry(commitPart);
1821                         }
1822                 } else {
1823                         // Insert the commit so we can process it
1824                         for (CommitPart *commitPart : newCommit->getParts()->values()) {
1825                                 processEntry(commitPart);
1826                         }
1827                 }
1828
1829                 if (transaction->getMachineId() == localMachineId) {
1830                         TransactionStatus *status = transaction->getTransactionStatus();
1831                         if (status != NULL) {
1832                                 status->setStatus(TransactionStatus_StatusCommitted);
1833                         }
1834                 }
1835
1836                 updateLiveStateFromLocal();
1837                 return Pair<bool, bool>(true, true);
1838         } else {
1839                 if (transaction->getMachineId() == localMachineId) {
1840                         // For locally created messages update the status
1841                         // Guard evaluated was false so create abort
1842                         TransactionStatus * status = transaction->getTransactionStatus();
1843                         if (status != NULL) {
1844                                 status->setStatus(TransactionStatus_StatusAborted);
1845                         }
1846                 } else {
1847                         Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
1848
1849                         // Create the abort
1850                         Abort *newAbort = new Abort(NULL,
1851                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1852                                                                                                                                         -1,
1853                                                                                                                                         transaction->getMachineId(),
1854                                                                                                                                         transaction->getArbitrator(),
1855                                                                                                                                         localArbitrationSequenceNumber);
1856                         localArbitrationSequenceNumber++;
1857                         addAbortSet->add(newAbort);
1858
1859                         // Append all the commit parts to the end of the pending queue
1860                         // waiting for sending to the server
1861                         ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1862                         pendingSendArbitrationRounds->add(arbitrationRound);
1863
1864                         if (compactArbitrationData()) {
1865                                 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1866                                 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1867                                         processEntry(commitPart);
1868                                 }
1869                         }
1870                 }
1871
1872                 updateLiveStateFromLocal();
1873                 return Pair<bool, bool>(true, false);
1874         }
1875 }
1876
1877 /**
1878  * Compacts the arbitration data my merging commits and aggregating
1879  * aborts so that a single large push of commits can be done instead
1880  * of many small updates
1881  */
1882 bool Table::compactArbitrationData() {
1883         if (pendingSendArbitrationRounds->size() < 2) {
1884                 // Nothing to compact so do nothing
1885                 return false;
1886         }
1887
1888         ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1889         if (lastRound->getDidSendPart()) {
1890                 return false;
1891         }
1892
1893         bool hadCommit = (lastRound->getCommit() == NULL);
1894         bool gotNewCommit = false;
1895
1896         int numberToDelete = 1;
1897         while (numberToDelete < pendingSendArbitrationRounds->size()) {
1898                 ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1899
1900                 if (round->isFull() || round->getDidSendPart()) {
1901                         // Stop since there is a part that cannot be compacted and we
1902                         // need to compact in order
1903                         break;
1904                 }
1905
1906                 if (round->getCommit() == NULL) {
1907                         // Try compacting aborts only
1908                         int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
1909                         if (newSize > ArbitrationRound_MAX_PARTS) {
1910                                 // Cant compact since it would be too large
1911                                 break;
1912                         }
1913                         lastRound->addAborts(round->getAborts());
1914                 } else {
1915                         // Create a new larger commit
1916                         Commit * newCommit = Commit_merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
1917                         localArbitrationSequenceNumber++;
1918
1919                         // Create the commit parts so that we can count them
1920                         newCommit->createCommitParts();
1921
1922                         // Calculate the new size of the parts
1923                         int newSize = newCommit->getNumberOfParts();
1924                         newSize += lastRound->getAbortsCount();
1925                         newSize += round->getAbortsCount();
1926
1927                         if (newSize > ArbitrationRound_MAX_PARTS) {
1928                                 // Cant compact since it would be too large
1929                                 break;
1930                         }
1931
1932                         // Set the new compacted part
1933                         lastRound->setCommit(newCommit);
1934                         lastRound->addAborts(round->getAborts());
1935                         gotNewCommit = true;
1936                 }
1937
1938                 numberToDelete++;
1939         }
1940
1941         if (numberToDelete != 1) {
1942                 // If there is a compaction
1943                 // Delete the previous pieces that are now in the new compacted piece
1944                 if (numberToDelete == pendingSendArbitrationRounds->size()) {
1945                         pendingSendArbitrationRounds->clear();
1946                 } else {
1947                         for (int i = 0; i < numberToDelete; i++) {
1948                                 pendingSendArbitrationRounds->removeIndex(pendingSendArbitrationRounds->size() - 1);
1949                         }
1950                 }
1951
1952                 // Add the new compacted into the pending to send list
1953                 pendingSendArbitrationRounds->add(lastRound);
1954
1955                 // Should reinsert into the commit processor
1956                 if (hadCommit && gotNewCommit) {
1957                         return true;
1958                 }
1959         }
1960
1961         return false;
1962 }
1963
1964 /**
1965  * Update all the commits and the committed tables, sets dead the dead
1966  * transactions
1967  */
1968 bool Table::updateCommittedTable() {
1969
1970         if (newCommitParts->size() == 0) {
1971                 // Nothing new to process
1972                 return false;
1973         }
1974
1975         // Iterate through all the machine Ids that we received new parts for
1976         for (int64_t machineId : newCommitParts->keySet()) {
1977                 Hashtable<Pair<int64_t, int32_t> *, CommitPart *> *parts = newCommitParts->get(machineId);
1978
1979                 // Iterate through all the parts for that machine Id
1980                 for (Pair<int64_t, int32_t> partId : parts->keySet()) {
1981                         CommitPart *part = parts->get(partId);
1982
1983                         // Get the transaction object for that sequence number
1984                         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
1985
1986                         if (commitForClientTable == NULL) {
1987                                 // This is the first commit from this device
1988                                 commitForClientTable = new Hashtable<int64_t, Commit *>();
1989                                 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
1990                         }
1991
1992                         Commit *commit = commitForClientTable->get(part->getSequenceNumber());
1993
1994                         if (commit == NULL) {
1995                                 // This is a new commit that we dont have so make a new one
1996                                 commit = new Commit();
1997
1998                                 // Insert this new commit into the live tables
1999                                 commitForClientTable->put(part->getSequenceNumber(), commit);
2000                         }
2001
2002                         // Add that part to the commit
2003                         commit->addPartDecode(part);
2004                 }
2005         }
2006
2007         // Clear all the new commits parts in preparation for the next time
2008         // the server sends slots
2009         newCommitParts->clear();
2010
2011         // If we process a new commit keep track of it for future use
2012         bool didProcessANewCommit = false;
2013
2014         // Process the commits one by one
2015         for (int64_T arbitratorId : liveCommitsTable->keySet()) {
2016
2017                 // Get all the commits for a specific arbitrator
2018                 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
2019
2020                 // Sort the commits in order
2021                 Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
2022                 qsort(commitSequenceNumbers->expose(), commitSequenceNumbers->size(), sizeof(int64_t), compareInt64);
2023
2024                 // Get the last commit seen from this arbitrator
2025                 int64_t lastCommitSeenSequenceNumber = -1;
2026                 if (lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId) != NULL) {
2027                         lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
2028                 }
2029
2030                 // Go through each new commit one by one
2031                 for (int i = 0; i < commitSequenceNumbers->size(); i++) {
2032                         int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
2033                         Commit *commit = commitForClientTable->get(commitSequenceNumber);
2034
2035                         // Special processing if a commit is not complete
2036                         if (!commit->isComplete()) {
2037                                 if (i == (commitSequenceNumbers->size() - 1)) {
2038                                         // If there is an incomplete commit and this commit is the
2039                                         // latest one seen then this commit cannot be processed and
2040                                         // there are no other commits
2041                                         break;
2042                                 } else {
2043                                         // This is a commit that was already dead but parts of it
2044                                         // are still in the block chain (not flushed out yet)->
2045                                         // Delete it and move on
2046                                         commit->setDead();
2047                                         commitForClientTable->remove(commit->getSequenceNumber());
2048                                         continue;
2049                                 }
2050                         }
2051
2052                         // Update the last transaction that was updated if we can
2053                         if (commit->getTransactionSequenceNumber() != -1) {
2054                                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
2055
2056                                 // Update the last transaction sequence number that the arbitrator arbitrated on
2057                                 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
2058                                         lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2059                                 }
2060                         }
2061
2062                         // Update the last arbitration data that we have seen so far
2063                         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(commit->getMachineId())) {
2064                                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
2065                                 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
2066                                         // Is larger
2067                                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2068                                 }
2069                         } else {
2070                                 // Never seen any data from this arbitrator so record the first one
2071                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2072                         }
2073
2074                         // We have already seen this commit before so need to do the
2075                         // full processing on this commit
2076                         if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2077
2078                                 // Update the last transaction that was updated if we can
2079                                 if (commit->getTransactionSequenceNumber() != -1) {
2080                                         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
2081
2082                                         // Update the last transaction sequence number that the arbitrator arbitrated on
2083                                         if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
2084                                                 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2085                                         }
2086                                 }
2087
2088                                 continue;
2089                         }
2090
2091                         // If we got here then this is a brand new commit and needs full
2092                         // processing
2093                         // Get what commits should be edited, these are the commits that
2094                         // have live values for their keys
2095                         Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
2096                         {
2097                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = commit->getKeyValueUpdateSet()->iterator();
2098                                 while (kvit->hasNext()) {
2099                                         KeyValue *kv = kvit->next();
2100                                         commitsToEdit->add(liveCommitsByKeyTable->get(kv->getKey()));
2101                                 }
2102                                 delete kvit;
2103                         }
2104                         commitsToEdit->remove(NULL);            // remove NULL since it could be in this set
2105
2106                         // Update each previous commit that needs to be updated
2107                         for (Commit *previousCommit : commitsToEdit) {
2108
2109                                 // Only bother with live commits (TODO: Maybe remove this check)
2110                                 if (previousCommit->isLive()) {
2111
2112                                         // Update which keys in the old commits are still live
2113                                         {
2114                                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = commit->getKeyValueUpdateSet()->iterator();
2115                                                 while (kvit->hasNext()) {
2116                                                         KeyValue *kv = kvit->next();
2117                                                         previousCommit->invalidateKey(kv->getKey());
2118                                                 }
2119                                                 delete kvit;
2120                                         }
2121                                         
2122                                         // if the commit is now dead then remove it
2123                                         if (!previousCommit->isLive()) {
2124                                                 commitForClientTable->remove(previousCommit);
2125                                         }
2126                                 }
2127                         }
2128
2129                         // Update the last seen sequence number from this arbitrator
2130                         if (lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId()) != NULL) {
2131                                 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2132                                         lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2133                                 }
2134                         } else {
2135                                 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2136                         }
2137
2138                         // We processed a new commit that we havent seen before
2139                         didProcessANewCommit = true;
2140
2141                         // Update the committed table of keys and which commit is using which key
2142                         {
2143                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = commit->getKeyValueUpdateSet()->iterator();
2144                                 while (kvit->hasNext()) {
2145                                         KeyValue *kv = kvit->next();
2146                                         committedKeyValueTable->put(kv->getKey(), kv);
2147                                         liveCommitsByKeyTable->put(kv->getKey(), commit);
2148                                 }
2149                                 delete kvit;
2150                         }
2151                 }
2152         }
2153
2154         return didProcessANewCommit;
2155 }
2156
2157 /**
2158  * Create the speculative table from transactions that are still live
2159  * and have come from the cloud
2160  */
2161 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2162         if (liveTransactionBySequenceNumberTable->keySet()->size() == 0) {
2163                 // There is nothing to speculate on
2164                 return false;
2165         }
2166
2167         // Create a list of the transaction sequence numbers and sort them
2168         // from oldest to newest
2169         Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
2170         qsort(transactionSequenceNumberSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64);
2171
2172         bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2173
2174
2175         if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2176                 // If there is a gap in the transaction sequence numbers then
2177                 // there was a commit or an abort of a transaction OR there was a
2178                 // new commit (Could be from offline commit) so a redo the
2179                 // speculation from scratch
2180
2181                 // Start from scratch
2182                 speculatedKeyValueTable->clear();
2183                 lastTransactionSequenceNumberSpeculatedOn = -1;
2184                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2185
2186         }
2187
2188         // Remember the front of the transaction list
2189         oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2190
2191         // Find where to start arbitration from
2192         int startIndex = transactionSequenceNumbersSorted->indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1;
2193
2194         if (startIndex >= transactionSequenceNumbersSorted->size()) {
2195                 // Make sure we are not out of bounds
2196                 return false;           // did not speculate
2197         }
2198
2199         Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
2200         bool didSkip = true;
2201
2202         for (int i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2203                 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2204                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2205
2206                 if (!transaction->isComplete()) {
2207                         // If there is an incomplete transaction then there is nothing
2208                         // we can do add this transactions arbitrator to the list of
2209                         // arbitrators we should ignore
2210                         incompleteTransactionArbitrator->add(transaction->getArbitrator());
2211                         didSkip = true;
2212                         continue;
2213                 }
2214
2215                 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2216                         continue;
2217                 }
2218
2219                 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2220
2221                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2222                         // Guard evaluated to true so update the speculative table
2223                         {
2224                                 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2225                                 while (kvit->hasNext()) {
2226                                         KeyValue *kv = kvit->next();
2227                                         speculatedKeyValueTable->put(kv->getKey(), kv);
2228                                 }
2229                                 delete kvit;
2230                         }
2231                 }
2232         }
2233
2234         if (didSkip) {
2235                 // Since there was a skip we need to redo the speculation next time around
2236                 lastTransactionSequenceNumberSpeculatedOn = -1;
2237                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2238         }
2239
2240         // We did some speculation
2241         return true;
2242 }
2243
2244 /**
2245  * Create the pending transaction speculative table from transactions
2246  * that are still in the pending transaction buffer
2247  */
2248 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2249         if (pendingTransactionQueue->size() == 0) {
2250                 // There is nothing to speculate on
2251                 return;
2252         }
2253
2254         if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2255                 // need to reset on the pending speculation
2256                 lastPendingTransactionSpeculatedOn = NULL;
2257                 firstPendingTransaction = pendingTransactionQueue->get(0);
2258                 pendingTransactionSpeculatedKeyValueTable->clear();
2259         }
2260
2261         // Find where to start arbitration from
2262         int startIndex = pendingTransactionQueue->indexOf(firstPendingTransaction) + 1;
2263
2264         if (startIndex >= pendingTransactionQueue->size()) {
2265                 // Make sure we are not out of bounds
2266                 return;
2267         }
2268
2269         for (int i = startIndex; i < pendingTransactionQueue->size(); i++) {
2270                 Transaction *transaction = pendingTransactionQueue->get(i);
2271
2272                 lastPendingTransactionSpeculatedOn = transaction;
2273
2274                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2275                         // Guard evaluated to true so update the speculative table
2276                         SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2277                         while (kvit->hasNext()) {
2278                                 KeyValue *kv = kvit->next();
2279                                 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2280                         }
2281                         delete kvit;
2282                 }
2283         }
2284 }
2285
2286 /**
2287  * Set dead and remove from the live transaction tables the
2288  * transactions that are dead
2289  */
2290 void Table::updateLiveTransactionsAndStatus() {
2291
2292         // Go through each of the transactions
2293         for (Iterator<Map->Entry<int64_t, Transaction> > *iter = liveTransactionBySequenceNumberTable->entrySet()->iterator(); iter->hasNext();) {
2294                 Transaction *transaction = iter->next()->getValue();
2295
2296                 // Check if the transaction is dead
2297                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator());
2298                 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= transaction->getSequenceNumber())) {
2299
2300                         // Set dead the transaction
2301                         transaction->setDead();
2302
2303                         // Remove the transaction from the live table
2304                         iter->remove();
2305                         liveTransactionByTransactionIdTable->remove(transaction->getId());
2306                 }
2307         }
2308
2309         // Go through each of the transactions
2310         for (Iterator<Map->Entry<int64_t, TransactionStatus *> > *iter = outstandingTransactionStatus->entrySet()->iterator(); iter->hasNext();) {
2311                 TransactionStatus *status = iter->next()->getValue();
2312
2313                 // Check if the transaction is dead
2314                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator());
2315                 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= status->getTransactionSequenceNumber())) {
2316
2317                         // Set committed
2318                         status->setStatus(TransactionStatus_StatusCommitted);
2319
2320                         // Remove
2321                         iter->remove();
2322                 }
2323         }
2324 }
2325
2326 /**
2327  * Process this slot, entry by entry->  Also update the latest message sent by slot
2328  */
2329 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2330
2331         // Update the last message seen
2332         updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2333
2334         // Process each entry in the slot
2335         Vector<Entry *> *entries = slot->getEntries();
2336         uint eSize = entries->size();
2337         for(uint ei=0; ei < eSize; ei++) {
2338                 Entry * entry = entries->get(ei);
2339                 switch (entry->getType()) {
2340                 case TypeCommitPart:
2341                         processEntry((CommitPart *)entry);
2342                         break;
2343                 case TypeAbort:
2344                         processEntry((Abort *)entry);
2345                         break;
2346                 case TypeTransactionPart:
2347                         processEntry((TransactionPart *)entry);
2348                         break;
2349                 case TypeNewKey:
2350                         processEntry((NewKey *)entry);
2351                         break;
2352                 case TypeLastMessage:
2353                         processEntry((LastMessage *)entry, machineSet);
2354                         break;
2355                 case TypeRejectedMessage:
2356                         processEntry((RejectedMessage *)entry, indexer);
2357                         break;
2358                 case TypeTableStatus:
2359                         processEntry((TableStatus *)entry, slot->getSequenceNumber());
2360                         break;
2361                 default:
2362                         throw new Error("Unrecognized type: ");
2363                 }
2364         }
2365 }
2366
2367 /**
2368  * Update the last message that was sent for a machine Id
2369  */
2370 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2371         // Update what the last message received by a machine was
2372         updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2373 }
2374
2375 /**
2376  * Add the new key to the arbitrators table and update the set of live
2377  * new keys (in case of a rescued new key message)
2378  */
2379 void Table::processEntry(NewKey *entry) {
2380         // Update the arbitrator table with the new key information
2381         arbitratorTable->put(entry->getKey(), entry->getMachineID());
2382
2383         // Update what the latest live new key is
2384         NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2385         if (oldNewKey != NULL) {
2386                 // Delete the old new key messages
2387                 oldNewKey->setDead();
2388         }
2389 }
2390
2391 /**
2392  * Process new table status entries and set dead the old ones as new
2393  * ones come in-> keeps track of the largest and smallest table status
2394  * seen in this current round of updating the local copy of the block
2395  * chain
2396  */
2397 void Table::processEntry(TableStatus * entry, int64_t seq) {
2398         int newNumSlots = entry->getMaxSlots();
2399         updateCurrMaxSize(newNumSlots);
2400         initExpectedSize(seq, newNumSlots);
2401
2402         if (liveTableStatus != NULL) {
2403                 // We have a larger table status so the old table status is no
2404                 // int64_ter alive
2405                 liveTableStatus->setDead();
2406         }
2407
2408         // Make this new table status the latest alive table status
2409         liveTableStatus = entry;
2410 }
2411
2412 /**
2413  * Check old messages to see if there is a block chain violation->
2414  * Also
2415  */
2416 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
2417         int64_t oldSeqNum = entry->getOldSeqNum();
2418         int64_t newSeqNum = entry->getNewSeqNum();
2419         bool isequal = entry->getEqual();
2420         int64_t machineId = entry->getMachineID();
2421         int64_t seq = entry->getSequenceNumber();
2422
2423         // Check if we have messages that were supposed to be rejected in
2424         // our local block chain
2425         for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2426                 // Get the slot
2427                 Slot *slot = indexer->getSlot(seqNum);
2428
2429                 if (slot != NULL) {
2430                         // If we have this slot make sure that it was not supposed to be
2431                         // a rejected slot
2432                         int64_t slotMachineId = slot->getMachineID();
2433                         if (isequal != (slotMachineId == machineId)) {
2434                                 throw new Error("Server Error: Trying to insert rejected message for slot ");
2435                         }
2436                 }
2437         }
2438
2439         // Create a list of clients to watch until they see this rejected
2440         // message entry->
2441         Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2442         for (Map->Entry<int64_t, Pair<int64_t, Liveness *> > *lastMessageEntry : lastMessageTable->entrySet()) {
2443                 // Machine ID for the last message entry
2444                 int64_t lastMessageEntryMachineId = lastMessageEntry->getKey();
2445
2446                 // We've seen it, don't need to continue to watch->  Our next
2447                 // message will implicitly acknowledge it->
2448                 if (lastMessageEntryMachineId == localMachineId) {
2449                         continue;
2450                 }
2451
2452                 Pair<int64_t, Liveness *> lastMessageValue = lastMessageEntry->getValue();
2453                 int64_t entrySequenceNumber = lastMessageValue.getFirst();
2454
2455                 if (entrySequenceNumber < seq) {
2456                         // Add this rejected message to the set of messages that this
2457                         // machine ID did not see yet
2458                         addWatchVector(lastMessageEntryMachineId, entry);
2459                         // This client did not see this rejected message yet so add it
2460                         // to the watch set to monitor
2461                         deviceWatchSet->add(lastMessageEntryMachineId);
2462                 }
2463         }
2464         if (deviceWatchSet->isEmpty()) {
2465                 // This rejected message has been seen by all the clients so
2466                 entry->setDead();
2467         } else {
2468                 // We need to watch this rejected message
2469                 entry->setWatchSet(deviceWatchSet);
2470         }
2471 }
2472
2473 /**
2474  * Check if this abort is live, if not then save it so we can kill it
2475  * later-> update the last transaction number that was arbitrated on->
2476  */
2477 void Table::processEntry(Abort *entry) {
2478         if (entry->getTransactionSequenceNumber() != -1) {
2479                 // update the transaction status if it was sent to the server
2480                 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2481                 if (status != NULL) {
2482                         status->setStatus(TransactionStatus_StatusAborted);
2483                 }
2484         }
2485
2486         // Abort has not been seen by the client it is for yet so we need to
2487         // keep track of it
2488         Abort *previouslySeenAbort = liveAbortTable->put(entry->getAbortId(), entry);
2489         if (previouslySeenAbort != NULL) {
2490                 previouslySeenAbort->setDead();         // Delete old version of the abort since we got a rescued newer version
2491         }
2492
2493         if (entry->getTransactionArbitrator() == localMachineId) {
2494                 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
2495         }
2496
2497         if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId()).getFirst() >= entry->getSequenceNumber())) {
2498                 // The machine already saw this so it is dead
2499                 entry->setDead();
2500                 liveAbortTable->remove(&entry->getAbortId());
2501
2502                 if (entry->getTransactionArbitrator() == localMachineId) {
2503                         liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2504                 }
2505                 return;
2506         }
2507
2508         // Update the last arbitration data that we have seen so far
2509         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(entry->getTransactionArbitrator())) {
2510                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2511                 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2512                         // Is larger
2513                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2514                 }
2515         } else {
2516                 // Never seen any data from this arbitrator so record the first one
2517                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2518         }
2519
2520         // Set dead a transaction if we can
2521         Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber()));
2522         if (transactionToSetDead != NULL) {
2523                 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2524         }
2525
2526         // Update the last transaction sequence number that the arbitrator
2527         // arbitrated on
2528         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator());
2529         if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2530                 // Is a valid one
2531                 if (entry->getTransactionSequenceNumber() != -1) {
2532                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2533                 }
2534         }
2535 }
2536
2537 /**
2538  * Set dead the transaction part if that transaction is dead and keep
2539  * track of all new parts
2540  */
2541 void Table::processEntry(TransactionPart *entry) {
2542         // Check if we have already seen this transaction and set it dead OR
2543         // if it is not alive
2544         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId());
2545         if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= entry->getSequenceNumber())) {
2546                 // This transaction is dead, it was already committed or aborted
2547                 entry->setDead();
2548                 return;
2549         }
2550
2551         // This part is still alive
2552         Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId());
2553
2554         if (transactionPart == NULL) {
2555                 // Dont have a table for this machine Id yet so make one
2556                 transactionPart = new Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2557                 newTransactionParts->put(entry->getMachineId(), transactionPart);
2558         }
2559
2560         // Update the part and set dead ones we have already seen (got a
2561         // rescued version)
2562         TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry);
2563         if (previouslySeenPart != NULL) {
2564                 previouslySeenPart->setDead();
2565         }
2566 }
2567
2568 /**
2569  * Process new commit entries and save them for future use->  Delete duplicates
2570  */
2571 void Table::processEntry(CommitPart *entry) {
2572         // Update the last transaction that was updated if we can
2573         if (entry->getTransactionSequenceNumber() != -1) {
2574                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId());
2575                 // Update the last transaction sequence number that the arbitrator
2576                 // arbitrated on
2577                 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2578                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2579                 }
2580         }
2581
2582         Hashtable<Pair<int64_t, int32_t> *, CommitPart *> *commitPart = newCommitParts->get(entry->getMachineId());
2583         if (commitPart == NULL) {
2584                 // Don't have a table for this machine Id yet so make one
2585                 commitPart = new Hashtable<Pair<int64_t, int32_t> *, CommitPart *>();
2586                 newCommitParts->put(entry->getMachineId(), commitPart);
2587         }
2588         // Update the part and set dead ones we have already seen (got a
2589         // rescued version)
2590         CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry);
2591         if (previouslySeenPart != NULL) {
2592                 previouslySeenPart->setDead();
2593         }
2594 }
2595
2596 /**
2597  * Update the last message seen table-> Update and set dead the
2598  * appropriate RejectedMessages as clients see them-> Updates the live
2599  * aborts, removes those that are dead and sets them dead-> Check that
2600  * the last message seen is correct and that there is no mismatch of
2601  * our own last message or that other clients have not had a rollback
2602  * on the last message->
2603  */
2604 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2605         // We have seen this machine ID
2606         machineSet->remove(machineId);
2607
2608         // Get the set of rejected messages that this machine Id is has not seen yet
2609         Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2610         // If there is a rejected message that this machine Id has not seen yet
2611         if (watchset != NULL) {
2612                 // Go through each rejected message that this machine Id has not
2613                 // seen yet
2614
2615                 SetIterator<RejectedMessage *, RejectedMessage *> *rmit = watchset->iterator();
2616                 while(rmit->hasNext()) {
2617                         RejectedMessage *rm = rmit->next();
2618                         // If this machine Id has seen this rejected message->->->
2619                         if (rm->getSequenceNumber() <= seqNum) {
2620                                 // Remove it from our watchlist
2621                                 rmit->remove();
2622                                 // Decrement machines that need to see this notification
2623                                 rm->removeWatcher(machineId);
2624                         }
2625                 }
2626                 delete rmit;
2627         }
2628
2629         // Set dead the abort
2630         for (Iterator<Map->Entry<Pair<int64_t, int64_t>, Abort *> > i = liveAbortTable->entrySet()->iterator(); i->hasNext();) {
2631                 Abort *abort = i->next()->getValue();
2632                 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2633                         abort->setDead();
2634                         i->remove();
2635                         if (abort->getTransactionArbitrator() == localMachineId) {
2636                                 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2637                         }
2638                 }
2639         }
2640         if (machineId == localMachineId) {
2641                 // Our own messages are immediately dead->
2642                 char livenessType = liveness->getType();
2643                 if (livenessType==TypeLastMessage) {
2644                         ((LastMessage *)liveness)->setDead();
2645                 } else if (livenessType == TypeSlot) {
2646                         ((Slot *)liveness)->setDead();
2647                 } else {
2648                         throw new Error("Unrecognized type");
2649                 }
2650         }
2651         // Get the old last message for this device
2652         Pair<int64_t, Liveness *> * lastMessageEntry = lastMessageTable->put(machineId, new Pair<int64_t, Liveness *>(seqNum, liveness));
2653         if (lastMessageEntry == NULL) {
2654                 // If no last message then there is nothing else to process
2655                 return;
2656         }
2657
2658         int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
2659         Liveness *lastEntry = lastMessageEntry->getSecond();
2660         delete lastMessageEntry;
2661         
2662         // If it is not our machine Id since we already set ours to dead
2663         if (machineId != localMachineId) {
2664                 char lastEntryType = lastEntry->getType();
2665                 
2666                 if (lastEntryType == TypeLastMessage) {
2667                         ((LastMessage *)lastEntry)->setDead();
2668                 } else if (lastEntryType == TypeSlot) {
2669                         ((Slot *)lastEntry)->setDead();
2670                 } else {
2671                         throw new Error("Unrecognized type");
2672                 }
2673         }
2674         // Make sure the server is not playing any games
2675         if (machineId == localMachineId) {
2676                 if (hadPartialSendToServer) {
2677                         // We were not making any updates and we had a machine mismatch
2678                         if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2679                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: ");
2680                         }
2681                 } else {
2682                         // We were not making any updates and we had a machine mismatch
2683                         if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2684                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed: ");
2685                         }
2686                 }
2687         } else {
2688                 if (lastMessageSeqNum > seqNum) {
2689                         throw new Error("Server Error: Rollback on remote machine sequence number");
2690                 }
2691         }
2692 }
2693
2694 /**
2695  * Add a rejected message entry to the watch set to keep track of
2696  * which clients have seen that rejected message entry and which have
2697  * not.
2698  */
2699 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
2700         Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2701         if (entries == NULL) {
2702                 // There is no set for this machine ID yet so create one
2703                 entries = new Hashset<RejectedMessage *>();
2704                 rejectedMessageWatchVectorTable->put(machineId, entries);
2705         }
2706         entries->add(entry);
2707 }
2708
2709 /**
2710  * Check if the HMAC chain is not violated
2711  */
2712 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
2713         for (int i = 0; i < newSlots->length(); i++) {
2714                 Slot *currSlot = newSlots->get(i);
2715                 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
2716                 if (prevSlot != NULL &&
2717                                 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2718                         throw new Error("Server Error: Invalid HMAC Chain");
2719         }
2720 }