dce3a8f1969e4e960419b080ffe8ec17595c523a
[iotcloud.git] / version2 / src / C / Table.cc
1 #include "Table.h"
2 #include "CloudComm.h"
3 #include "SlotBuffer.h"
4 #include "NewKey.h"
5 #include "Slot.h"
6 #include "KeyValue.h"
7 #include "Error.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
13 #include "SecureRandom.h"
14 #include "ByteBuffer.h"
15 #include "Abort.h"
16 #include "CommitPart.h"
17 #include "ArbitrationRound.h"
18 #include "TransactionPart.h"
19 #include "Commit.h"
20 #include "RejectedMessage.h"
21 #include "SlotIndexer.h"
22 #include <stdlib.h>
23
/**
 * Three-way comparator over two int64_t values passed by address,
 * in the shape expected by qsort()-style APIs.
 *
 * @param a pointer to the left-hand int64_t
 * @param b pointer to the right-hand int64_t
 * @return -1 if *a < *b, 1 if *a > *b, 0 if they are equal
 */
int compareInt64(const void *a, const void *b) {
        const int64_t lhs = *static_cast<const int64_t *>(a);
        const int64_t rhs = *static_cast<const int64_t *>(b);

        if (lhs == rhs)
                return 0;
        return (lhs < rhs) ? -1 : 1;
}
34
35 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
36         buffer(NULL),
37         cloud(new CloudComm(this, baseurl, password, listeningPort)),
38         random(NULL),
39         liveTableStatus(NULL),
40         pendingTransactionBuilder(NULL),
41         lastPendingTransactionSpeculatedOn(NULL),
42         firstPendingTransaction(NULL),
43         numberOfSlots(0),
44         bufferResizeThreshold(0),
45         liveSlotCount(0),
46         oldestLiveSlotSequenceNumver(1),
47         localMachineId(_localMachineId),
48         sequenceNumber(0),
49         localTransactionSequenceNumber(0),
50         lastTransactionSequenceNumberSpeculatedOn(0),
51         oldestTransactionSequenceNumberSpeculatedOn(0),
52         localArbitrationSequenceNumber(0),
53         hadPartialSendToServer(false),
54         attemptedToSendToServer(false),
55         expectedsize(0),
56         didFindTableStatus(false),
57         currMaxSize(0),
58         lastSlotAttemptedToSend(NULL),
59         lastIsNewKey(false),
60         lastNewSize(0),
61         lastTransactionPartsSent(NULL),
62         lastPendingSendArbitrationEntriesToDelete(NULL),
63         lastNewKey(NULL),
64         committedKeyValueTable(NULL),
65         speculatedKeyValueTable(NULL),
66         pendingTransactionSpeculatedKeyValueTable(NULL),
67         liveNewKeyTable(NULL),
68         lastMessageTable(NULL),
69         rejectedMessageWatchVectorTable(NULL),
70         arbitratorTable(NULL),
71         liveAbortTable(NULL),
72         newTransactionParts(NULL),
73         newCommitParts(NULL),
74         lastArbitratedTransactionNumberByArbitratorTable(NULL),
75         liveTransactionBySequenceNumberTable(NULL),
76         liveTransactionByTransactionIdTable(NULL),
77         liveCommitsTable(NULL),
78         liveCommitsByKeyTable(NULL),
79         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
80         rejectedSlotVector(NULL),
81         pendingTransactionQueue(NULL),
82         pendingSendArbitrationRounds(NULL),
83         pendingSendArbitrationEntriesToDelete(NULL),
84         transactionPartsSent(NULL),
85         outstandingTransactionStatus(NULL),
86         liveAbortsGeneratedByLocal(NULL),
87         offlineTransactionsCommittedAndAtServer(NULL),
88         localCommunicationTable(NULL),
89         lastTransactionSeenFromMachineFromServer(NULL),
90         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
91         lastInsertedNewKey(false),
92         lastSeqNumArbOn(0)
93 {
94         init();
95 }
96
97 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
98         buffer(NULL),
99         cloud(_cloud),
100         random(NULL),
101         liveTableStatus(NULL),
102         pendingTransactionBuilder(NULL),
103         lastPendingTransactionSpeculatedOn(NULL),
104         firstPendingTransaction(NULL),
105         numberOfSlots(0),
106         bufferResizeThreshold(0),
107         liveSlotCount(0),
108         oldestLiveSlotSequenceNumver(1),
109         localMachineId(_localMachineId),
110         sequenceNumber(0),
111         localTransactionSequenceNumber(0),
112         lastTransactionSequenceNumberSpeculatedOn(0),
113         oldestTransactionSequenceNumberSpeculatedOn(0),
114         localArbitrationSequenceNumber(0),
115         hadPartialSendToServer(false),
116         attemptedToSendToServer(false),
117         expectedsize(0),
118         didFindTableStatus(false),
119         currMaxSize(0),
120         lastSlotAttemptedToSend(NULL),
121         lastIsNewKey(false),
122         lastNewSize(0),
123         lastTransactionPartsSent(NULL),
124         lastPendingSendArbitrationEntriesToDelete(NULL),
125         lastNewKey(NULL),
126         committedKeyValueTable(NULL),
127         speculatedKeyValueTable(NULL),
128         pendingTransactionSpeculatedKeyValueTable(NULL),
129         liveNewKeyTable(NULL),
130         lastMessageTable(NULL),
131         rejectedMessageWatchVectorTable(NULL),
132         arbitratorTable(NULL),
133         liveAbortTable(NULL),
134         newTransactionParts(NULL),
135         newCommitParts(NULL),
136         lastArbitratedTransactionNumberByArbitratorTable(NULL),
137         liveTransactionBySequenceNumberTable(NULL),
138         liveTransactionByTransactionIdTable(NULL),
139         liveCommitsTable(NULL),
140         liveCommitsByKeyTable(NULL),
141         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
142         rejectedSlotVector(NULL),
143         pendingTransactionQueue(NULL),
144         pendingSendArbitrationRounds(NULL),
145         pendingSendArbitrationEntriesToDelete(NULL),
146         transactionPartsSent(NULL),
147         outstandingTransactionStatus(NULL),
148         liveAbortsGeneratedByLocal(NULL),
149         offlineTransactionsCommittedAndAtServer(NULL),
150         localCommunicationTable(NULL),
151         lastTransactionSeenFromMachineFromServer(NULL),
152         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
153         lastInsertedNewKey(false),
154         lastSeqNumArbOn(0)
155 {
156         init();
157 }
158
159 /**
160  * Init all the stuff needed for for table usage
161  */
162 void Table::init() {
163         // Init helper objects
164         random = new SecureRandom();
165         buffer = new SlotBuffer();
166
167         // init data structs
168         committedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
169         speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
170         pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
171         liveNewKeyTable = new Hashtable<IoTString *, NewKey *>();
172         lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> * >();
173         rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
174         arbitratorTable = new Hashtable<IoTString *, int64_t>();
175         liveAbortTable = new Hashtable<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>();
176         newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
177         newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
178         lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
179         liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
180         liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t> *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>();
181         liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> * >();
182         liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *>();
183         lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
184         rejectedSlotVector = new Vector<int64_t>();
185         pendingTransactionQueue = new Vector<Transaction *>();
186         pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
187         transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
188         outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
189         liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
190         offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> *, uintptr_t, 0, pairHashFunction, pairEquals>();
191         localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> *>();
192         lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
193         pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
194         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
195
196         // Other init stuff
197         numberOfSlots = buffer->capacity();
198         setResizeThreshold();
199 }
200
201 /**
202  * Initialize the table by inserting a table status as the first entry
203  * into the table status also initialize the crypto stuff.
204  */
205 void Table::initTable() {
206         cloud->initSecurity();
207
208         // Create the first insertion into the block chain which is the table status
209         Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
210         localSequenceNumber++;
211         TableStatus *status = new TableStatus(s, numberOfSlots);
212         s->addEntry(status);
213         Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
214
215         if (array == NULL) {
216                 array = new Array<Slot *>(1);
217                 array->set(0, s);
218                 // update local block chain
219                 validateAndUpdate(array, true);
220         } else if (array->length() == 1) {
221                 // in case we did push the slot BUT we failed to init it
222                 validateAndUpdate(array, true);
223         } else {
224                 throw new Error("Error on initialization");
225         }
226 }
227
228 /**
229  * Rebuild the table from scratch by pulling the latest block chain
230  * from the server.
231  */
232 void Table::rebuild() {
233         // Just pull the latest slots from the server
234         Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
235         validateAndUpdate(newslots, true);
236         sendToServer(NULL);
237         updateLiveTransactionsAndStatus();
238 }
239
240 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
241         localCommunicationTable->put(arbitrator, new Pair<IoTString *, int32_t>(hostName, portNumber));
242 }
243
244 int64_t Table::getArbitrator(IoTString *key) {
245         return arbitratorTable->get(key);
246 }
247
248 void Table::close() {
249         cloud->closeCloud();
250 }
251
252 IoTString *Table::getCommitted(IoTString *key)  {
253         KeyValue *kv = committedKeyValueTable->get(key);
254
255         if (kv != NULL) {
256                 return kv->getValue();
257         } else {
258                 return NULL;
259         }
260 }
261
262 IoTString *Table::getSpeculative(IoTString *key) {
263         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
264
265         if (kv == NULL) {
266                 kv = speculatedKeyValueTable->get(key);
267         }
268
269         if (kv == NULL) {
270                 kv = committedKeyValueTable->get(key);
271         }
272
273         if (kv != NULL) {
274                 return kv->getValue();
275         } else {
276                 return NULL;
277         }
278 }
279
280 IoTString *Table::getCommittedAtomic(IoTString *key) {
281         KeyValue *kv = committedKeyValueTable->get(key);
282
283         if (!arbitratorTable->contains(key)) {
284                 throw new Error("Key not Found.");
285         }
286
287         // Make sure new key value pair matches the current arbitrator
288         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
289                 // TODO: Maybe not throw en error
290                 throw new Error("Not all Key Values Match Arbitrator.");
291         }
292
293         if (kv != NULL) {
294                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
295                 return kv->getValue();
296         } else {
297                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
298                 return NULL;
299         }
300 }
301
302 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
303         if (!arbitratorTable->contains(key)) {
304                 throw new Error("Key not Found.");
305         }
306
307         // Make sure new key value pair matches the current arbitrator
308         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
309                 // TODO: Maybe not throw en error
310                 throw new Error("Not all Key Values Match Arbitrator.");
311         }
312
313         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
314
315         if (kv == NULL) {
316                 kv = speculatedKeyValueTable->get(key);
317         }
318
319         if (kv == NULL) {
320                 kv = committedKeyValueTable->get(key);
321         }
322
323         if (kv != NULL) {
324                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
325                 return kv->getValue();
326         } else {
327                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
328                 return NULL;
329         }
330 }
331
332 bool Table::update()  {
333         try {
334                 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
335                 validateAndUpdate(newSlots, false);
336                 sendToServer(NULL);
337                 updateLiveTransactionsAndStatus();
338                 return true;
339         } catch (Exception *e) {
340                 SetIterator<int64_t, Pair<IoTString *, int32_t> *> *kit = getKeyIterator(localCommunicationTable);
341                 while (kit->hasNext()) {
342                         int64_t m = kit->next();
343                         updateFromLocal(m);
344                 }
345                 delete kit;
346         }
347
348         return false;
349 }
350
351 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
352         while (true) {
353                 if (!arbitratorTable->contains(keyName)) {
354                         // There is already an arbitrator
355                         return false;
356                 }
357                 NewKey *newKey = new NewKey(NULL, keyName, machineId);
358
359                 if (sendToServer(newKey)) {
360                         // If successfully inserted
361                         return true;
362                 }
363         }
364 }
365
366 void Table::startTransaction() {
367         // Create a new transaction, invalidates any old pending transactions.
368         pendingTransactionBuilder = new PendingTransaction(localMachineId);
369 }
370
371 void Table::addKV(IoTString *key, IoTString *value) {
372
373         // Make sure it is a valid key
374         if (!arbitratorTable->contains(key)) {
375                 throw new Error("Key not Found.");
376         }
377
378         // Make sure new key value pair matches the current arbitrator
379         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
380                 // TODO: Maybe not throw en error
381                 throw new Error("Not all Key Values Match Arbitrator.");
382         }
383
384         // Add the key value to this transaction
385         KeyValue *kv = new KeyValue(key, value);
386         pendingTransactionBuilder->addKV(kv);
387 }
388
389 TransactionStatus *Table::commitTransaction() {
390         if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
391                 // transaction with no updates will have no effect on the system
392                 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
393         }
394
395         // Set the local transaction sequence number and increment
396         pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
397         localTransactionSequenceNumber++;
398
399         // Create the transaction status
400         TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
401
402         // Create the new transaction
403         Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
404         newTransaction->setTransactionStatus(transactionStatus);
405
406         if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
407                 // Add it to the queue and invalidate the builder for safety
408                 pendingTransactionQueue->add(newTransaction);
409         } else {
410                 arbitrateOnLocalTransaction(newTransaction);
411                 updateLiveStateFromLocal();
412         }
413
414         pendingTransactionBuilder = new PendingTransaction(localMachineId);
415
416         try {
417                 sendToServer(NULL);
418         } catch (ServerException *e) {
419
420                 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
421                 uint size = pendingTransactionQueue->size();
422                 uint oldindex = 0;
423                 for (uint iter = 0; iter < size; iter++) {
424                         Transaction *transaction = pendingTransactionQueue->get(iter);
425                         pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter));
426
427                         if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
428                                 // Already contacted this client so ignore all attempts to contact this client
429                                 // to preserve ordering for arbitrator
430                                 continue;
431                         }
432
433                         Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
434
435                         if (sendReturn.getFirst()) {
436                                 // Failed to contact over local
437                                 arbitratorTriedAndFailed->add(transaction->getArbitrator());
438                         } else {
439                                 // Successful contact or should not contact
440
441                                 if (sendReturn.getSecond()) {
442                                         // did arbitrate
443                                         oldindex--;
444                                 }
445                         }
446                 }
447                 pendingTransactionQueue->setSize(oldindex);
448         }
449
450         updateLiveStateFromLocal();
451
452         return transactionStatus;
453 }
454
455 /**
456  * Recalculate the new resize threshold
457  */
458 void Table::setResizeThreshold() {
459         int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
460         bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
461 }
462
463 int64_t Table::getLocalSequenceNumber() {
464         return localSequenceNumber;
465 }
466
467 bool Table::sendToServer(NewKey *newKey) {
468         bool fromRetry = false;
469         try {
470                 if (hadPartialSendToServer) {
471                         Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
472                         if (newSlots->length() == 0) {
473                                 fromRetry = true;
474                                 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
475
476                                 if (sendSlotsReturn.getFirst()) {
477                                         if (newKey != NULL) {
478                                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
479                                                         newKey = NULL;
480                                                 }
481                                         }
482
483                                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
484                                         while (trit->hasNext()) {
485                                                 Transaction *transaction = trit->next();
486                                                 transaction->resetServerFailure();
487                                                 // Update which transactions parts still need to be sent
488                                                 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
489                                                 // Add the transaction status to the outstanding list
490                                                 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
491
492                                                 // Update the transaction status
493                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
494
495                                                 // Check if all the transaction parts were successfully
496                                                 // sent and if so then remove it from pending
497                                                 if (transaction->didSendAllParts()) {
498                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
499                                                         pendingTransactionQueue->remove(transaction);
500                                                 }
501                                         }
502                                         delete trit;
503                                 } else {
504                                         newSlots = sendSlotsReturn.getThird();
505                                         bool isInserted = false;
506                                         for (uint si = 0; si < newSlots->length(); si++) {
507                                                 Slot *s = newSlots->get(si);
508                                                 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
509                                                         isInserted = true;
510                                                         break;
511                                                 }
512                                         }
513
514                                         for (uint si = 0; si < newSlots->length(); si++) {
515                                                 Slot *s = newSlots->get(si);
516                                                 if (isInserted) {
517                                                         break;
518                                                 }
519
520                                                 // Process each entry in the slot
521                                                 Vector<Entry *> *ventries = s->getEntries();
522                                                 uint vesize = ventries->size();
523                                                 for (uint vei = 0; vei < vesize; vei++) {
524                                                         Entry *entry = ventries->get(vei);
525                                                         if (entry->getType() == TypeLastMessage) {
526                                                                 LastMessage *lastMessage = (LastMessage *)entry;
527                                                                 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
528                                                                         isInserted = true;
529                                                                         break;
530                                                                 }
531                                                         }
532                                                 }
533                                         }
534
535                                         if (isInserted) {
536                                                 if (newKey != NULL) {
537                                                         if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
538                                                                 newKey = NULL;
539                                                         }
540                                                 }
541
542                                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
543                                                 while (trit->hasNext()) {
544                                                         Transaction *transaction = trit->next();
545                                                         transaction->resetServerFailure();
546
547                                                         // Update which transactions parts still need to be sent
548                                                         transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
549
550                                                         // Add the transaction status to the outstanding list
551                                                         outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
552
553                                                         // Update the transaction status
554                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
555
556                                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
557                                                         if (transaction->didSendAllParts()) {
558                                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
559                                                                 pendingTransactionQueue->remove(transaction);
560                                                         } else {
561                                                                 transaction->resetServerFailure();
562                                                                 // Set the transaction sequence number back to nothing
563                                                                 if (!transaction->didSendAPartToServer()) {
564                                                                         transaction->setSequenceNumber(-1);
565                                                                 }
566                                                         }
567                                                 }
568                                                 delete trit;
569                                         }
570                                 }
571
572                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
573                                 while (trit->hasNext()) {
574                                         Transaction *transaction = trit->next();
575                                         transaction->resetServerFailure();
576                                         // Set the transaction sequence number back to nothing
577                                         if (!transaction->didSendAPartToServer()) {
578                                                 transaction->setSequenceNumber(-1);
579                                         }
580                                 }
581                                 delete trit;
582
583                                 if (sendSlotsReturn.getThird()->length() != 0) {
584                                         // insert into the local block chain
585                                         validateAndUpdate(sendSlotsReturn.getThird(), true);
586                                 }
587                                 // continue;
588                         } else {
589                                 bool isInserted = false;
590                                 for (uint si = 0; si < newSlots->length(); si++) {
591                                         Slot *s = newSlots->get(si);
592                                         if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
593                                                 isInserted = true;
594                                                 break;
595                                         }
596                                 }
597
598                                 for (uint si = 0; si < newSlots->length(); si++) {
599                                         Slot *s = newSlots->get(si);
600                                         if (isInserted) {
601                                                 break;
602                                         }
603
604                                         // Process each entry in the slot
605                                         Vector<Entry *> *entries = s->getEntries();
606                                         uint eSize = entries->size();
607                                         for (uint ei = 0; ei < eSize; ei++) {
608                                                 Entry *entry = entries->get(ei);
609
610                                                 if (entry->getType() == TypeLastMessage) {
611                                                         LastMessage *lastMessage = (LastMessage *)entry;
612                                                         if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
613                                                                 isInserted = true;
614                                                                 break;
615                                                         }
616                                                 }
617                                         }
618                                 }
619
620                                 if (isInserted) {
621                                         if (newKey != NULL) {
622                                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
623                                                         newKey = NULL;
624                                                 }
625                                         }
626
627                                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
628                                         while (trit->hasNext()) {
629                                                 Transaction *transaction = trit->next();
630                                                 transaction->resetServerFailure();
631
632                                                 // Update which transactions parts still need to be sent
633                                                 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
634
635                                                 // Add the transaction status to the outstanding list
636                                                 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
637
638                                                 // Update the transaction status
639                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
640
641                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
642                                                 if (transaction->didSendAllParts()) {
643                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
644                                                         pendingTransactionQueue->remove(transaction);
645                                                 } else {
646                                                         transaction->resetServerFailure();
647                                                         // Set the transaction sequence number back to nothing
648                                                         if (!transaction->didSendAPartToServer()) {
649                                                                 transaction->setSequenceNumber(-1);
650                                                         }
651                                                 }
652                                         }
653                                         delete trit;
654                                 } else {
655                                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
656                                         while (trit->hasNext()) {
657                                                 Transaction *transaction = trit->next();
658                                                 transaction->resetServerFailure();
659                                                 // Set the transaction sequence number back to nothing
660                                                 if (!transaction->didSendAPartToServer()) {
661                                                         transaction->setSequenceNumber(-1);
662                                                 }
663                                         }
664                                         delete trit;
665                                 }
666
667                                 // insert into the local block chain
668                                 validateAndUpdate(newSlots, true);
669                         }
670                 }
671         } catch (ServerException *e) {
672                 throw e;
673         }
674
675
676
677         try {
678                 // While we have stuff that needs inserting into the block chain
679                 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
680
681                         fromRetry = false;
682
683                         if (hadPartialSendToServer) {
684                                 throw new Error("Should Be error free");
685                         }
686
687
688
689                         // If there is a new key with same name then end
690                         if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
691                                 return false;
692                         }
693
694                         // Create the slot
695                         Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber);
696                         localSequenceNumber++;
697
698                         // Try to fill the slot with data
699                         ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
700                         bool needsResize = fillSlotsReturn.getFirst();
701                         int newSize = fillSlotsReturn.getSecond();
702                         bool insertedNewKey = fillSlotsReturn.getThird();
703
704                         if (needsResize) {
705                                 // Reset which transaction to send
706                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
707                                 while (trit->hasNext()) {
708                                         Transaction *transaction = trit->next();
709                                         transaction->resetNextPartToSend();
710
711                                         // Set the transaction sequence number back to nothing
712                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
713                                                 transaction->setSequenceNumber(-1);
714                                         }
715                                 }
716                                 delete trit;
717
718                                 // Clear the sent data since we are trying again
719                                 pendingSendArbitrationEntriesToDelete->clear();
720                                 transactionPartsSent->clear();
721
722                                 // We needed a resize so try again
723                                 fillSlot(slot, true, newKey);
724                         }
725
726                         lastSlotAttemptedToSend = slot;
727                         lastIsNewKey = (newKey != NULL);
728                         lastInsertedNewKey = insertedNewKey;
729                         lastNewSize = newSize;
730                         lastNewKey = newKey;
731                         lastTransactionPartsSent = transactionPartsSent->clone();
732                         lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
733
734                         ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
735
736                         if (sendSlotsReturn.getFirst()) {
737
738                                 // Did insert into the block chain
739
740                                 if (insertedNewKey) {
741                                         // This slot was what was inserted not a previous slot
742
743                                         // New Key was successfully inserted into the block chain so dont want to insert it again
744                                         newKey = NULL;
745                                 }
746
747                                 // Remove the aborts and commit parts that were sent from the pending to send queue
748                                 uint size = pendingSendArbitrationRounds->size();
749                                 uint oldcount = 0;
750                                 for (uint i = 0; i < size; i++) {
751                                         ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
752                                         round->removeParts(pendingSendArbitrationEntriesToDelete);
753
754                                         if (!round->isDoneSending()) {
755                                                 // Sent all the parts
756                                                 pendingSendArbitrationRounds->set(oldcount++,
757                                                                                                                                                                                         pendingSendArbitrationRounds->get(i));
758                                         }
759                                 }
760                                 pendingSendArbitrationRounds->setSize(oldcount);
761
762                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
763                                 while (trit->hasNext()) {
764                                         Transaction *transaction = trit->next();
765                                         transaction->resetServerFailure();
766
767                                         // Update which transactions parts still need to be sent
768                                         transaction->removeSentParts(transactionPartsSent->get(transaction));
769
770                                         // Add the transaction status to the outstanding list
771                                         outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
772
773                                         // Update the transaction status
774                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
775
776                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
777                                         if (transaction->didSendAllParts()) {
778                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
779                                                 pendingTransactionQueue->remove(transaction);
780                                         }
781                                 }
782                                 delete trit;
783                         } else {
784                                 // Reset which transaction to send
785                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
786                                 while (trit->hasNext()) {
787                                         Transaction *transaction = trit->next();
788                                         transaction->resetNextPartToSend();
789
790                                         // Set the transaction sequence number back to nothing
791                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
792                                                 transaction->setSequenceNumber(-1);
793                                         }
794                                 }
795                                 delete trit;
796                         }
797
798                         // Clear the sent data in preparation for next send
799                         pendingSendArbitrationEntriesToDelete->clear();
800                         transactionPartsSent->clear();
801
802                         if (sendSlotsReturn.getThird()->length() != 0) {
803                                 // insert into the local block chain
804                                 validateAndUpdate(sendSlotsReturn.getThird(), true);
805                         }
806                 }
807
808         } catch (ServerException *e) {
809                 if (e->getType() != ServerException_TypeInputTimeout) {
810                         // Nothing was able to be sent to the server so just clear these data structures
811                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
812                         while (trit->hasNext()) {
813                                 Transaction *transaction = trit->next();
814                                 transaction->resetNextPartToSend();
815
816                                 // Set the transaction sequence number back to nothing
817                                 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
818                                         transaction->setSequenceNumber(-1);
819                                 }
820                         }
821                         delete trit;
822                 } else {
823                         // There was a partial send to the server
824                         hadPartialSendToServer = true;
825
826                         // Nothing was able to be sent to the server so just clear these data structures
827                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
828                         while (trit->hasNext()) {
829                                 Transaction *transaction = trit->next();
830                                 transaction->resetNextPartToSend();
831                                 transaction->setServerFailure();
832                         }
833                         delete trit;
834                 }
835
836                 pendingSendArbitrationEntriesToDelete->clear();
837                 transactionPartsSent->clear();
838
839                 throw e;
840         }
841
842         return newKey == NULL;
843 }
844
845 bool Table::updateFromLocal(int64_t machineId) {
846         if (!localCommunicationTable->contains(machineId))
847                 return false;
848
849         Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(machineId);
850
851         // Get the size of the send data
852         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
853
854         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
855         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(machineId)) {
856                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
857         }
858
859         Array<char> *sendData = new Array<char>(sendDataSize);
860         ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
861
862         // Encode the data
863         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
864         bbEncode->putInt(0);
865
866         // Send by local
867         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
868         localSequenceNumber++;
869
870         if (returnData == NULL) {
871                 // Could not contact server
872                 return false;
873         }
874
875         // Decode the data
876         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
877         int numberOfEntries = bbDecode->getInt();
878
879         for (int i = 0; i < numberOfEntries; i++) {
880                 char type = bbDecode->get();
881                 if (type == TypeAbort) {
882                         Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
883                         processEntry(abort);
884                 } else if (type == TypeCommitPart) {
885                         CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
886                         processEntry(commitPart);
887                 }
888         }
889
890         updateLiveStateFromLocal();
891
892         return true;
893 }
894
895 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
896
897         // Get the devices local communications
898         if (!localCommunicationTable->contains(transaction->getArbitrator()))
899                 return Pair<bool, bool>(true, false);
900
901         Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
902
903         // Get the size of the send data
904         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
905         {
906                 Vector<TransactionPart *> *tParts = transaction->getParts();
907                 uint tPartsSize = tParts->size();
908                 for (uint i = 0; i < tPartsSize; i++) {
909                         TransactionPart *part = tParts->get(i);
910                         sendDataSize += part->getSize();
911                 }
912         }
913
914         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
915         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(transaction->getArbitrator())) {
916                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
917         }
918
919         // Make the send data size
920         Array<char> *sendData = new Array<char>(sendDataSize);
921         ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
922
923         // Encode the data
924         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
925         bbEncode->putInt(transaction->getParts()->size());
926         {
927                 Vector<TransactionPart *> *tParts = transaction->getParts();
928                 uint tPartsSize = tParts->size();
929                 for (uint i = 0; i < tPartsSize; i++) {
930                         TransactionPart *part = tParts->get(i);
931                         part->encode(bbEncode);
932                 }
933         }
934
935         // Send by local
936         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
937         localSequenceNumber++;
938
939         if (returnData == NULL) {
940                 // Could not contact server
941                 return Pair<bool, bool>(true, false);
942         }
943
944         // Decode the data
945         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
946         bool didCommit = bbDecode->get() == 1;
947         bool couldArbitrate = bbDecode->get() == 1;
948         int numberOfEntries = bbDecode->getInt();
949         bool foundAbort = false;
950
951         for (int i = 0; i < numberOfEntries; i++) {
952                 char type = bbDecode->get();
953                 if (type == TypeAbort) {
954                         Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
955
956                         if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
957                                 foundAbort = true;
958                         }
959
960                         processEntry(abort);
961                 } else if (type == TypeCommitPart) {
962                         CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
963                         processEntry(commitPart);
964                 }
965         }
966
967         updateLiveStateFromLocal();
968
969         if (couldArbitrate) {
970                 TransactionStatus *status =  transaction->getTransactionStatus();
971                 if (didCommit) {
972                         status->setStatus(TransactionStatus_StatusCommitted);
973                 } else {
974                         status->setStatus(TransactionStatus_StatusAborted);
975                 }
976         } else {
977                 TransactionStatus *status =  transaction->getTransactionStatus();
978                 if (foundAbort) {
979                         status->setStatus(TransactionStatus_StatusAborted);
980                 } else {
981                         status->setStatus(TransactionStatus_StatusCommitted);
982                 }
983         }
984
985         return Pair<bool, bool>(false, true);
986 }
987
988 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
989         // Decode the data
990         ByteBuffer *bbDecode = ByteBuffer_wrap(data);
991         int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
992         int numberOfParts = bbDecode->getInt();
993
994         // If we did commit a transaction or not
995         bool didCommit = false;
996         bool couldArbitrate = false;
997
998         if (numberOfParts != 0) {
999
1000                 // decode the transaction
1001                 Transaction *transaction = new Transaction();
1002                 for (int i = 0; i < numberOfParts; i++) {
1003                         bbDecode->get();
1004                         TransactionPart *newPart = (TransactionPart *)TransactionPart_decode(NULL, bbDecode);
1005                         transaction->addPartDecode(newPart);
1006                 }
1007
1008                 // Arbitrate on transaction and pull relevant return data
1009                 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
1010                 couldArbitrate = localArbitrateReturn.getFirst();
1011                 didCommit = localArbitrateReturn.getSecond();
1012
1013                 updateLiveStateFromLocal();
1014
1015                 // Transaction was sent to the server so keep track of it to prevent double commit
1016                 if (transaction->getSequenceNumber() != -1) {
1017                         offlineTransactionsCommittedAndAtServer->add(new Pair<int64_t, int64_t>(transaction->getId()));
1018                 }
1019         }
1020
1021         // The data to send back
1022         int returnDataSize = 0;
1023         Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
1024
1025         // Get the aborts to send back
1026         Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>();
1027         {
1028                 SetIterator<int64_t, Abort *> *abortit = getKeyIterator(liveAbortsGeneratedByLocal);
1029                 while (abortit->hasNext())
1030                         abortLocalSequenceNumbers->add(abortit->next());
1031                 delete abortit;
1032         }
1033
1034         qsort(abortLocalSequenceNumbers->expose(), abortLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1035
1036         uint asize = abortLocalSequenceNumbers->size();
1037         for (uint i = 0; i < asize; i++) {
1038                 int64_t localSequenceNumber = abortLocalSequenceNumbers->get(i);
1039                 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1040                         continue;
1041                 }
1042
1043                 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
1044                 unseenArbitrations->add(abort);
1045                 returnDataSize += abort->getSize();
1046         }
1047
1048         // Get the commits to send back
1049         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
1050         if (commitForClientTable != NULL) {
1051                 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>();
1052                 {
1053                         SetIterator<int64_t, Commit *> *commitit = getKeyIterator(commitForClientTable);
1054                         while (commitit->hasNext())
1055                                 commitLocalSequenceNumbers->add(commitit->next());
1056                         delete commitit;
1057                 }
1058                 qsort(commitLocalSequenceNumbers->expose(), commitLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1059
1060                 uint clsSize = commitLocalSequenceNumbers->size();
1061                 for (uint clsi = 0; clsi < clsSize; clsi++) {
1062                         int64_t localSequenceNumber = commitLocalSequenceNumbers->get(clsi);
1063                         Commit *commit = commitForClientTable->get(localSequenceNumber);
1064
1065                         if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1066                                 continue;
1067                         }
1068
1069                         {
1070                                 Vector<CommitPart *> *parts = commit->getParts();
1071                                 uint nParts = parts->size();
1072                                 for (uint i = 0; i < nParts; i++) {
1073                                         CommitPart *commitPart = parts->get(i);
1074                                         unseenArbitrations->add(commitPart);
1075                                         returnDataSize += commitPart->getSize();
1076                                 }
1077                         }
1078                 }
1079         }
1080
1081         // Number of arbitration entries to decode
1082         returnDataSize += 2 * sizeof(int32_t);
1083
1084         // bool of did commit or not
1085         if (numberOfParts != 0) {
1086                 returnDataSize += sizeof(char);
1087         }
1088
1089         // Data to send Back
1090         Array<char> *returnData = new Array<char>(returnDataSize);
1091         ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
1092
1093         if (numberOfParts != 0) {
1094                 if (didCommit) {
1095                         bbEncode->put((char)1);
1096                 } else {
1097                         bbEncode->put((char)0);
1098                 }
1099                 if (couldArbitrate) {
1100                         bbEncode->put((char)1);
1101                 } else {
1102                         bbEncode->put((char)0);
1103                 }
1104         }
1105
1106         bbEncode->putInt(unseenArbitrations->size());
1107         uint size = unseenArbitrations->size();
1108         for (uint i = 0; i < size; i++) {
1109                 Entry *entry = unseenArbitrations->get(i);
1110                 entry->encode(bbEncode);
1111         }
1112
1113         localSequenceNumber++;
1114         return returnData;
1115 }
1116
1117 ThreeTuple<bool, bool, Array<Slot *> *> Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey) {
1118         bool attemptedToSendToServerTmp = attemptedToSendToServer;
1119         attemptedToSendToServer = true;
1120
1121         bool inserted = false;
1122         bool lastTryInserted = false;
1123
1124         Array<Slot *> *array = cloud->putSlot(slot, newSize);
1125         if (array == NULL) {
1126                 array = new Array<Slot *>();
1127                 array->set(0, slot);
1128                 rejectedSlotVector->clear();
1129                 inserted = true;
1130         } else {
1131                 if (array->length() == 0) {
1132                         throw new Error("Server Error: Did not send any slots");
1133                 }
1134
1135                 // if (attemptedToSendToServerTmp) {
1136                 if (hadPartialSendToServer) {
1137
1138                         bool isInserted = false;
1139                         uint size = array->length();
1140                         for (uint i = 0; i < size; i++) {
1141                                 Slot *s = array->get(i);
1142                                 if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1143                                         isInserted = true;
1144                                         break;
1145                                 }
1146                         }
1147
1148                         for (uint i = 0; i < size; i++) {
1149                                 Slot *s = array->get(i);
1150                                 if (isInserted) {
1151                                         break;
1152                                 }
1153
1154                                 // Process each entry in the slot
1155                                 Vector<Entry *> *entries = s->getEntries();
1156                                 uint eSize = entries->size();
1157                                 for (uint ei = 0; ei < eSize; ei++) {
1158                                         Entry *entry = entries->get(ei);
1159
1160                                         if (entry->getType() == TypeLastMessage) {
1161                                                 LastMessage *lastMessage = (LastMessage *)entry;
1162
1163                                                 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) {
1164                                                         isInserted = true;
1165                                                         break;
1166                                                 }
1167                                         }
1168                                 }
1169                         }
1170
1171                         if (!isInserted) {
1172                                 rejectedSlotVector->add(slot->getSequenceNumber());
1173                                 lastTryInserted = false;
1174                         } else {
1175                                 lastTryInserted = true;
1176                         }
1177                 } else {
1178                         rejectedSlotVector->add(slot->getSequenceNumber());
1179                         lastTryInserted = false;
1180                 }
1181         }
1182
1183         return ThreeTuple<bool, bool, Array<Slot *> *>(inserted, lastTryInserted, array);
1184 }
1185
1186 /**
1187  * The first element of the returned tuple is true if a resize was needed
1188  */
1189 ThreeTuple<bool, int32_t, bool> Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry) {
1190         int newSize = 0;
1191         if (liveSlotCount > bufferResizeThreshold) {
1192                 resize = true;//Resize is forced
1193         }
1194
1195         if (resize) {
1196                 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1197                 TableStatus *status = new TableStatus(slot, newSize);
1198                 slot->addEntry(status);
1199         }
1200
1201         // Fill with rejected slots first before doing anything else
1202         doRejectedMessages(slot);
1203
1204         // Do mandatory rescue of entries
1205         ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1206
1207         // Extract working variables
1208         bool needsResize = mandatoryRescueReturn.getFirst();
1209         bool seenLiveSlot = mandatoryRescueReturn.getSecond();
1210         int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1211
1212         if (needsResize && !resize) {
1213                 // We need to resize but we are not resizing so return false
1214                 return ThreeTuple<bool, int32_t, bool>(true, NULL, NULL);
1215         }
1216
1217         bool inserted = false;
1218         if (newKeyEntry != NULL) {
1219                 newKeyEntry->setSlot(slot);
1220                 if (slot->hasSpace(newKeyEntry)) {
1221                         slot->addEntry(newKeyEntry);
1222                         inserted = true;
1223                 }
1224         }
1225
1226         // Clear the transactions, aborts and commits that were sent previously
1227         transactionPartsSent->clear();
1228         pendingSendArbitrationEntriesToDelete->clear();
1229         uint size = pendingSendArbitrationRounds->size();
1230         for (uint i = 0; i < size; i++) {
1231                 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
1232                 bool isFull = false;
1233                 round->generateParts();
1234                 Vector<Entry *> *parts = round->getParts();
1235
1236                 // Insert pending arbitration data
1237                 uint vsize = parts->size();
1238                 for (uint vi = 0; vi < vsize; vi++) {
1239                         Entry *arbitrationData = parts->get(vi);
1240
1241                         // If it is an abort then we need to set some information
1242                         if (arbitrationData->getType() == TypeAbort) {
1243                                 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1244                         }
1245
1246                         if (!slot->hasSpace(arbitrationData)) {
1247                                 // No space so cant do anything else with these data entries
1248                                 isFull = true;
1249                                 break;
1250                         }
1251
1252                         // Add to this current slot and add it to entries to delete
1253                         slot->addEntry(arbitrationData);
1254                         pendingSendArbitrationEntriesToDelete->add(arbitrationData);
1255                 }
1256
1257                 if (isFull) {
1258                         break;
1259                 }
1260         }
1261
1262         if (pendingTransactionQueue->size() > 0) {
1263                 Transaction *transaction = pendingTransactionQueue->get(0);
1264                 // Set the transaction sequence number if it has yet to be inserted into the block chain
1265                 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1266                         transaction->setSequenceNumber(slot->getSequenceNumber());
1267                 }
1268
1269                 while (true) {
1270                         TransactionPart *part = transaction->getNextPartToSend();
1271                         if (part == NULL) {
1272                                 // Ran out of parts to send for this transaction so move on
1273                                 break;
1274                         }
1275
1276                         if (slot->hasSpace(part)) {
1277                                 slot->addEntry(part);
1278                                 Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1279                                 if (partsSent == NULL) {
1280                                         partsSent = new Vector<int32_t>();
1281                                         transactionPartsSent->put(transaction, partsSent);
1282                                 }
1283                                 partsSent->add(part->getPartNumber());
1284                                 transactionPartsSent->put(transaction, partsSent);
1285                         } else {
1286                                 break;
1287                         }
1288                 }
1289         }
1290
1291         // Fill the remainder of the slot with rescue data
1292         doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1293
1294         return ThreeTuple<bool, int32_t, bool>(false, newSize, inserted);
1295 }
1296
1297 void Table::doRejectedMessages(Slot *s) {
1298         if (!rejectedSlotVector->isEmpty()) {
1299                 /* TODO: We should avoid generating a rejected message entry if
1300                  * there is already a sufficient entry in the queue (e->g->,
1301                  * equalsto value of true and same sequence number)->  */
1302
1303                 int64_t old_seqn = rejectedSlotVector->get(0);
1304                 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1305                         int64_t new_seqn = rejectedSlotVector->lastElement();
1306                         RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1307                         s->addEntry(rm);
1308                 } else {
1309                         int64_t prev_seqn = -1;
1310                         uint i = 0;
1311                         /* Go through list of missing messages */
1312                         for (; i < rejectedSlotVector->size(); i++) {
1313                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1314                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1315                                 if (s_msg != NULL)
1316                                         break;
1317                                 prev_seqn = curr_seqn;
1318                         }
1319                         /* Generate rejected message entry for missing messages */
1320                         if (prev_seqn != -1) {
1321                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1322                                 s->addEntry(rm);
1323                         }
1324                         /* Generate rejected message entries for present messages */
1325                         for (; i < rejectedSlotVector->size(); i++) {
1326                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1327                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1328                                 int64_t machineid = s_msg->getMachineID();
1329                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1330                                 s->addEntry(rm);
1331                         }
1332                 }
1333         }
1334 }
1335
1336 ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize) {
1337         int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1338         int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
1339         if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1340                 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1341         }
1342
1343         int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1344         bool seenLiveSlot = false;
1345         int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots;         // smallest seq number in the buffer if it is full
1346         int64_t threshold = firstIfFull + Table_FREE_SLOTS;             // we want the buffer to be clear of live entries up to this point
1347
1348
1349         // Mandatory Rescue
1350         for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1351                 Slot *previousSlot = buffer->getSlot(currentSequenceNumber);
1352                 // Push slot number forward
1353                 if (!seenLiveSlot) {
1354                         oldestLiveSlotSequenceNumver = currentSequenceNumber;
1355                 }
1356
1357                 if (!previousSlot->isLive()) {
1358                         continue;
1359                 }
1360
1361                 // We have seen a live slot
1362                 seenLiveSlot = true;
1363
1364                 // Get all the live entries for a slot
1365                 Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1366
1367                 // Iterate over all the live entries and try to rescue them
1368                 uint lESize = liveEntries->size();
1369                 for (uint i = 0; i < lESize; i++) {
1370                         Entry *liveEntry = liveEntries->get(i);
1371                         if (slot->hasSpace(liveEntry)) {
1372                                 // Enough space to rescue the entry
1373                                 slot->addEntry(liveEntry);
1374                         } else if (currentSequenceNumber == firstIfFull) {
1375                                 //if there's no space but the entry is about to fall off the queue
1376                                 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
1377                         }
1378                 }
1379         }
1380
1381         // Did not resize
1382         return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
1383 }
1384
1385 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1386         /* now go through live entries from least to greatest sequence number until
1387          * either all live slots added, or the slot doesn't have enough room
1388          * for SKIP_THRESHOLD consecutive entries*/
1389         int skipcount = 0;
1390         int64_t newestseqnum = buffer->getNewestSeqNum();
1391         for (; seqn <= newestseqnum; seqn++) {
1392                 Slot *prevslot = buffer->getSlot(seqn);
1393                 //Push slot number forward
1394                 if (!seenliveslot)
1395                         oldestLiveSlotSequenceNumver = seqn;
1396
1397                 if (!prevslot->isLive())
1398                         continue;
1399                 seenliveslot = true;
1400                 Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
1401                 uint lESize = liveentries->size();
1402                 for (uint i = 0; i < lESize; i++) {
1403                         Entry *liveentry = liveentries->get(i);
1404                         if (s->hasSpace(liveentry))
1405                                 s->addEntry(liveentry);
1406                         else {
1407                                 skipcount++;
1408                                 if (skipcount > Table_SKIP_THRESHOLD)
1409                                         goto donesearch;
1410                         }
1411                 }
1412         }
1413 donesearch:
1414         ;
1415 }
1416
1417 /**
1418  * Checks for malicious activity and updates the local copy of the block chain->
1419  */
1420 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1421         // The cloud communication layer has checked slot HMACs already
1422         // before decoding
1423         if (newSlots->length() == 0) {
1424                 return;
1425         }
1426
1427         // Make sure all slots are newer than the last largest slot this
1428         // client has seen
1429         int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
1430         if (firstSeqNum <= sequenceNumber) {
1431                 throw new Error("Server Error: Sent older slots!");
1432         }
1433
1434         // Create an object that can access both new slots and slots in our
1435         // local chain without committing slots to our local chain
1436         SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1437
1438         // Check that the HMAC chain is not broken
1439         checkHMACChain(indexer, newSlots);
1440
1441         // Set to keep track of messages from clients
1442         Hashset<int64_t> *machineSet = new Hashset<int64_t>();
1443         {
1444                 SetIterator<int64_t, Pair<int64_t, Liveness *> *> *lmit = getKeyIterator(lastMessageTable);
1445                 while (lmit->hasNext())
1446                         machineSet->add(lmit->next());
1447                 delete lmit;
1448         }
1449
1450         // Process each slots data
1451         {
1452                 uint numSlots = newSlots->length();
1453                 for (uint i = 0; i < numSlots; i++) {
1454                         Slot *slot = newSlots->get(i);
1455                         processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1456                         updateExpectedSize();
1457                 }
1458         }
1459
1460         // If there is a gap, check to see if the server sent us
1461         // everything->
1462         if (firstSeqNum != (sequenceNumber + 1)) {
1463
1464                 // Check the size of the slots that were sent down by the server->
1465                 // Can only check the size if there was a gap
1466                 checkNumSlots(newSlots->length());
1467
1468                 // Since there was a gap every machine must have pushed a slot or
1469                 // must have a last message message-> If not then the server is
1470                 // hiding slots
1471                 if (!machineSet->isEmpty()) {
1472                         throw new Error("Missing record for machines: ");
1473                 }
1474         }
1475
1476         // Update the size of our local block chain->
1477         commitNewMaxSize();
1478
1479         // Commit new to slots to the local block chain->
1480         {
1481                 uint numSlots = newSlots->length();
1482                 for (uint i = 0; i < numSlots; i++) {
1483                         Slot *slot = newSlots->get(i);
1484
1485                         // Insert this slot into our local block chain copy->
1486                         buffer->putSlot(slot);
1487
1488                         // Keep track of how many slots are currently live (have live data
1489                         // in them)->
1490                         liveSlotCount++;
1491                 }
1492         }
1493         // Get the sequence number of the latest slot in the system
1494         sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
1495         updateLiveStateFromServer();
1496
1497         // No Need to remember after we pulled from the server
1498         offlineTransactionsCommittedAndAtServer->clear();
1499
1500         // This is invalidated now
1501         hadPartialSendToServer = false;
1502 }
1503
1504 void Table::updateLiveStateFromServer() {
1505         // Process the new transaction parts
1506         processNewTransactionParts();
1507
1508         // Do arbitration on new transactions that were received
1509         arbitrateFromServer();
1510
1511         // Update all the committed keys
1512         bool didCommitOrSpeculate = updateCommittedTable();
1513
1514         // Delete the transactions that are now dead
1515         updateLiveTransactionsAndStatus();
1516
1517         // Do speculations
1518         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1519         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1520 }
1521
1522 void Table::updateLiveStateFromLocal() {
1523         // Update all the committed keys
1524         bool didCommitOrSpeculate = updateCommittedTable();
1525
1526         // Delete the transactions that are now dead
1527         updateLiveTransactionsAndStatus();
1528
1529         // Do speculations
1530         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1531         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1532 }
1533
1534 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1535         int64_t prevslots = firstSequenceNumber;
1536
1537         if (didFindTableStatus) {
1538         } else {
1539                 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1540         }
1541
1542         didFindTableStatus = true;
1543         currMaxSize = numberOfSlots;
1544 }
1545
1546 void Table::updateExpectedSize() {
1547         expectedsize++;
1548
1549         if (expectedsize > currMaxSize) {
1550                 expectedsize = currMaxSize;
1551         }
1552 }
1553
1554
1555 /**
1556  * Check the size of the block chain to make sure there are enough
1557  * slots sent back by the server-> This is only called when we have a
1558  * gap between the slots that we have locally and the slots sent by
1559  * the server therefore in the slots sent by the server there will be
1560  * at least 1 Table status message
1561  */
1562 void Table::checkNumSlots(int numberOfSlots) {
1563         if (numberOfSlots != expectedsize) {
1564                 throw new Error("Server Error: Server did not send all slots->  Expected: ");
1565         }
1566 }
1567
1568 /**
1569  * Update the size of of the local buffer if it is needed->
1570  */
1571 void Table::commitNewMaxSize() {
1572         didFindTableStatus = false;
1573
1574         // Resize the local slot buffer
1575         if (numberOfSlots != currMaxSize) {
1576                 buffer->resize((int32_t)currMaxSize);
1577         }
1578
1579         // Change the number of local slots to the new size
1580         numberOfSlots = (int32_t)currMaxSize;
1581
1582         // Recalculate the resize threshold since the size of the local
1583         // buffer has changed
1584         setResizeThreshold();
1585 }
1586
1587 /**
1588  * Process the new transaction parts from this latest round of slots
1589  * received from the server
1590  */
1591 void Table::processNewTransactionParts() {
1592
1593         if (newTransactionParts->size() == 0) {
1594                 // Nothing new to process
1595                 return;
1596         }
1597
1598         // Iterate through all the machine Ids that we received new parts
1599         // for
1600         SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *tpit = getKeyIterator(newTransactionParts);
1601         while (tpit->hasNext()) {
1602                 int64_t machineId = tpit->next();
1603                 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
1604
1605                 SetIterator<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *ptit = getKeyIterator(parts);
1606                 // Iterate through all the parts for that machine Id
1607                 while (ptit->hasNext()) {
1608                         Pair<int64_t, int32_t> *partId = ptit->next();
1609                         TransactionPart *part = parts->get(partId);
1610
1611                         if (lastArbitratedTransactionNumberByArbitratorTable->contains(part->getArbitratorId())) {
1612                                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1613                                 if (lastTransactionNumber >= part->getSequenceNumber()) {
1614                                         // Set dead the transaction part
1615                                         part->setDead();
1616                                         continue;
1617                                 }
1618                         }
1619
1620                         // Get the transaction object for that sequence number
1621                         Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1622
1623                         if (transaction == NULL) {
1624                                 // This is a new transaction that we dont have so make a new one
1625                                 transaction = new Transaction();
1626
1627                                 // Insert this new transaction into the live tables
1628                                 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1629                                 liveTransactionByTransactionIdTable->put(new Pair<int64_t, int64_t>(part->getTransactionId()), transaction);
1630                         }
1631
1632                         // Add that part to the transaction
1633                         transaction->addPartDecode(part);
1634                 }
1635                 delete ptit;
1636         }
1637         delete tpit;
1638         // Clear all the new transaction parts in preparation for the next
1639         // time the server sends slots
1640         newTransactionParts->clear();
1641 }
1642
1643 void Table::arbitrateFromServer() {
1644
1645         if (liveTransactionBySequenceNumberTable->size() == 0) {
1646                 // Nothing to arbitrate on so move on
1647                 return;
1648         }
1649
1650         // Get the transaction sequence numbers and sort from oldest to newest
1651         Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>();
1652         {
1653                 SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
1654                 while (trit->hasNext())
1655                         transactionSequenceNumbers->add(trit->next());
1656                 delete trit;
1657         }
1658         qsort(transactionSequenceNumbers->expose(), transactionSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1659
1660         // Collection of key value pairs that are
1661         Hashtable<IoTString *, KeyValue *> *speculativeTableTmp = new Hashtable<IoTString *, KeyValue *>();
1662
1663         // The last transaction arbitrated on
1664         int64_t lastTransactionCommitted = -1;
1665         Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1666         uint tsnSize = transactionSequenceNumbers->size();
1667         for (uint i = 0; i < tsnSize; i++) {
1668                 int64_t transactionSequenceNumber = transactionSequenceNumbers->get(i);
1669                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1670
1671                 // Check if this machine arbitrates for this transaction if not
1672                 // then we cant arbitrate this transaction
1673                 if (transaction->getArbitrator() != localMachineId) {
1674                         continue;
1675                 }
1676
1677                 if (transactionSequenceNumber < lastSeqNumArbOn) {
1678                         continue;
1679                 }
1680
1681                 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1682                         // We have seen this already locally so dont commit again
1683                         continue;
1684                 }
1685
1686
1687                 if (!transaction->isComplete()) {
1688                         // Will arbitrate in incorrect order if we continue so just break
1689                         // Most likely this
1690                         break;
1691                 }
1692
1693
1694                 // update the largest transaction seen by arbitrator from server
1695                 if (!lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1696                         lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1697                 } else {
1698                         int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1699                         if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1700                                 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1701                         }
1702                 }
1703
1704                 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1705                         // Guard evaluated as true
1706
1707                         // Update the local changes so we can make the commit
1708                         SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1709                         while (kvit->hasNext()) {
1710                                 KeyValue *kv = kvit->next();
1711                                 speculativeTableTmp->put(kv->getKey(), kv);
1712                         }
1713                         delete kvit;
1714
1715                         // Update what the last transaction committed was for use in batch commit
1716                         lastTransactionCommitted = transactionSequenceNumber;
1717                 } else {
1718                         // Guard evaluated was false so create abort
1719                         // Create the abort
1720                         Abort *newAbort = new Abort(NULL,
1721                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1722                                                                                                                                         transaction->getSequenceNumber(),
1723                                                                                                                                         transaction->getMachineId(),
1724                                                                                                                                         transaction->getArbitrator(),
1725                                                                                                                                         localArbitrationSequenceNumber);
1726                         localArbitrationSequenceNumber++;
1727                         generatedAborts->add(newAbort);
1728
1729                         // Insert the abort so we can process
1730                         processEntry(newAbort);
1731                 }
1732
1733                 lastSeqNumArbOn = transactionSequenceNumber;
1734         }
1735
1736         Commit *newCommit = NULL;
1737
1738         // If there is something to commit
1739         if (speculativeTableTmp->size() != 0) {
1740                 // Create the commit and increment the commit sequence number
1741                 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1742                 localArbitrationSequenceNumber++;
1743
1744                 // Add all the new keys to the commit
1745                 SetIterator<IoTString *, KeyValue *> *spit = getKeyIterator(speculativeTableTmp);
1746                 while (spit->hasNext()) {
1747                         IoTString *string = spit->next();
1748                         KeyValue *kv = speculativeTableTmp->get(string);
1749                         newCommit->addKV(kv);
1750                 }
1751                 delete spit;
1752
1753                 // create the commit parts
1754                 newCommit->createCommitParts();
1755
1756                 // Append all the commit parts to the end of the pending queue
1757                 // waiting for sending to the server
1758                 // Insert the commit so we can process it
1759                 Vector<CommitPart *> *parts = newCommit->getParts();
1760                 uint partsSize = parts->size();
1761                 for (uint i = 0; i < partsSize; i++) {
1762                         CommitPart *commitPart = parts->get(i);
1763                         processEntry(commitPart);
1764                 }
1765         }
1766
1767         if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1768                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1769                 pendingSendArbitrationRounds->add(arbitrationRound);
1770
1771                 if (compactArbitrationData()) {
1772                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1773                         if (newArbitrationRound->getCommit() != NULL) {
1774                                 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1775                                 uint partsSize = parts->size();
1776                                 for (uint i = 0; i < partsSize; i++) {
1777                                         CommitPart *commitPart = parts->get(i);
1778                                         processEntry(commitPart);
1779                                 }
1780                         }
1781                 }
1782         }
1783 }
1784
1785 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1786
1787         // Check if this machine arbitrates for this transaction if not then
1788         // we cant arbitrate this transaction
1789         if (transaction->getArbitrator() != localMachineId) {
1790                 return Pair<bool, bool>(false, false);
1791         }
1792
1793         if (!transaction->isComplete()) {
1794                 // Will arbitrate in incorrect order if we continue so just break
1795                 // Most likely this
1796                 return Pair<bool, bool>(false, false);
1797         }
1798
1799         if (transaction->getMachineId() != localMachineId) {
1800                 // dont do this check for local transactions
1801                 if (lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1802                         if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1803                                 // We've have already seen this from the server
1804                                 return Pair<bool, bool>(false, false);
1805                         }
1806                 }
1807         }
1808
1809         if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1810                 // Guard evaluated as true Create the commit and increment the
1811                 // commit sequence number
1812                 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1813                 localArbitrationSequenceNumber++;
1814
1815                 // Update the local changes so we can make the commit
1816                 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1817                 while (kvit->hasNext()) {
1818                         KeyValue *kv = kvit->next();
1819                         newCommit->addKV(kv);
1820                 }
1821                 delete kvit;
1822
1823                 // create the commit parts
1824                 newCommit->createCommitParts();
1825
1826                 // Append all the commit parts to the end of the pending queue
1827                 // waiting for sending to the server
1828                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1829                 pendingSendArbitrationRounds->add(arbitrationRound);
1830
1831                 if (compactArbitrationData()) {
1832                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1833                         Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1834                         uint partsSize = parts->size();
1835                         for (uint i = 0; i < partsSize; i++) {
1836                                 CommitPart *commitPart = parts->get(i);
1837                                 processEntry(commitPart);
1838                         }
1839                 } else {
1840                         // Insert the commit so we can process it
1841                         Vector<CommitPart *> *parts = newCommit->getParts();
1842                         uint partsSize = parts->size();
1843                         for (uint i = 0; i < partsSize; i++) {
1844                                 CommitPart *commitPart = parts->get(i);
1845                                 processEntry(commitPart);
1846                         }
1847                 }
1848
1849                 if (transaction->getMachineId() == localMachineId) {
1850                         TransactionStatus *status = transaction->getTransactionStatus();
1851                         if (status != NULL) {
1852                                 status->setStatus(TransactionStatus_StatusCommitted);
1853                         }
1854                 }
1855
1856                 updateLiveStateFromLocal();
1857                 return Pair<bool, bool>(true, true);
1858         } else {
1859                 if (transaction->getMachineId() == localMachineId) {
1860                         // For locally created messages update the status
1861                         // Guard evaluated was false so create abort
1862                         TransactionStatus *status = transaction->getTransactionStatus();
1863                         if (status != NULL) {
1864                                 status->setStatus(TransactionStatus_StatusAborted);
1865                         }
1866                 } else {
1867                         Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
1868
1869                         // Create the abort
1870                         Abort *newAbort = new Abort(NULL,
1871                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1872                                                                                                                                         -1,
1873                                                                                                                                         transaction->getMachineId(),
1874                                                                                                                                         transaction->getArbitrator(),
1875                                                                                                                                         localArbitrationSequenceNumber);
1876                         localArbitrationSequenceNumber++;
1877                         addAbortSet->add(newAbort);
1878
1879                         // Append all the commit parts to the end of the pending queue
1880                         // waiting for sending to the server
1881                         ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1882                         pendingSendArbitrationRounds->add(arbitrationRound);
1883
1884                         if (compactArbitrationData()) {
1885                                 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1886
1887                                 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1888                                 uint partsSize = parts->size();
1889                                 for (uint i = 0; i < partsSize; i++) {
1890                                         CommitPart *commitPart = parts->get(i);
1891                                         processEntry(commitPart);
1892                                 }
1893                         }
1894                 }
1895
1896                 updateLiveStateFromLocal();
1897                 return Pair<bool, bool>(true, false);
1898         }
1899 }
1900
1901 /**
1902  * Compacts the arbitration data my merging commits and aggregating
1903  * aborts so that a single large push of commits can be done instead
1904  * of many small updates
1905  */
1906 bool Table::compactArbitrationData() {
1907         if (pendingSendArbitrationRounds->size() < 2) {
1908                 // Nothing to compact so do nothing
1909                 return false;
1910         }
1911
1912         ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1913         if (lastRound->getDidSendPart()) {
1914                 return false;
1915         }
1916
1917         bool hadCommit = (lastRound->getCommit() == NULL);
1918         bool gotNewCommit = false;
1919
1920         uint numberToDelete = 1;
1921         while (numberToDelete < pendingSendArbitrationRounds->size()) {
1922                 ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1923
1924                 if (round->isFull() || round->getDidSendPart()) {
1925                         // Stop since there is a part that cannot be compacted and we
1926                         // need to compact in order
1927                         break;
1928                 }
1929
1930                 if (round->getCommit() == NULL) {
1931                         // Try compacting aborts only
1932                         int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
1933                         if (newSize > ArbitrationRound_MAX_PARTS) {
1934                                 // Cant compact since it would be too large
1935                                 break;
1936                         }
1937                         lastRound->addAborts(round->getAborts());
1938                 } else {
1939                         // Create a new larger commit
1940                         Commit *newCommit = Commit_merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
1941                         localArbitrationSequenceNumber++;
1942
1943                         // Create the commit parts so that we can count them
1944                         newCommit->createCommitParts();
1945
1946                         // Calculate the new size of the parts
1947                         int newSize = newCommit->getNumberOfParts();
1948                         newSize += lastRound->getAbortsCount();
1949                         newSize += round->getAbortsCount();
1950
1951                         if (newSize > ArbitrationRound_MAX_PARTS) {
1952                                 // Cant compact since it would be too large
1953                                 break;
1954                         }
1955
1956                         // Set the new compacted part
1957                         lastRound->setCommit(newCommit);
1958                         lastRound->addAborts(round->getAborts());
1959                         gotNewCommit = true;
1960                 }
1961
1962                 numberToDelete++;
1963         }
1964
1965         if (numberToDelete != 1) {
1966                 // If there is a compaction
1967                 // Delete the previous pieces that are now in the new compacted piece
1968                 if (numberToDelete == pendingSendArbitrationRounds->size()) {
1969                         pendingSendArbitrationRounds->clear();
1970                 } else {
1971                         for (uint i = 0; i < numberToDelete; i++) {
1972                                 pendingSendArbitrationRounds->removeIndex(pendingSendArbitrationRounds->size() - 1);
1973                         }
1974                 }
1975
1976                 // Add the new compacted into the pending to send list
1977                 pendingSendArbitrationRounds->add(lastRound);
1978
1979                 // Should reinsert into the commit processor
1980                 if (hadCommit && gotNewCommit) {
1981                         return true;
1982                 }
1983         }
1984
1985         return false;
1986 }
1987
1988 /**
1989  * Update all the commits and the committed tables, sets dead the dead
1990  * transactions
1991  */
1992 bool Table::updateCommittedTable() {
1993
1994         if (newCommitParts->size() == 0) {
1995                 // Nothing new to process
1996                 return false;
1997         }
1998
1999         // Iterate through all the machine Ids that we received new parts for
2000         SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts);
2001         while (partsit->hasNext()) {
2002                 int64_t machineId = partsit->next();
2003                 Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId);
2004
2005                 // Iterate through all the parts for that machine Id
2006                 SetIterator<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pairit = getKeyIterator(parts);
2007                 while (pairit->hasNext()) {
2008                         Pair<int64_t, int32_t> *partId = pairit->next();
2009                         CommitPart *part = parts->get(partId);
2010
2011                         // Get the transaction object for that sequence number
2012                         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
2013
2014                         if (commitForClientTable == NULL) {
2015                                 // This is the first commit from this device
2016                                 commitForClientTable = new Hashtable<int64_t, Commit *>();
2017                                 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
2018                         }
2019
2020                         Commit *commit = commitForClientTable->get(part->getSequenceNumber());
2021
2022                         if (commit == NULL) {
2023                                 // This is a new commit that we dont have so make a new one
2024                                 commit = new Commit();
2025
2026                                 // Insert this new commit into the live tables
2027                                 commitForClientTable->put(part->getSequenceNumber(), commit);
2028                         }
2029
2030                         // Add that part to the commit
2031                         commit->addPartDecode(part);
2032                 }
2033                 delete pairit;
2034         }
2035         delete partsit;
2036
2037         // Clear all the new commits parts in preparation for the next time
2038         // the server sends slots
2039         newCommitParts->clear();
2040
2041         // If we process a new commit keep track of it for future use
2042         bool didProcessANewCommit = false;
2043
2044         // Process the commits one by one
2045         SetIterator<int64_t, Hashtable<int64_t, Commit *> *> *liveit = getKeyIterator(liveCommitsTable);
2046         while (liveit->hasNext()) {
2047                 int64_t arbitratorId = liveit->next();
2048
2049                 // Get all the commits for a specific arbitrator
2050                 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
2051
2052                 // Sort the commits in order
2053                 Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>();
2054                 {
2055                         SetIterator<int64_t, Commit *> *clientit = getKeyIterator(commitForClientTable);
2056                         while (clientit->hasNext())
2057                                 commitSequenceNumbers->add(clientit->next());
2058                         delete clientit;
2059                 }
2060
2061                 qsort(commitSequenceNumbers->expose(), commitSequenceNumbers->size(), sizeof(int64_t), compareInt64);
2062
2063                 // Get the last commit seen from this arbitrator
2064                 int64_t lastCommitSeenSequenceNumber = -1;
2065                 if (lastCommitSeenSequenceNumberByArbitratorTable->contains(arbitratorId)) {
2066                         lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
2067                 }
2068
2069                 // Go through each new commit one by one
2070                 for (uint i = 0; i < commitSequenceNumbers->size(); i++) {
2071                         int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
2072                         Commit *commit = commitForClientTable->get(commitSequenceNumber);
2073
2074                         // Special processing if a commit is not complete
2075                         if (!commit->isComplete()) {
2076                                 if (i == (commitSequenceNumbers->size() - 1)) {
2077                                         // If there is an incomplete commit and this commit is the
2078                                         // latest one seen then this commit cannot be processed and
2079                                         // there are no other commits
2080                                         break;
2081                                 } else {
2082                                         // This is a commit that was already dead but parts of it
2083                                         // are still in the block chain (not flushed out yet)->
2084                                         // Delete it and move on
2085                                         commit->setDead();
2086                                         commitForClientTable->remove(commit->getSequenceNumber());
2087                                         continue;
2088                                 }
2089                         }
2090
2091                         // Update the last transaction that was updated if we can
2092                         if (commit->getTransactionSequenceNumber() != -1) {
2093                                 // Update the last transaction sequence number that the arbitrator arbitrated on1
2094                                 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2095                                         lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2096                                 }
2097                         }
2098
2099                         // Update the last arbitration data that we have seen so far
2100                         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(commit->getMachineId())) {
2101                                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
2102                                 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
2103                                         // Is larger
2104                                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2105                                 }
2106                         } else {
2107                                 // Never seen any data from this arbitrator so record the first one
2108                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2109                         }
2110
2111                         // We have already seen this commit before so need to do the
2112                         // full processing on this commit
2113                         if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2114
2115                                 // Update the last transaction that was updated if we can
2116                                 if (commit->getTransactionSequenceNumber() != -1) {
2117                                         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
2118                                         if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) ||
2119                                                         lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2120                                                 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2121                                         }
2122                                 }
2123
2124                                 continue;
2125                         }
2126
2127                         // If we got here then this is a brand new commit and needs full
2128                         // processing
2129                         // Get what commits should be edited, these are the commits that
2130                         // have live values for their keys
2131                         Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
2132                         {
2133                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = commit->getKeyValueUpdateSet()->iterator();
2134                                 while (kvit->hasNext()) {
2135                                         KeyValue *kv = kvit->next();
2136                                         Commit *commit = liveCommitsByKeyTable->get(kv->getKey());
2137                                         if (commit != NULL)
2138                                                 commitsToEdit->add(commit);
2139                                 }
2140                                 delete kvit;
2141                         }
2142
2143                         // Update each previous commit that needs to be updated
2144                         SetIterator<Commit *, Commit *> *commitit = commitsToEdit->iterator();
2145                         while (commitit->hasNext()) {
2146                                 Commit *previousCommit = commitit->next();
2147
2148                                 // Only bother with live commits (TODO: Maybe remove this check)
2149                                 if (previousCommit->isLive()) {
2150
2151                                         // Update which keys in the old commits are still live
2152                                         {
2153                                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = commit->getKeyValueUpdateSet()->iterator();
2154                                                 while (kvit->hasNext()) {
2155                                                         KeyValue *kv = kvit->next();
2156                                                         previousCommit->invalidateKey(kv->getKey());
2157                                                 }
2158                                                 delete kvit;
2159                                         }
2160
2161                                         // if the commit is now dead then remove it
2162                                         if (!previousCommit->isLive()) {
2163                                                 commitForClientTable->remove(previousCommit->getSequenceNumber());
2164                                         }
2165                                 }
2166                         }
2167                         delete commitit;
2168
2169                         // Update the last seen sequence number from this arbitrator
2170                         if (lastCommitSeenSequenceNumberByArbitratorTable->contains(commit->getMachineId())) {
2171                                 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2172                                         lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2173                                 }
2174                         } else {
2175                                 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2176                         }
2177
2178                         // We processed a new commit that we havent seen before
2179                         didProcessANewCommit = true;
2180
2181                         // Update the committed table of keys and which commit is using which key
2182                         {
2183                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = commit->getKeyValueUpdateSet()->iterator();
2184                                 while (kvit->hasNext()) {
2185                                         KeyValue *kv = kvit->next();
2186                                         committedKeyValueTable->put(kv->getKey(), kv);
2187                                         liveCommitsByKeyTable->put(kv->getKey(), commit);
2188                                 }
2189                                 delete kvit;
2190                         }
2191                 }
2192         }
2193         delete liveit;
2194
2195         return didProcessANewCommit;
2196 }
2197
2198 /**
2199  * Create the speculative table from transactions that are still live
2200  * and have come from the cloud
2201  */
2202 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2203         if (liveTransactionBySequenceNumberTable->size() == 0) {
2204                 // There is nothing to speculate on
2205                 return false;
2206         }
2207
2208         // Create a list of the transaction sequence numbers and sort them
2209         // from oldest to newest
2210         Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>();
2211         {
2212                 SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
2213                 while (trit->hasNext())
2214                         transactionSequenceNumbersSorted->add(trit->next());
2215                 delete trit;
2216         }
2217
2218         qsort(transactionSequenceNumbersSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64);
2219
2220         bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2221
2222
2223         if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2224                 // If there is a gap in the transaction sequence numbers then
2225                 // there was a commit or an abort of a transaction OR there was a
2226                 // new commit (Could be from offline commit) so a redo the
2227                 // speculation from scratch
2228
2229                 // Start from scratch
2230                 speculatedKeyValueTable->clear();
2231                 lastTransactionSequenceNumberSpeculatedOn = -1;
2232                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2233         }
2234
2235         // Remember the front of the transaction list
2236         oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2237
2238         // Find where to start arbitration from
2239         uint startIndex = 0;
2240
2241         for (; startIndex < transactionSequenceNumbersSorted->size(); startIndex++)
2242                 if (transactionSequenceNumbersSorted->get(startIndex) == lastTransactionSequenceNumberSpeculatedOn)
2243                         break;
2244         startIndex++;
2245
2246         if (startIndex >= transactionSequenceNumbersSorted->size()) {
2247                 // Make sure we are not out of bounds
2248                 return false;           // did not speculate
2249         }
2250
2251         Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
2252         bool didSkip = true;
2253
2254         for (uint i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2255                 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2256                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2257
2258                 if (!transaction->isComplete()) {
2259                         // If there is an incomplete transaction then there is nothing
2260                         // we can do add this transactions arbitrator to the list of
2261                         // arbitrators we should ignore
2262                         incompleteTransactionArbitrator->add(transaction->getArbitrator());
2263                         didSkip = true;
2264                         continue;
2265                 }
2266
2267                 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2268                         continue;
2269                 }
2270
2271                 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2272
2273                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2274                         // Guard evaluated to true so update the speculative table
2275                         {
2276                                 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2277                                 while (kvit->hasNext()) {
2278                                         KeyValue *kv = kvit->next();
2279                                         speculatedKeyValueTable->put(kv->getKey(), kv);
2280                                 }
2281                                 delete kvit;
2282                         }
2283                 }
2284         }
2285
2286         if (didSkip) {
2287                 // Since there was a skip we need to redo the speculation next time around
2288                 lastTransactionSequenceNumberSpeculatedOn = -1;
2289                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2290         }
2291
2292         // We did some speculation
2293         return true;
2294 }
2295
2296 /**
2297  * Create the pending transaction speculative table from transactions
2298  * that are still in the pending transaction buffer
2299  */
2300 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2301         if (pendingTransactionQueue->size() == 0) {
2302                 // There is nothing to speculate on
2303                 return;
2304         }
2305
2306         if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2307                 // need to reset on the pending speculation
2308                 lastPendingTransactionSpeculatedOn = NULL;
2309                 firstPendingTransaction = pendingTransactionQueue->get(0);
2310                 pendingTransactionSpeculatedKeyValueTable->clear();
2311         }
2312
2313         // Find where to start arbitration from
2314         uint startIndex = 0;
2315
2316         for (; startIndex < pendingTransactionQueue->size(); startIndex++)
2317                 if (pendingTransactionQueue->get(startIndex) == firstPendingTransaction)
2318                         break;
2319
2320         if (startIndex >= pendingTransactionQueue->size()) {
2321                 // Make sure we are not out of bounds
2322                 return;
2323         }
2324
2325         for (uint i = startIndex; i < pendingTransactionQueue->size(); i++) {
2326                 Transaction *transaction = pendingTransactionQueue->get(i);
2327
2328                 lastPendingTransactionSpeculatedOn = transaction;
2329
2330                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2331                         // Guard evaluated to true so update the speculative table
2332                         SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2333                         while (kvit->hasNext()) {
2334                                 KeyValue *kv = kvit->next();
2335                                 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2336                         }
2337                         delete kvit;
2338                 }
2339         }
2340 }
2341
2342 /**
2343  * Set dead and remove from the live transaction tables the
2344  * transactions that are dead
2345  */
2346 void Table::updateLiveTransactionsAndStatus() {
2347         // Go through each of the transactions
2348         {
2349                 SetIterator<int64_t, Transaction *> *iter = getKeyIterator(liveTransactionBySequenceNumberTable);
2350                 while (iter->hasNext()) {
2351                         int64_t key = iter->next();
2352                         Transaction *transaction = liveTransactionBySequenceNumberTable->get(key);
2353
2354                         // Check if the transaction is dead
2355                         if (lastArbitratedTransactionNumberByArbitratorTable->contains(transaction->getArbitrator()) && lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator() >= transaction->getSequenceNumber())) {
2356                                 // Set dead the transaction
2357                                 transaction->setDead();
2358
2359                                 // Remove the transaction from the live table
2360                                 iter->remove();
2361                                 liveTransactionByTransactionIdTable->remove(transaction->getId());
2362                         }
2363                 }
2364                 delete iter;
2365         }
2366
2367         // Go through each of the transactions
2368         {
2369                 SetIterator<int64_t, TransactionStatus *> *iter = getKeyIterator(outstandingTransactionStatus);
2370                 while (iter->hasNext()) {
2371                         int64_t key = iter->next();
2372                         TransactionStatus *status = outstandingTransactionStatus->get(key);
2373
2374                         // Check if the transaction is dead
2375                         if (lastArbitratedTransactionNumberByArbitratorTable->contains(status->getTransactionArbitrator()) && (lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()) >= status->getTransactionSequenceNumber())) {
2376
2377                                 // Set committed
2378                                 status->setStatus(TransactionStatus_StatusCommitted);
2379
2380                                 // Remove
2381                                 iter->remove();
2382                         }
2383                 }
2384                 delete iter;
2385         }
2386 }
2387
2388 /**
2389  * Process this slot, entry by entry->  Also update the latest message sent by slot
2390  */
2391 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2392
2393         // Update the last message seen
2394         updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2395
2396         // Process each entry in the slot
2397         Vector<Entry *> *entries = slot->getEntries();
2398         uint eSize = entries->size();
2399         for (uint ei = 0; ei < eSize; ei++) {
2400                 Entry *entry = entries->get(ei);
2401                 switch (entry->getType()) {
2402                 case TypeCommitPart:
2403                         processEntry((CommitPart *)entry);
2404                         break;
2405                 case TypeAbort:
2406                         processEntry((Abort *)entry);
2407                         break;
2408                 case TypeTransactionPart:
2409                         processEntry((TransactionPart *)entry);
2410                         break;
2411                 case TypeNewKey:
2412                         processEntry((NewKey *)entry);
2413                         break;
2414                 case TypeLastMessage:
2415                         processEntry((LastMessage *)entry, machineSet);
2416                         break;
2417                 case TypeRejectedMessage:
2418                         processEntry((RejectedMessage *)entry, indexer);
2419                         break;
2420                 case TypeTableStatus:
2421                         processEntry((TableStatus *)entry, slot->getSequenceNumber());
2422                         break;
2423                 default:
2424                         throw new Error("Unrecognized type: ");
2425                 }
2426         }
2427 }
2428
2429 /**
2430  * Update the last message that was sent for a machine Id
2431  */
2432 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2433         // Update what the last message received by a machine was
2434         updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2435 }
2436
2437 /**
2438  * Add the new key to the arbitrators table and update the set of live
2439  * new keys (in case of a rescued new key message)
2440  */
2441 void Table::processEntry(NewKey *entry) {
2442         // Update the arbitrator table with the new key information
2443         arbitratorTable->put(entry->getKey(), entry->getMachineID());
2444
2445         // Update what the latest live new key is
2446         NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2447         if (oldNewKey != NULL) {
2448                 // Delete the old new key messages
2449                 oldNewKey->setDead();
2450         }
2451 }
2452
2453 /**
2454  * Process new table status entries and set dead the old ones as new
2455  * ones come in-> keeps track of the largest and smallest table status
2456  * seen in this current round of updating the local copy of the block
2457  * chain
2458  */
2459 void Table::processEntry(TableStatus *entry, int64_t seq) {
2460         int newNumSlots = entry->getMaxSlots();
2461         updateCurrMaxSize(newNumSlots);
2462         initExpectedSize(seq, newNumSlots);
2463
2464         if (liveTableStatus != NULL) {
2465                 // We have a larger table status so the old table status is no
2466                 // int64_ter alive
2467                 liveTableStatus->setDead();
2468         }
2469
2470         // Make this new table status the latest alive table status
2471         liveTableStatus = entry;
2472 }
2473
2474 /**
2475  * Check old messages to see if there is a block chain violation->
2476  * Also
2477  */
2478 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
2479         int64_t oldSeqNum = entry->getOldSeqNum();
2480         int64_t newSeqNum = entry->getNewSeqNum();
2481         bool isequal = entry->getEqual();
2482         int64_t machineId = entry->getMachineID();
2483         int64_t seq = entry->getSequenceNumber();
2484
2485         // Check if we have messages that were supposed to be rejected in
2486         // our local block chain
2487         for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2488                 // Get the slot
2489                 Slot *slot = indexer->getSlot(seqNum);
2490
2491                 if (slot != NULL) {
2492                         // If we have this slot make sure that it was not supposed to be
2493                         // a rejected slot
2494                         int64_t slotMachineId = slot->getMachineID();
2495                         if (isequal != (slotMachineId == machineId)) {
2496                                 throw new Error("Server Error: Trying to insert rejected message for slot ");
2497                         }
2498                 }
2499         }
2500
2501         // Create a list of clients to watch until they see this rejected
2502         // message entry->
2503         Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2504         SetIterator<int64_t, Pair<int64_t, Liveness *> *> *iter = getKeyIterator(lastMessageTable);
2505         while (iter->hasNext()) {
2506                 // Machine ID for the last message entry
2507                 int64_t lastMessageEntryMachineId = iter->next();
2508
2509                 // We've seen it, don't need to continue to watch->  Our next
2510                 // message will implicitly acknowledge it->
2511                 if (lastMessageEntryMachineId == localMachineId) {
2512                         continue;
2513                 }
2514
2515                 Pair<int64_t, Liveness *> *lastMessageValue = lastMessageTable->get(lastMessageEntryMachineId);
2516                 int64_t entrySequenceNumber = lastMessageValue->getFirst();
2517
2518                 if (entrySequenceNumber < seq) {
2519                         // Add this rejected message to the set of messages that this
2520                         // machine ID did not see yet
2521                         addWatchVector(lastMessageEntryMachineId, entry);
2522                         // This client did not see this rejected message yet so add it
2523                         // to the watch set to monitor
2524                         deviceWatchSet->add(lastMessageEntryMachineId);
2525                 }
2526         }
2527         delete iter;
2528
2529         if (deviceWatchSet->isEmpty()) {
2530                 // This rejected message has been seen by all the clients so
2531                 entry->setDead();
2532         } else {
2533                 // We need to watch this rejected message
2534                 entry->setWatchSet(deviceWatchSet);
2535         }
2536 }
2537
2538 /**
2539  * Check if this abort is live, if not then save it so we can kill it
2540  * later-> update the last transaction number that was arbitrated on->
2541  */
2542 void Table::processEntry(Abort *entry) {
2543         if (entry->getTransactionSequenceNumber() != -1) {
2544                 // update the transaction status if it was sent to the server
2545                 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2546                 if (status != NULL) {
2547                         status->setStatus(TransactionStatus_StatusAborted);
2548                 }
2549         }
2550
2551         // Abort has not been seen by the client it is for yet so we need to
2552         // keep track of it
2553
2554         Abort *previouslySeenAbort = liveAbortTable->put(new Pair<int64_t, int64_t>(entry->getAbortId()), entry);
2555         if (previouslySeenAbort != NULL) {
2556                 previouslySeenAbort->setDead();         // Delete old version of the abort since we got a rescued newer version
2557         }
2558
2559         if (entry->getTransactionArbitrator() == localMachineId) {
2560                 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
2561         }
2562
2563         if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) {
2564                 // The machine already saw this so it is dead
2565                 entry->setDead();
2566                 Pair<int64_t, int64_t> abortid = entry->getAbortId();
2567                 liveAbortTable->remove(&abortid);
2568
2569                 if (entry->getTransactionArbitrator() == localMachineId) {
2570                         liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2571                 }
2572                 return;
2573         }
2574
2575         // Update the last arbitration data that we have seen so far
2576         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(entry->getTransactionArbitrator())) {
2577                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2578                 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2579                         // Is larger
2580                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2581                 }
2582         } else {
2583                 // Never seen any data from this arbitrator so record the first one
2584                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2585         }
2586
2587         // Set dead a transaction if we can
2588         Pair<int64_t, int64_t> deadPair = Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber());
2589
2590         Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(&deadPair);
2591         if (transactionToSetDead != NULL) {
2592                 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2593         }
2594
2595         // Update the last transaction sequence number that the arbitrator
2596         // arbitrated on
2597         if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getTransactionArbitrator()) ||
2598                         (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator()) < entry->getTransactionSequenceNumber())) {
2599                 // Is a valid one
2600                 if (entry->getTransactionSequenceNumber() != -1) {
2601                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2602                 }
2603         }
2604 }
2605
2606 /**
2607  * Set dead the transaction part if that transaction is dead and keep
2608  * track of all new parts
2609  */
2610 void Table::processEntry(TransactionPart *entry) {
2611         // Check if we have already seen this transaction and set it dead OR
2612         // if it is not alive
2613         if (lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getArbitratorId()) && (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId()) >= entry->getSequenceNumber())) {
2614                 // This transaction is dead, it was already committed or aborted
2615                 entry->setDead();
2616                 return;
2617         }
2618
2619         // This part is still alive
2620         Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId());
2621
2622         if (transactionPart == NULL) {
2623                 // Dont have a table for this machine Id yet so make one
2624                 transactionPart = new Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2625                 newTransactionParts->put(entry->getMachineId(), transactionPart);
2626         }
2627
2628         // Update the part and set dead ones we have already seen (got a
2629         // rescued version)
2630         TransactionPart *previouslySeenPart = transactionPart->put(new Pair<int64_t, int32_t>(entry->getPartId()), entry);
2631         if (previouslySeenPart != NULL) {
2632                 previouslySeenPart->setDead();
2633         }
2634 }
2635
2636 /**
2637  * Process new commit entries and save them for future use->  Delete duplicates
2638  */
2639 void Table::processEntry(CommitPart *entry) {
2640         // Update the last transaction that was updated if we can
2641         if (entry->getTransactionSequenceNumber() != -1) {
2642                 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId() || lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber())) {
2643                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2644                 }
2645         }
2646
2647         Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *commitPart = newCommitParts->get(entry->getMachineId());
2648         if (commitPart == NULL) {
2649                 // Don't have a table for this machine Id yet so make one
2650                 commitPart = new Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2651                 newCommitParts->put(entry->getMachineId(), commitPart);
2652         }
2653         // Update the part and set dead ones we have already seen (got a
2654         // rescued version)
2655         CommitPart *previouslySeenPart = commitPart->put(new Pair<int64_t, int32_t>(entry->getPartId()), entry);
2656         if (previouslySeenPart != NULL) {
2657                 previouslySeenPart->setDead();
2658         }
2659 }
2660
2661 /**
2662  * Update the last message seen table-> Update and set dead the
2663  * appropriate RejectedMessages as clients see them-> Updates the live
2664  * aborts, removes those that are dead and sets them dead-> Check that
2665  * the last message seen is correct and that there is no mismatch of
2666  * our own last message or that other clients have not had a rollback
2667  * on the last message->
2668  */
2669 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2670         // We have seen this machine ID
2671         machineSet->remove(machineId);
2672
2673         // Get the set of rejected messages that this machine Id is has not seen yet
2674         Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2675         // If there is a rejected message that this machine Id has not seen yet
2676         if (watchset != NULL) {
2677                 // Go through each rejected message that this machine Id has not
2678                 // seen yet
2679
2680                 SetIterator<RejectedMessage *, RejectedMessage *> *rmit = watchset->iterator();
2681                 while (rmit->hasNext()) {
2682                         RejectedMessage *rm = rmit->next();
2683                         // If this machine Id has seen this rejected message->->->
2684                         if (rm->getSequenceNumber() <= seqNum) {
2685                                 // Remove it from our watchlist
2686                                 rmit->remove();
2687                                 // Decrement machines that need to see this notification
2688                                 rm->removeWatcher(machineId);
2689                         }
2690                 }
2691                 delete rmit;
2692         }
2693
2694         // Set dead the abort
2695         SetIterator<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals> *abortit = getKeyIterator(liveAbortTable);
2696
2697         while (abortit->hasNext()) {
2698                 Pair<int64_t, int64_t> *key = abortit->next();
2699                 Abort *abort = liveAbortTable->get(key);
2700                 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2701                         abort->setDead();
2702                         abortit->remove();
2703                         if (abort->getTransactionArbitrator() == localMachineId) {
2704                                 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2705                         }
2706                 }
2707         }
2708         delete abortit;
2709         if (machineId == localMachineId) {
2710                 // Our own messages are immediately dead->
2711                 char livenessType = liveness->getType();
2712                 if (livenessType == TypeLastMessage) {
2713                         ((LastMessage *)liveness)->setDead();
2714                 } else if (livenessType == TypeSlot) {
2715                         ((Slot *)liveness)->setDead();
2716                 } else {
2717                         throw new Error("Unrecognized type");
2718                 }
2719         }
2720         // Get the old last message for this device
2721         Pair<int64_t, Liveness *> *lastMessageEntry = lastMessageTable->put(machineId, new Pair<int64_t, Liveness *>(seqNum, liveness));
2722         if (lastMessageEntry == NULL) {
2723                 // If no last message then there is nothing else to process
2724                 return;
2725         }
2726
2727         int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
2728         Liveness *lastEntry = lastMessageEntry->getSecond();
2729         delete lastMessageEntry;
2730
2731         // If it is not our machine Id since we already set ours to dead
2732         if (machineId != localMachineId) {
2733                 char lastEntryType = lastEntry->getType();
2734
2735                 if (lastEntryType == TypeLastMessage) {
2736                         ((LastMessage *)lastEntry)->setDead();
2737                 } else if (lastEntryType == TypeSlot) {
2738                         ((Slot *)lastEntry)->setDead();
2739                 } else {
2740                         throw new Error("Unrecognized type");
2741                 }
2742         }
2743         // Make sure the server is not playing any games
2744         if (machineId == localMachineId) {
2745                 if (hadPartialSendToServer) {
2746                         // We were not making any updates and we had a machine mismatch
2747                         if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2748                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: ");
2749                         }
2750                 } else {
2751                         // We were not making any updates and we had a machine mismatch
2752                         if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2753                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed: ");
2754                         }
2755                 }
2756         } else {
2757                 if (lastMessageSeqNum > seqNum) {
2758                         throw new Error("Server Error: Rollback on remote machine sequence number");
2759                 }
2760         }
2761 }
2762
2763 /**
2764  * Add a rejected message entry to the watch set to keep track of
2765  * which clients have seen that rejected message entry and which have
2766  * not.
2767  */
2768 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
2769         Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2770         if (entries == NULL) {
2771                 // There is no set for this machine ID yet so create one
2772                 entries = new Hashset<RejectedMessage *>();
2773                 rejectedMessageWatchVectorTable->put(machineId, entries);
2774         }
2775         entries->add(entry);
2776 }
2777
2778 /**
2779  * Check if the HMAC chain is not violated
2780  */
2781 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
2782         for (uint i = 0; i < newSlots->length(); i++) {
2783                 Slot *currSlot = newSlots->get(i);
2784                 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
2785                 if (prevSlot != NULL &&
2786                                 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2787                         throw new Error("Server Error: Invalid HMAC Chain");
2788         }
2789 }