edits
[iotcloud.git] / version2 / src / C / Table.cc
1 #include "Table.h"
2 #include "CloudComm.h"
3 #include "SlotBuffer.h"
4 #include "NewKey.h"
5 #include "Slot.h"
6 #include "KeyValue.h"
7 #include "Error.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
13 #include "SecureRandom.h"
14 #include "ByteBuffer.h"
15 #include "Abort.h"
16 #include "CommitPart.h"
17 #include "ArbitrationRound.h"
18 #include "TransactionPart.h"
19 #include "Commit.h"
20 #include "RejectedMessage.h"
21 #include "SlotIndexer.h"
22 #include <stdlib.h>
23
/**
 * Three-way comparison of two int64_t values passed by address, with the
 * signature expected by qsort()/bsearch().
 *
 * @param a  points to the left-hand int64_t
 * @param b  points to the right-hand int64_t
 * @return -1 if *a < *b, 1 if *a > *b, 0 if they are equal
 */
int compareInt64(const void *a, const void *b) {
	const int64_t lhs = *(const int64_t *) a;
	const int64_t rhs = *(const int64_t *) b;
	// Each comparison yields 0 or 1, so the difference is exactly -1, 0 or 1
	// and cannot overflow the way a plain subtraction of the values could.
	return (lhs > rhs) - (lhs < rhs);
}
34
35 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
36         buffer(NULL),
37         cloud(new CloudComm(this, baseurl, password, listeningPort)),
38         random(NULL),
39         liveTableStatus(NULL),
40         pendingTransactionBuilder(NULL),
41         lastPendingTransactionSpeculatedOn(NULL),
42         firstPendingTransaction(NULL),
43         numberOfSlots(0),
44         bufferResizeThreshold(0),
45         liveSlotCount(0),
46         oldestLiveSlotSequenceNumver(1),
47         localMachineId(_localMachineId),
48         sequenceNumber(0),
49         localTransactionSequenceNumber(0),
50         lastTransactionSequenceNumberSpeculatedOn(0),
51         oldestTransactionSequenceNumberSpeculatedOn(0),
52         localArbitrationSequenceNumber(0),
53         hadPartialSendToServer(false),
54         attemptedToSendToServer(false),
55         expectedsize(0),
56         didFindTableStatus(false),
57         currMaxSize(0),
58         lastSlotAttemptedToSend(NULL),
59         lastIsNewKey(false),
60         lastNewSize(0),
61         lastTransactionPartsSent(NULL),
62         lastPendingSendArbitrationEntriesToDelete(NULL),
63         lastNewKey(NULL),
64         committedKeyValueTable(NULL),
65         speculatedKeyValueTable(NULL),
66         pendingTransactionSpeculatedKeyValueTable(NULL),
67         liveNewKeyTable(NULL),
68         lastMessageTable(NULL),
69         rejectedMessageWatchVectorTable(NULL),
70         arbitratorTable(NULL),
71         liveAbortTable(NULL),
72         newTransactionParts(NULL),
73         newCommitParts(NULL),
74         lastArbitratedTransactionNumberByArbitratorTable(NULL),
75         liveTransactionBySequenceNumberTable(NULL),
76         liveTransactionByTransactionIdTable(NULL),
77         liveCommitsTable(NULL),
78         liveCommitsByKeyTable(NULL),
79         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
80         rejectedSlotVector(NULL),
81         pendingTransactionQueue(NULL),
82         pendingSendArbitrationRounds(NULL),
83         pendingSendArbitrationEntriesToDelete(NULL),
84         transactionPartsSent(NULL),
85         outstandingTransactionStatus(NULL),
86         liveAbortsGeneratedByLocal(NULL),
87         offlineTransactionsCommittedAndAtServer(NULL),
88         localCommunicationTable(NULL),
89         lastTransactionSeenFromMachineFromServer(NULL),
90         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
91         lastInsertedNewKey(false),
92         lastSeqNumArbOn(0)
93 {
94         init();
95 }
96
97 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
98         buffer(NULL),
99         cloud(_cloud),
100         random(NULL),
101         liveTableStatus(NULL),
102         pendingTransactionBuilder(NULL),
103         lastPendingTransactionSpeculatedOn(NULL),
104         firstPendingTransaction(NULL),
105         numberOfSlots(0),
106         bufferResizeThreshold(0),
107         liveSlotCount(0),
108         oldestLiveSlotSequenceNumver(1),
109         localMachineId(_localMachineId),
110         sequenceNumber(0),
111         localTransactionSequenceNumber(0),
112         lastTransactionSequenceNumberSpeculatedOn(0),
113         oldestTransactionSequenceNumberSpeculatedOn(0),
114         localArbitrationSequenceNumber(0),
115         hadPartialSendToServer(false),
116         attemptedToSendToServer(false),
117         expectedsize(0),
118         didFindTableStatus(false),
119         currMaxSize(0),
120         lastSlotAttemptedToSend(NULL),
121         lastIsNewKey(false),
122         lastNewSize(0),
123         lastTransactionPartsSent(NULL),
124         lastPendingSendArbitrationEntriesToDelete(NULL),
125         lastNewKey(NULL),
126         committedKeyValueTable(NULL),
127         speculatedKeyValueTable(NULL),
128         pendingTransactionSpeculatedKeyValueTable(NULL),
129         liveNewKeyTable(NULL),
130         lastMessageTable(NULL),
131         rejectedMessageWatchVectorTable(NULL),
132         arbitratorTable(NULL),
133         liveAbortTable(NULL),
134         newTransactionParts(NULL),
135         newCommitParts(NULL),
136         lastArbitratedTransactionNumberByArbitratorTable(NULL),
137         liveTransactionBySequenceNumberTable(NULL),
138         liveTransactionByTransactionIdTable(NULL),
139         liveCommitsTable(NULL),
140         liveCommitsByKeyTable(NULL),
141         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
142         rejectedSlotVector(NULL),
143         pendingTransactionQueue(NULL),
144         pendingSendArbitrationRounds(NULL),
145         pendingSendArbitrationEntriesToDelete(NULL),
146         transactionPartsSent(NULL),
147         outstandingTransactionStatus(NULL),
148         liveAbortsGeneratedByLocal(NULL),
149         offlineTransactionsCommittedAndAtServer(NULL),
150         localCommunicationTable(NULL),
151         lastTransactionSeenFromMachineFromServer(NULL),
152         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
153         lastInsertedNewKey(false),
154         lastSeqNumArbOn(0)
155 {
156         init();
157 }
158
159 Table::~Table() {
160         delete cloud;
161         delete random;
162         delete buffer;
163         // init data structs
164         delete committedKeyValueTable;
165         delete speculatedKeyValueTable;
166         delete pendingTransactionSpeculatedKeyValueTable;
167         delete liveNewKeyTable;
168         delete lastMessageTable;
169         delete rejectedMessageWatchVectorTable;
170         delete arbitratorTable;
171         delete liveAbortTable;
172         delete newTransactionParts;
173         delete newCommitParts;
174         delete lastArbitratedTransactionNumberByArbitratorTable;
175         delete liveTransactionBySequenceNumberTable;
176         delete liveTransactionByTransactionIdTable;
177         delete liveCommitsTable;
178         delete liveCommitsByKeyTable;
179         delete lastCommitSeenSequenceNumberByArbitratorTable;
180         delete rejectedSlotVector;
181         delete pendingTransactionQueue;
182         delete pendingSendArbitrationEntriesToDelete;
183         delete transactionPartsSent;
184         delete outstandingTransactionStatus;
185         delete liveAbortsGeneratedByLocal;
186         delete offlineTransactionsCommittedAndAtServer;
187         delete localCommunicationTable;
188         delete lastTransactionSeenFromMachineFromServer;
189         delete pendingSendArbitrationRounds;
190         delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
191 }
192
193 /**
194  * Init all the stuff needed for for table usage
195  */
196 void Table::init() {
197         // Init helper objects
198         random = new SecureRandom();
199         buffer = new SlotBuffer();
200
201         // init data structs
202         committedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
203         speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
204         pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
205         liveNewKeyTable = new Hashtable<IoTString *, NewKey *>();
206         lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> * >();
207         rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
208         arbitratorTable = new Hashtable<IoTString *, int64_t>();
209         liveAbortTable = new Hashtable<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>();
210         newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
211         newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
212         lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
213         liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
214         liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t> *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>();
215         liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> * >();
216         liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *>();
217         lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
218         rejectedSlotVector = new Vector<int64_t>();
219         pendingTransactionQueue = new Vector<Transaction *>();
220         pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
221         transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
222         outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
223         liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
224         offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> *, uintptr_t, 0, pairHashFunction, pairEquals>();
225         localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> *>();
226         lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
227         pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
228         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
229
230         // Other init stuff
231         numberOfSlots = buffer->capacity();
232         setResizeThreshold();
233 }
234
235 /**
236  * Initialize the table by inserting a table status as the first entry
237  * into the table status also initialize the crypto stuff.
238  */
239 void Table::initTable() {
240         cloud->initSecurity();
241
242         // Create the first insertion into the block chain which is the table status
243         Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
244         localSequenceNumber++;
245         TableStatus *status = new TableStatus(s, numberOfSlots);
246         s->addEntry(status);
247         Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
248
249         if (array == NULL) {
250                 array = new Array<Slot *>(1);
251                 array->set(0, s);
252                 // update local block chain
253                 validateAndUpdate(array, true);
254         } else if (array->length() == 1) {
255                 // in case we did push the slot BUT we failed to init it
256                 validateAndUpdate(array, true);
257         } else {
258                 throw new Error("Error on initialization");
259         }
260 }
261
262 /**
263  * Rebuild the table from scratch by pulling the latest block chain
264  * from the server.
265  */
266 void Table::rebuild() {
267         // Just pull the latest slots from the server
268         Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
269         validateAndUpdate(newslots, true);
270         sendToServer(NULL);
271         updateLiveTransactionsAndStatus();
272 }
273
274 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
275         localCommunicationTable->put(arbitrator, new Pair<IoTString *, int32_t>(hostName, portNumber));
276 }
277
278 int64_t Table::getArbitrator(IoTString *key) {
279         return arbitratorTable->get(key);
280 }
281
282 void Table::close() {
283         cloud->closeCloud();
284 }
285
286 IoTString *Table::getCommitted(IoTString *key)  {
287         KeyValue *kv = committedKeyValueTable->get(key);
288
289         if (kv != NULL) {
290                 return kv->getValue();
291         } else {
292                 return NULL;
293         }
294 }
295
296 IoTString *Table::getSpeculative(IoTString *key) {
297         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
298
299         if (kv == NULL) {
300                 kv = speculatedKeyValueTable->get(key);
301         }
302
303         if (kv == NULL) {
304                 kv = committedKeyValueTable->get(key);
305         }
306
307         if (kv != NULL) {
308                 return kv->getValue();
309         } else {
310                 return NULL;
311         }
312 }
313
314 IoTString *Table::getCommittedAtomic(IoTString *key) {
315         KeyValue *kv = committedKeyValueTable->get(key);
316
317         if (!arbitratorTable->contains(key)) {
318                 throw new Error("Key not Found.");
319         }
320
321         // Make sure new key value pair matches the current arbitrator
322         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
323                 // TODO: Maybe not throw en error
324                 throw new Error("Not all Key Values Match Arbitrator.");
325         }
326
327         if (kv != NULL) {
328                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
329                 return kv->getValue();
330         } else {
331                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
332                 return NULL;
333         }
334 }
335
336 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
337         if (!arbitratorTable->contains(key)) {
338                 throw new Error("Key not Found.");
339         }
340
341         // Make sure new key value pair matches the current arbitrator
342         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
343                 // TODO: Maybe not throw en error
344                 throw new Error("Not all Key Values Match Arbitrator.");
345         }
346
347         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
348
349         if (kv == NULL) {
350                 kv = speculatedKeyValueTable->get(key);
351         }
352
353         if (kv == NULL) {
354                 kv = committedKeyValueTable->get(key);
355         }
356
357         if (kv != NULL) {
358                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
359                 return kv->getValue();
360         } else {
361                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
362                 return NULL;
363         }
364 }
365
366 bool Table::update()  {
367         try {
368                 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
369                 validateAndUpdate(newSlots, false);
370                 sendToServer(NULL);
371                 updateLiveTransactionsAndStatus();
372                 return true;
373         } catch (Exception *e) {
374                 SetIterator<int64_t, Pair<IoTString *, int32_t> *> *kit = getKeyIterator(localCommunicationTable);
375                 while (kit->hasNext()) {
376                         int64_t m = kit->next();
377                         updateFromLocal(m);
378                 }
379                 delete kit;
380         }
381
382         return false;
383 }
384
385 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
386         while (true) {
387                 if (!arbitratorTable->contains(keyName)) {
388                         // There is already an arbitrator
389                         return false;
390                 }
391                 NewKey *newKey = new NewKey(NULL, keyName, machineId);
392
393                 if (sendToServer(newKey)) {
394                         // If successfully inserted
395                         return true;
396                 }
397         }
398 }
399
400 void Table::startTransaction() {
401         // Create a new transaction, invalidates any old pending transactions.
402         pendingTransactionBuilder = new PendingTransaction(localMachineId);
403 }
404
405 void Table::addKV(IoTString *key, IoTString *value) {
406
407         // Make sure it is a valid key
408         if (!arbitratorTable->contains(key)) {
409                 throw new Error("Key not Found.");
410         }
411
412         // Make sure new key value pair matches the current arbitrator
413         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
414                 // TODO: Maybe not throw en error
415                 throw new Error("Not all Key Values Match Arbitrator.");
416         }
417
418         // Add the key value to this transaction
419         KeyValue *kv = new KeyValue(key, value);
420         pendingTransactionBuilder->addKV(kv);
421 }
422
423 TransactionStatus *Table::commitTransaction() {
424         if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
425                 // transaction with no updates will have no effect on the system
426                 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
427         }
428
429         // Set the local transaction sequence number and increment
430         pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
431         localTransactionSequenceNumber++;
432
433         // Create the transaction status
434         TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
435
436         // Create the new transaction
437         Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
438         newTransaction->setTransactionStatus(transactionStatus);
439
440         if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
441                 // Add it to the queue and invalidate the builder for safety
442                 pendingTransactionQueue->add(newTransaction);
443         } else {
444                 arbitrateOnLocalTransaction(newTransaction);
445                 updateLiveStateFromLocal();
446         }
447
448         pendingTransactionBuilder = new PendingTransaction(localMachineId);
449
450         try {
451                 sendToServer(NULL);
452         } catch (ServerException *e) {
453
454                 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
455                 uint size = pendingTransactionQueue->size();
456                 uint oldindex = 0;
457                 for (uint iter = 0; iter < size; iter++) {
458                         Transaction *transaction = pendingTransactionQueue->get(iter);
459                         pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter));
460
461                         if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
462                                 // Already contacted this client so ignore all attempts to contact this client
463                                 // to preserve ordering for arbitrator
464                                 continue;
465                         }
466
467                         Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
468
469                         if (sendReturn.getFirst()) {
470                                 // Failed to contact over local
471                                 arbitratorTriedAndFailed->add(transaction->getArbitrator());
472                         } else {
473                                 // Successful contact or should not contact
474
475                                 if (sendReturn.getSecond()) {
476                                         // did arbitrate
477                                         oldindex--;
478                                 }
479                         }
480                 }
481                 pendingTransactionQueue->setSize(oldindex);
482         }
483
484         updateLiveStateFromLocal();
485
486         return transactionStatus;
487 }
488
489 /**
490  * Recalculate the new resize threshold
491  */
492 void Table::setResizeThreshold() {
493         int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
494         bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
495 }
496
497 int64_t Table::getLocalSequenceNumber() {
498         return localSequenceNumber;
499 }
500
501 bool Table::sendToServer(NewKey *newKey) {
502         bool fromRetry = false;
503         try {
504                 if (hadPartialSendToServer) {
505                         Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
506                         if (newSlots->length() == 0) {
507                                 fromRetry = true;
508                                 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
509
510                                 if (sendSlotsReturn.getFirst()) {
511                                         if (newKey != NULL) {
512                                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
513                                                         newKey = NULL;
514                                                 }
515                                         }
516
517                                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
518                                         while (trit->hasNext()) {
519                                                 Transaction *transaction = trit->next();
520                                                 transaction->resetServerFailure();
521                                                 // Update which transactions parts still need to be sent
522                                                 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
523                                                 // Add the transaction status to the outstanding list
524                                                 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
525
526                                                 // Update the transaction status
527                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
528
529                                                 // Check if all the transaction parts were successfully
530                                                 // sent and if so then remove it from pending
531                                                 if (transaction->didSendAllParts()) {
532                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
533                                                         pendingTransactionQueue->remove(transaction);
534                                                 }
535                                         }
536                                         delete trit;
537                                 } else {
538                                         newSlots = sendSlotsReturn.getThird();
539                                         bool isInserted = false;
540                                         for (uint si = 0; si < newSlots->length(); si++) {
541                                                 Slot *s = newSlots->get(si);
542                                                 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
543                                                         isInserted = true;
544                                                         break;
545                                                 }
546                                         }
547
548                                         for (uint si = 0; si < newSlots->length(); si++) {
549                                                 Slot *s = newSlots->get(si);
550                                                 if (isInserted) {
551                                                         break;
552                                                 }
553
554                                                 // Process each entry in the slot
555                                                 Vector<Entry *> *ventries = s->getEntries();
556                                                 uint vesize = ventries->size();
557                                                 for (uint vei = 0; vei < vesize; vei++) {
558                                                         Entry *entry = ventries->get(vei);
559                                                         if (entry->getType() == TypeLastMessage) {
560                                                                 LastMessage *lastMessage = (LastMessage *)entry;
561                                                                 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
562                                                                         isInserted = true;
563                                                                         break;
564                                                                 }
565                                                         }
566                                                 }
567                                         }
568
569                                         if (isInserted) {
570                                                 if (newKey != NULL) {
571                                                         if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
572                                                                 newKey = NULL;
573                                                         }
574                                                 }
575
576                                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
577                                                 while (trit->hasNext()) {
578                                                         Transaction *transaction = trit->next();
579                                                         transaction->resetServerFailure();
580
581                                                         // Update which transactions parts still need to be sent
582                                                         transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
583
584                                                         // Add the transaction status to the outstanding list
585                                                         outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
586
587                                                         // Update the transaction status
588                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
589
590                                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
591                                                         if (transaction->didSendAllParts()) {
592                                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
593                                                                 pendingTransactionQueue->remove(transaction);
594                                                         } else {
595                                                                 transaction->resetServerFailure();
596                                                                 // Set the transaction sequence number back to nothing
597                                                                 if (!transaction->didSendAPartToServer()) {
598                                                                         transaction->setSequenceNumber(-1);
599                                                                 }
600                                                         }
601                                                 }
602                                                 delete trit;
603                                         }
604                                 }
605
606                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
607                                 while (trit->hasNext()) {
608                                         Transaction *transaction = trit->next();
609                                         transaction->resetServerFailure();
610                                         // Set the transaction sequence number back to nothing
611                                         if (!transaction->didSendAPartToServer()) {
612                                                 transaction->setSequenceNumber(-1);
613                                         }
614                                 }
615                                 delete trit;
616
617                                 if (sendSlotsReturn.getThird()->length() != 0) {
618                                         // insert into the local block chain
619                                         validateAndUpdate(sendSlotsReturn.getThird(), true);
620                                 }
621                                 // continue;
622                         } else {
623                                 bool isInserted = false;
624                                 for (uint si = 0; si < newSlots->length(); si++) {
625                                         Slot *s = newSlots->get(si);
626                                         if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
627                                                 isInserted = true;
628                                                 break;
629                                         }
630                                 }
631
632                                 for (uint si = 0; si < newSlots->length(); si++) {
633                                         Slot *s = newSlots->get(si);
634                                         if (isInserted) {
635                                                 break;
636                                         }
637
638                                         // Process each entry in the slot
639                                         Vector<Entry *> *entries = s->getEntries();
640                                         uint eSize = entries->size();
641                                         for (uint ei = 0; ei < eSize; ei++) {
642                                                 Entry *entry = entries->get(ei);
643
644                                                 if (entry->getType() == TypeLastMessage) {
645                                                         LastMessage *lastMessage = (LastMessage *)entry;
646                                                         if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
647                                                                 isInserted = true;
648                                                                 break;
649                                                         }
650                                                 }
651                                         }
652                                 }
653
654                                 if (isInserted) {
655                                         if (newKey != NULL) {
656                                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
657                                                         newKey = NULL;
658                                                 }
659                                         }
660
661                                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
662                                         while (trit->hasNext()) {
663                                                 Transaction *transaction = trit->next();
664                                                 transaction->resetServerFailure();
665
666                                                 // Update which transactions parts still need to be sent
667                                                 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
668
669                                                 // Add the transaction status to the outstanding list
670                                                 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
671
672                                                 // Update the transaction status
673                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
674
675                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
676                                                 if (transaction->didSendAllParts()) {
677                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
678                                                         pendingTransactionQueue->remove(transaction);
679                                                 } else {
680                                                         transaction->resetServerFailure();
681                                                         // Set the transaction sequence number back to nothing
682                                                         if (!transaction->didSendAPartToServer()) {
683                                                                 transaction->setSequenceNumber(-1);
684                                                         }
685                                                 }
686                                         }
687                                         delete trit;
688                                 } else {
689                                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
690                                         while (trit->hasNext()) {
691                                                 Transaction *transaction = trit->next();
692                                                 transaction->resetServerFailure();
693                                                 // Set the transaction sequence number back to nothing
694                                                 if (!transaction->didSendAPartToServer()) {
695                                                         transaction->setSequenceNumber(-1);
696                                                 }
697                                         }
698                                         delete trit;
699                                 }
700
701                                 // insert into the local block chain
702                                 validateAndUpdate(newSlots, true);
703                         }
704                 }
705         } catch (ServerException *e) {
706                 throw e;
707         }
708
709
710
711         try {
712                 // While we have stuff that needs inserting into the block chain
713                 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
714
715                         fromRetry = false;
716
717                         if (hadPartialSendToServer) {
718                                 throw new Error("Should Be error free");
719                         }
720
721
722
723                         // If there is a new key with same name then end
724                         if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
725                                 return false;
726                         }
727
728                         // Create the slot
729                         Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber);
730                         localSequenceNumber++;
731
732                         // Try to fill the slot with data
733                         ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
734                         bool needsResize = fillSlotsReturn.getFirst();
735                         int newSize = fillSlotsReturn.getSecond();
736                         bool insertedNewKey = fillSlotsReturn.getThird();
737
738                         if (needsResize) {
739                                 // Reset which transaction to send
740                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
741                                 while (trit->hasNext()) {
742                                         Transaction *transaction = trit->next();
743                                         transaction->resetNextPartToSend();
744
745                                         // Set the transaction sequence number back to nothing
746                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
747                                                 transaction->setSequenceNumber(-1);
748                                         }
749                                 }
750                                 delete trit;
751
752                                 // Clear the sent data since we are trying again
753                                 pendingSendArbitrationEntriesToDelete->clear();
754                                 transactionPartsSent->clear();
755
756                                 // We needed a resize so try again
757                                 fillSlot(slot, true, newKey);
758                         }
759
760                         lastSlotAttemptedToSend = slot;
761                         lastIsNewKey = (newKey != NULL);
762                         lastInsertedNewKey = insertedNewKey;
763                         lastNewSize = newSize;
764                         lastNewKey = newKey;
765                         lastTransactionPartsSent = transactionPartsSent->clone();
766                         lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
767
768                         ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
769
770                         if (sendSlotsReturn.getFirst()) {
771
772                                 // Did insert into the block chain
773
774                                 if (insertedNewKey) {
775                                         // This slot was what was inserted not a previous slot
776
777                                         // New Key was successfully inserted into the block chain so dont want to insert it again
778                                         newKey = NULL;
779                                 }
780
781                                 // Remove the aborts and commit parts that were sent from the pending to send queue
782                                 uint size = pendingSendArbitrationRounds->size();
783                                 uint oldcount = 0;
784                                 for (uint i = 0; i < size; i++) {
785                                         ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
786                                         round->removeParts(pendingSendArbitrationEntriesToDelete);
787
788                                         if (!round->isDoneSending()) {
789                                                 // Sent all the parts
790                                                 pendingSendArbitrationRounds->set(oldcount++,
791                                                                                                                                                                                         pendingSendArbitrationRounds->get(i));
792                                         }
793                                 }
794                                 pendingSendArbitrationRounds->setSize(oldcount);
795
796                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
797                                 while (trit->hasNext()) {
798                                         Transaction *transaction = trit->next();
799                                         transaction->resetServerFailure();
800
801                                         // Update which transactions parts still need to be sent
802                                         transaction->removeSentParts(transactionPartsSent->get(transaction));
803
804                                         // Add the transaction status to the outstanding list
805                                         outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
806
807                                         // Update the transaction status
808                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
809
810                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
811                                         if (transaction->didSendAllParts()) {
812                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
813                                                 pendingTransactionQueue->remove(transaction);
814                                         }
815                                 }
816                                 delete trit;
817                         } else {
818                                 // Reset which transaction to send
819                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
820                                 while (trit->hasNext()) {
821                                         Transaction *transaction = trit->next();
822                                         transaction->resetNextPartToSend();
823
824                                         // Set the transaction sequence number back to nothing
825                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
826                                                 transaction->setSequenceNumber(-1);
827                                         }
828                                 }
829                                 delete trit;
830                         }
831
832                         // Clear the sent data in preparation for next send
833                         pendingSendArbitrationEntriesToDelete->clear();
834                         transactionPartsSent->clear();
835
836                         if (sendSlotsReturn.getThird()->length() != 0) {
837                                 // insert into the local block chain
838                                 validateAndUpdate(sendSlotsReturn.getThird(), true);
839                         }
840                 }
841
842         } catch (ServerException *e) {
843                 if (e->getType() != ServerException_TypeInputTimeout) {
844                         // Nothing was able to be sent to the server so just clear these data structures
845                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
846                         while (trit->hasNext()) {
847                                 Transaction *transaction = trit->next();
848                                 transaction->resetNextPartToSend();
849
850                                 // Set the transaction sequence number back to nothing
851                                 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
852                                         transaction->setSequenceNumber(-1);
853                                 }
854                         }
855                         delete trit;
856                 } else {
857                         // There was a partial send to the server
858                         hadPartialSendToServer = true;
859
860                         // Nothing was able to be sent to the server so just clear these data structures
861                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
862                         while (trit->hasNext()) {
863                                 Transaction *transaction = trit->next();
864                                 transaction->resetNextPartToSend();
865                                 transaction->setServerFailure();
866                         }
867                         delete trit;
868                 }
869
870                 pendingSendArbitrationEntriesToDelete->clear();
871                 transactionPartsSent->clear();
872
873                 throw e;
874         }
875
876         return newKey == NULL;
877 }
878
879 bool Table::updateFromLocal(int64_t machineId) {
880         if (!localCommunicationTable->contains(machineId))
881                 return false;
882
883         Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(machineId);
884
885         // Get the size of the send data
886         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
887
888         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
889         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(machineId)) {
890                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
891         }
892
893         Array<char> *sendData = new Array<char>(sendDataSize);
894         ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
895
896         // Encode the data
897         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
898         bbEncode->putInt(0);
899
900         // Send by local
901         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
902         localSequenceNumber++;
903
904         if (returnData == NULL) {
905                 // Could not contact server
906                 return false;
907         }
908
909         // Decode the data
910         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
911         int numberOfEntries = bbDecode->getInt();
912
913         for (int i = 0; i < numberOfEntries; i++) {
914                 char type = bbDecode->get();
915                 if (type == TypeAbort) {
916                         Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
917                         processEntry(abort);
918                 } else if (type == TypeCommitPart) {
919                         CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
920                         processEntry(commitPart);
921                 }
922         }
923
924         updateLiveStateFromLocal();
925
926         return true;
927 }
928
929 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
930
931         // Get the devices local communications
932         if (!localCommunicationTable->contains(transaction->getArbitrator()))
933                 return Pair<bool, bool>(true, false);
934
935         Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
936
937         // Get the size of the send data
938         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
939         {
940                 Vector<TransactionPart *> *tParts = transaction->getParts();
941                 uint tPartsSize = tParts->size();
942                 for (uint i = 0; i < tPartsSize; i++) {
943                         TransactionPart *part = tParts->get(i);
944                         sendDataSize += part->getSize();
945                 }
946         }
947
948         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
949         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(transaction->getArbitrator())) {
950                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
951         }
952
953         // Make the send data size
954         Array<char> *sendData = new Array<char>(sendDataSize);
955         ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
956
957         // Encode the data
958         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
959         bbEncode->putInt(transaction->getParts()->size());
960         {
961                 Vector<TransactionPart *> *tParts = transaction->getParts();
962                 uint tPartsSize = tParts->size();
963                 for (uint i = 0; i < tPartsSize; i++) {
964                         TransactionPart *part = tParts->get(i);
965                         part->encode(bbEncode);
966                 }
967         }
968
969         // Send by local
970         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
971         localSequenceNumber++;
972
973         if (returnData == NULL) {
974                 // Could not contact server
975                 return Pair<bool, bool>(true, false);
976         }
977
978         // Decode the data
979         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
980         bool didCommit = bbDecode->get() == 1;
981         bool couldArbitrate = bbDecode->get() == 1;
982         int numberOfEntries = bbDecode->getInt();
983         bool foundAbort = false;
984
985         for (int i = 0; i < numberOfEntries; i++) {
986                 char type = bbDecode->get();
987                 if (type == TypeAbort) {
988                         Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
989
990                         if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
991                                 foundAbort = true;
992                         }
993
994                         processEntry(abort);
995                 } else if (type == TypeCommitPart) {
996                         CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
997                         processEntry(commitPart);
998                 }
999         }
1000
1001         updateLiveStateFromLocal();
1002
1003         if (couldArbitrate) {
1004                 TransactionStatus *status =  transaction->getTransactionStatus();
1005                 if (didCommit) {
1006                         status->setStatus(TransactionStatus_StatusCommitted);
1007                 } else {
1008                         status->setStatus(TransactionStatus_StatusAborted);
1009                 }
1010         } else {
1011                 TransactionStatus *status =  transaction->getTransactionStatus();
1012                 if (foundAbort) {
1013                         status->setStatus(TransactionStatus_StatusAborted);
1014                 } else {
1015                         status->setStatus(TransactionStatus_StatusCommitted);
1016                 }
1017         }
1018
1019         return Pair<bool, bool>(false, true);
1020 }
1021
1022 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
1023         // Decode the data
1024         ByteBuffer *bbDecode = ByteBuffer_wrap(data);
1025         int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
1026         int numberOfParts = bbDecode->getInt();
1027
1028         // If we did commit a transaction or not
1029         bool didCommit = false;
1030         bool couldArbitrate = false;
1031
1032         if (numberOfParts != 0) {
1033
1034                 // decode the transaction
1035                 Transaction *transaction = new Transaction();
1036                 for (int i = 0; i < numberOfParts; i++) {
1037                         bbDecode->get();
1038                         TransactionPart *newPart = (TransactionPart *)TransactionPart_decode(NULL, bbDecode);
1039                         transaction->addPartDecode(newPart);
1040                 }
1041
1042                 // Arbitrate on transaction and pull relevant return data
1043                 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
1044                 couldArbitrate = localArbitrateReturn.getFirst();
1045                 didCommit = localArbitrateReturn.getSecond();
1046
1047                 updateLiveStateFromLocal();
1048
1049                 // Transaction was sent to the server so keep track of it to prevent double commit
1050                 if (transaction->getSequenceNumber() != -1) {
1051                         offlineTransactionsCommittedAndAtServer->add(new Pair<int64_t, int64_t>(transaction->getId()));
1052                 }
1053         }
1054
1055         // The data to send back
1056         int returnDataSize = 0;
1057         Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
1058
1059         // Get the aborts to send back
1060         Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>();
1061         {
1062                 SetIterator<int64_t, Abort *> *abortit = getKeyIterator(liveAbortsGeneratedByLocal);
1063                 while (abortit->hasNext())
1064                         abortLocalSequenceNumbers->add(abortit->next());
1065                 delete abortit;
1066         }
1067
1068         qsort(abortLocalSequenceNumbers->expose(), abortLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1069
1070         uint asize = abortLocalSequenceNumbers->size();
1071         for (uint i = 0; i < asize; i++) {
1072                 int64_t localSequenceNumber = abortLocalSequenceNumbers->get(i);
1073                 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1074                         continue;
1075                 }
1076
1077                 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
1078                 unseenArbitrations->add(abort);
1079                 returnDataSize += abort->getSize();
1080         }
1081
1082         // Get the commits to send back
1083         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
1084         if (commitForClientTable != NULL) {
1085                 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>();
1086                 {
1087                         SetIterator<int64_t, Commit *> *commitit = getKeyIterator(commitForClientTable);
1088                         while (commitit->hasNext())
1089                                 commitLocalSequenceNumbers->add(commitit->next());
1090                         delete commitit;
1091                 }
1092                 qsort(commitLocalSequenceNumbers->expose(), commitLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1093
1094                 uint clsSize = commitLocalSequenceNumbers->size();
1095                 for (uint clsi = 0; clsi < clsSize; clsi++) {
1096                         int64_t localSequenceNumber = commitLocalSequenceNumbers->get(clsi);
1097                         Commit *commit = commitForClientTable->get(localSequenceNumber);
1098
1099                         if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1100                                 continue;
1101                         }
1102
1103                         {
1104                                 Vector<CommitPart *> *parts = commit->getParts();
1105                                 uint nParts = parts->size();
1106                                 for (uint i = 0; i < nParts; i++) {
1107                                         CommitPart *commitPart = parts->get(i);
1108                                         unseenArbitrations->add(commitPart);
1109                                         returnDataSize += commitPart->getSize();
1110                                 }
1111                         }
1112                 }
1113         }
1114
1115         // Number of arbitration entries to decode
1116         returnDataSize += 2 * sizeof(int32_t);
1117
1118         // bool of did commit or not
1119         if (numberOfParts != 0) {
1120                 returnDataSize += sizeof(char);
1121         }
1122
1123         // Data to send Back
1124         Array<char> *returnData = new Array<char>(returnDataSize);
1125         ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
1126
1127         if (numberOfParts != 0) {
1128                 if (didCommit) {
1129                         bbEncode->put((char)1);
1130                 } else {
1131                         bbEncode->put((char)0);
1132                 }
1133                 if (couldArbitrate) {
1134                         bbEncode->put((char)1);
1135                 } else {
1136                         bbEncode->put((char)0);
1137                 }
1138         }
1139
1140         bbEncode->putInt(unseenArbitrations->size());
1141         uint size = unseenArbitrations->size();
1142         for (uint i = 0; i < size; i++) {
1143                 Entry *entry = unseenArbitrations->get(i);
1144                 entry->encode(bbEncode);
1145         }
1146
1147         localSequenceNumber++;
1148         return returnData;
1149 }
1150
1151 ThreeTuple<bool, bool, Array<Slot *> *> Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey) {
1152         bool attemptedToSendToServerTmp = attemptedToSendToServer;
1153         attemptedToSendToServer = true;
1154
1155         bool inserted = false;
1156         bool lastTryInserted = false;
1157
1158         Array<Slot *> *array = cloud->putSlot(slot, newSize);
1159         if (array == NULL) {
1160                 array = new Array<Slot *>();
1161                 array->set(0, slot);
1162                 rejectedSlotVector->clear();
1163                 inserted = true;
1164         } else {
1165                 if (array->length() == 0) {
1166                         throw new Error("Server Error: Did not send any slots");
1167                 }
1168
1169                 // if (attemptedToSendToServerTmp) {
1170                 if (hadPartialSendToServer) {
1171
1172                         bool isInserted = false;
1173                         uint size = array->length();
1174                         for (uint i = 0; i < size; i++) {
1175                                 Slot *s = array->get(i);
1176                                 if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1177                                         isInserted = true;
1178                                         break;
1179                                 }
1180                         }
1181
1182                         for (uint i = 0; i < size; i++) {
1183                                 Slot *s = array->get(i);
1184                                 if (isInserted) {
1185                                         break;
1186                                 }
1187
1188                                 // Process each entry in the slot
1189                                 Vector<Entry *> *entries = s->getEntries();
1190                                 uint eSize = entries->size();
1191                                 for (uint ei = 0; ei < eSize; ei++) {
1192                                         Entry *entry = entries->get(ei);
1193
1194                                         if (entry->getType() == TypeLastMessage) {
1195                                                 LastMessage *lastMessage = (LastMessage *)entry;
1196
1197                                                 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) {
1198                                                         isInserted = true;
1199                                                         break;
1200                                                 }
1201                                         }
1202                                 }
1203                         }
1204
1205                         if (!isInserted) {
1206                                 rejectedSlotVector->add(slot->getSequenceNumber());
1207                                 lastTryInserted = false;
1208                         } else {
1209                                 lastTryInserted = true;
1210                         }
1211                 } else {
1212                         rejectedSlotVector->add(slot->getSequenceNumber());
1213                         lastTryInserted = false;
1214                 }
1215         }
1216
1217         return ThreeTuple<bool, bool, Array<Slot *> *>(inserted, lastTryInserted, array);
1218 }
1219
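/**
 * Fills the given slot with entries. The first element of the returned tuple
 * is true when a resize is needed but was not requested (resize == false);
 * in that case the caller retries with resize == true.
 */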
1223 ThreeTuple<bool, int32_t, bool> Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry) {
1224         int newSize = 0;
1225         if (liveSlotCount > bufferResizeThreshold) {
1226                 resize = true;//Resize is forced
1227         }
1228
1229         if (resize) {
1230                 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1231                 TableStatus *status = new TableStatus(slot, newSize);
1232                 slot->addEntry(status);
1233         }
1234
1235         // Fill with rejected slots first before doing anything else
1236         doRejectedMessages(slot);
1237
1238         // Do mandatory rescue of entries
1239         ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1240
1241         // Extract working variables
1242         bool needsResize = mandatoryRescueReturn.getFirst();
1243         bool seenLiveSlot = mandatoryRescueReturn.getSecond();
1244         int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1245
1246         if (needsResize && !resize) {
1247                 // We need to resize but we are not resizing so return false
1248                 return ThreeTuple<bool, int32_t, bool>(true, NULL, NULL);
1249         }
1250
1251         bool inserted = false;
1252         if (newKeyEntry != NULL) {
1253                 newKeyEntry->setSlot(slot);
1254                 if (slot->hasSpace(newKeyEntry)) {
1255                         slot->addEntry(newKeyEntry);
1256                         inserted = true;
1257                 }
1258         }
1259
1260         // Clear the transactions, aborts and commits that were sent previously
1261         transactionPartsSent->clear();
1262         pendingSendArbitrationEntriesToDelete->clear();
1263         uint size = pendingSendArbitrationRounds->size();
1264         for (uint i = 0; i < size; i++) {
1265                 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
1266                 bool isFull = false;
1267                 round->generateParts();
1268                 Vector<Entry *> *parts = round->getParts();
1269
1270                 // Insert pending arbitration data
1271                 uint vsize = parts->size();
1272                 for (uint vi = 0; vi < vsize; vi++) {
1273                         Entry *arbitrationData = parts->get(vi);
1274
1275                         // If it is an abort then we need to set some information
1276                         if (arbitrationData->getType() == TypeAbort) {
1277                                 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1278                         }
1279
1280                         if (!slot->hasSpace(arbitrationData)) {
1281                                 // No space so cant do anything else with these data entries
1282                                 isFull = true;
1283                                 break;
1284                         }
1285
1286                         // Add to this current slot and add it to entries to delete
1287                         slot->addEntry(arbitrationData);
1288                         pendingSendArbitrationEntriesToDelete->add(arbitrationData);
1289                 }
1290
1291                 if (isFull) {
1292                         break;
1293                 }
1294         }
1295
1296         if (pendingTransactionQueue->size() > 0) {
1297                 Transaction *transaction = pendingTransactionQueue->get(0);
1298                 // Set the transaction sequence number if it has yet to be inserted into the block chain
1299                 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1300                         transaction->setSequenceNumber(slot->getSequenceNumber());
1301                 }
1302
1303                 while (true) {
1304                         TransactionPart *part = transaction->getNextPartToSend();
1305                         if (part == NULL) {
1306                                 // Ran out of parts to send for this transaction so move on
1307                                 break;
1308                         }
1309
1310                         if (slot->hasSpace(part)) {
1311                                 slot->addEntry(part);
1312                                 Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1313                                 if (partsSent == NULL) {
1314                                         partsSent = new Vector<int32_t>();
1315                                         transactionPartsSent->put(transaction, partsSent);
1316                                 }
1317                                 partsSent->add(part->getPartNumber());
1318                                 transactionPartsSent->put(transaction, partsSent);
1319                         } else {
1320                                 break;
1321                         }
1322                 }
1323         }
1324
1325         // Fill the remainder of the slot with rescue data
1326         doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1327
1328         return ThreeTuple<bool, int32_t, bool>(false, newSize, inserted);
1329 }
1330
1331 void Table::doRejectedMessages(Slot *s) {
1332         if (!rejectedSlotVector->isEmpty()) {
1333                 /* TODO: We should avoid generating a rejected message entry if
1334                  * there is already a sufficient entry in the queue (e->g->,
1335                  * equalsto value of true and same sequence number)->  */
1336
1337                 int64_t old_seqn = rejectedSlotVector->get(0);
1338                 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1339                         int64_t new_seqn = rejectedSlotVector->lastElement();
1340                         RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1341                         s->addEntry(rm);
1342                 } else {
1343                         int64_t prev_seqn = -1;
1344                         uint i = 0;
1345                         /* Go through list of missing messages */
1346                         for (; i < rejectedSlotVector->size(); i++) {
1347                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1348                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1349                                 if (s_msg != NULL)
1350                                         break;
1351                                 prev_seqn = curr_seqn;
1352                         }
1353                         /* Generate rejected message entry for missing messages */
1354                         if (prev_seqn != -1) {
1355                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1356                                 s->addEntry(rm);
1357                         }
1358                         /* Generate rejected message entries for present messages */
1359                         for (; i < rejectedSlotVector->size(); i++) {
1360                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1361                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1362                                 int64_t machineid = s_msg->getMachineID();
1363                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1364                                 s->addEntry(rm);
1365                         }
1366                 }
1367         }
1368 }
1369
1370 ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize) {
1371         int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1372         int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
1373         if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1374                 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1375         }
1376
1377         int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1378         bool seenLiveSlot = false;
1379         int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots;         // smallest seq number in the buffer if it is full
1380         int64_t threshold = firstIfFull + Table_FREE_SLOTS;             // we want the buffer to be clear of live entries up to this point
1381
1382
1383         // Mandatory Rescue
1384         for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1385                 Slot *previousSlot = buffer->getSlot(currentSequenceNumber);
1386                 // Push slot number forward
1387                 if (!seenLiveSlot) {
1388                         oldestLiveSlotSequenceNumver = currentSequenceNumber;
1389                 }
1390
1391                 if (!previousSlot->isLive()) {
1392                         continue;
1393                 }
1394
1395                 // We have seen a live slot
1396                 seenLiveSlot = true;
1397
1398                 // Get all the live entries for a slot
1399                 Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1400
1401                 // Iterate over all the live entries and try to rescue them
1402                 uint lESize = liveEntries->size();
1403                 for (uint i = 0; i < lESize; i++) {
1404                         Entry *liveEntry = liveEntries->get(i);
1405                         if (slot->hasSpace(liveEntry)) {
1406                                 // Enough space to rescue the entry
1407                                 slot->addEntry(liveEntry);
1408                         } else if (currentSequenceNumber == firstIfFull) {
1409                                 //if there's no space but the entry is about to fall off the queue
1410                                 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
1411                         }
1412                 }
1413         }
1414
1415         // Did not resize
1416         return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
1417 }
1418
1419 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1420         /* now go through live entries from least to greatest sequence number until
1421          * either all live slots added, or the slot doesn't have enough room
1422          * for SKIP_THRESHOLD consecutive entries*/
1423         int skipcount = 0;
1424         int64_t newestseqnum = buffer->getNewestSeqNum();
1425         for (; seqn <= newestseqnum; seqn++) {
1426                 Slot *prevslot = buffer->getSlot(seqn);
1427                 //Push slot number forward
1428                 if (!seenliveslot)
1429                         oldestLiveSlotSequenceNumver = seqn;
1430
1431                 if (!prevslot->isLive())
1432                         continue;
1433                 seenliveslot = true;
1434                 Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
1435                 uint lESize = liveentries->size();
1436                 for (uint i = 0; i < lESize; i++) {
1437                         Entry *liveentry = liveentries->get(i);
1438                         if (s->hasSpace(liveentry))
1439                                 s->addEntry(liveentry);
1440                         else {
1441                                 skipcount++;
1442                                 if (skipcount > Table_SKIP_THRESHOLD)
1443                                         goto donesearch;
1444                         }
1445                 }
1446         }
1447 donesearch:
1448         ;
1449 }
1450
1451 /**
1452  * Checks for malicious activity and updates the local copy of the block chain->
1453  */
1454 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1455         // The cloud communication layer has checked slot HMACs already
1456         // before decoding
1457         if (newSlots->length() == 0) {
1458                 return;
1459         }
1460
1461         // Make sure all slots are newer than the last largest slot this
1462         // client has seen
1463         int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
1464         if (firstSeqNum <= sequenceNumber) {
1465                 throw new Error("Server Error: Sent older slots!");
1466         }
1467
1468         // Create an object that can access both new slots and slots in our
1469         // local chain without committing slots to our local chain
1470         SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1471
1472         // Check that the HMAC chain is not broken
1473         checkHMACChain(indexer, newSlots);
1474
1475         // Set to keep track of messages from clients
1476         Hashset<int64_t> *machineSet = new Hashset<int64_t>();
1477         {
1478                 SetIterator<int64_t, Pair<int64_t, Liveness *> *> *lmit = getKeyIterator(lastMessageTable);
1479                 while (lmit->hasNext())
1480                         machineSet->add(lmit->next());
1481                 delete lmit;
1482         }
1483
1484         // Process each slots data
1485         {
1486                 uint numSlots = newSlots->length();
1487                 for (uint i = 0; i < numSlots; i++) {
1488                         Slot *slot = newSlots->get(i);
1489                         processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1490                         updateExpectedSize();
1491                 }
1492         }
1493
1494         // If there is a gap, check to see if the server sent us
1495         // everything->
1496         if (firstSeqNum != (sequenceNumber + 1)) {
1497
1498                 // Check the size of the slots that were sent down by the server->
1499                 // Can only check the size if there was a gap
1500                 checkNumSlots(newSlots->length());
1501
1502                 // Since there was a gap every machine must have pushed a slot or
1503                 // must have a last message message-> If not then the server is
1504                 // hiding slots
1505                 if (!machineSet->isEmpty()) {
1506                         throw new Error("Missing record for machines: ");
1507                 }
1508         }
1509
1510         // Update the size of our local block chain->
1511         commitNewMaxSize();
1512
1513         // Commit new to slots to the local block chain->
1514         {
1515                 uint numSlots = newSlots->length();
1516                 for (uint i = 0; i < numSlots; i++) {
1517                         Slot *slot = newSlots->get(i);
1518
1519                         // Insert this slot into our local block chain copy->
1520                         buffer->putSlot(slot);
1521
1522                         // Keep track of how many slots are currently live (have live data
1523                         // in them)->
1524                         liveSlotCount++;
1525                 }
1526         }
1527         // Get the sequence number of the latest slot in the system
1528         sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
1529         updateLiveStateFromServer();
1530
1531         // No Need to remember after we pulled from the server
1532         offlineTransactionsCommittedAndAtServer->clear();
1533
1534         // This is invalidated now
1535         hadPartialSendToServer = false;
1536 }
1537
1538 void Table::updateLiveStateFromServer() {
1539         // Process the new transaction parts
1540         processNewTransactionParts();
1541
1542         // Do arbitration on new transactions that were received
1543         arbitrateFromServer();
1544
1545         // Update all the committed keys
1546         bool didCommitOrSpeculate = updateCommittedTable();
1547
1548         // Delete the transactions that are now dead
1549         updateLiveTransactionsAndStatus();
1550
1551         // Do speculations
1552         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1553         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1554 }
1555
1556 void Table::updateLiveStateFromLocal() {
1557         // Update all the committed keys
1558         bool didCommitOrSpeculate = updateCommittedTable();
1559
1560         // Delete the transactions that are now dead
1561         updateLiveTransactionsAndStatus();
1562
1563         // Do speculations
1564         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1565         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1566 }
1567
1568 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1569         int64_t prevslots = firstSequenceNumber;
1570
1571         if (didFindTableStatus) {
1572         } else {
1573                 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1574         }
1575
1576         didFindTableStatus = true;
1577         currMaxSize = numberOfSlots;
1578 }
1579
1580 void Table::updateExpectedSize() {
1581         expectedsize++;
1582
1583         if (expectedsize > currMaxSize) {
1584                 expectedsize = currMaxSize;
1585         }
1586 }
1587
1588
1589 /**
1590  * Check the size of the block chain to make sure there are enough
1591  * slots sent back by the server-> This is only called when we have a
1592  * gap between the slots that we have locally and the slots sent by
1593  * the server therefore in the slots sent by the server there will be
1594  * at least 1 Table status message
1595  */
1596 void Table::checkNumSlots(int numberOfSlots) {
1597         if (numberOfSlots != expectedsize) {
1598                 throw new Error("Server Error: Server did not send all slots->  Expected: ");
1599         }
1600 }
1601
1602 /**
1603  * Update the size of of the local buffer if it is needed->
1604  */
1605 void Table::commitNewMaxSize() {
1606         didFindTableStatus = false;
1607
1608         // Resize the local slot buffer
1609         if (numberOfSlots != currMaxSize) {
1610                 buffer->resize((int32_t)currMaxSize);
1611         }
1612
1613         // Change the number of local slots to the new size
1614         numberOfSlots = (int32_t)currMaxSize;
1615
1616         // Recalculate the resize threshold since the size of the local
1617         // buffer has changed
1618         setResizeThreshold();
1619 }
1620
1621 /**
1622  * Process the new transaction parts from this latest round of slots
1623  * received from the server
1624  */
1625 void Table::processNewTransactionParts() {
1626
1627         if (newTransactionParts->size() == 0) {
1628                 // Nothing new to process
1629                 return;
1630         }
1631
1632         // Iterate through all the machine Ids that we received new parts
1633         // for
1634         SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *tpit = getKeyIterator(newTransactionParts);
1635         while (tpit->hasNext()) {
1636                 int64_t machineId = tpit->next();
1637                 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
1638
1639                 SetIterator<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *ptit = getKeyIterator(parts);
1640                 // Iterate through all the parts for that machine Id
1641                 while (ptit->hasNext()) {
1642                         Pair<int64_t, int32_t> *partId = ptit->next();
1643                         TransactionPart *part = parts->get(partId);
1644
1645                         if (lastArbitratedTransactionNumberByArbitratorTable->contains(part->getArbitratorId())) {
1646                                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1647                                 if (lastTransactionNumber >= part->getSequenceNumber()) {
1648                                         // Set dead the transaction part
1649                                         part->setDead();
1650                                         continue;
1651                                 }
1652                         }
1653
1654                         // Get the transaction object for that sequence number
1655                         Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1656
1657                         if (transaction == NULL) {
1658                                 // This is a new transaction that we dont have so make a new one
1659                                 transaction = new Transaction();
1660
1661                                 // Insert this new transaction into the live tables
1662                                 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1663                                 liveTransactionByTransactionIdTable->put(new Pair<int64_t, int64_t>(part->getTransactionId()), transaction);
1664                         }
1665
1666                         // Add that part to the transaction
1667                         transaction->addPartDecode(part);
1668                 }
1669                 delete ptit;
1670         }
1671         delete tpit;
1672         // Clear all the new transaction parts in preparation for the next
1673         // time the server sends slots
1674         newTransactionParts->clear();
1675 }
1676
1677 void Table::arbitrateFromServer() {
1678
1679         if (liveTransactionBySequenceNumberTable->size() == 0) {
1680                 // Nothing to arbitrate on so move on
1681                 return;
1682         }
1683
1684         // Get the transaction sequence numbers and sort from oldest to newest
1685         Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>();
1686         {
1687                 SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
1688                 while (trit->hasNext())
1689                         transactionSequenceNumbers->add(trit->next());
1690                 delete trit;
1691         }
1692         qsort(transactionSequenceNumbers->expose(), transactionSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1693
1694         // Collection of key value pairs that are
1695         Hashtable<IoTString *, KeyValue *> *speculativeTableTmp = new Hashtable<IoTString *, KeyValue *>();
1696
1697         // The last transaction arbitrated on
1698         int64_t lastTransactionCommitted = -1;
1699         Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1700         uint tsnSize = transactionSequenceNumbers->size();
1701         for (uint i = 0; i < tsnSize; i++) {
1702                 int64_t transactionSequenceNumber = transactionSequenceNumbers->get(i);
1703                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1704
1705                 // Check if this machine arbitrates for this transaction if not
1706                 // then we cant arbitrate this transaction
1707                 if (transaction->getArbitrator() != localMachineId) {
1708                         continue;
1709                 }
1710
1711                 if (transactionSequenceNumber < lastSeqNumArbOn) {
1712                         continue;
1713                 }
1714
1715                 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1716                         // We have seen this already locally so dont commit again
1717                         continue;
1718                 }
1719
1720
1721                 if (!transaction->isComplete()) {
1722                         // Will arbitrate in incorrect order if we continue so just break
1723                         // Most likely this
1724                         break;
1725                 }
1726
1727
1728                 // update the largest transaction seen by arbitrator from server
1729                 if (!lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1730                         lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1731                 } else {
1732                         int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1733                         if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1734                                 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1735                         }
1736                 }
1737
1738                 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1739                         // Guard evaluated as true
1740
1741                         // Update the local changes so we can make the commit
1742                         SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1743                         while (kvit->hasNext()) {
1744                                 KeyValue *kv = kvit->next();
1745                                 speculativeTableTmp->put(kv->getKey(), kv);
1746                         }
1747                         delete kvit;
1748
1749                         // Update what the last transaction committed was for use in batch commit
1750                         lastTransactionCommitted = transactionSequenceNumber;
1751                 } else {
1752                         // Guard evaluated was false so create abort
1753                         // Create the abort
1754                         Abort *newAbort = new Abort(NULL,
1755                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1756                                                                                                                                         transaction->getSequenceNumber(),
1757                                                                                                                                         transaction->getMachineId(),
1758                                                                                                                                         transaction->getArbitrator(),
1759                                                                                                                                         localArbitrationSequenceNumber);
1760                         localArbitrationSequenceNumber++;
1761                         generatedAborts->add(newAbort);
1762
1763                         // Insert the abort so we can process
1764                         processEntry(newAbort);
1765                 }
1766
1767                 lastSeqNumArbOn = transactionSequenceNumber;
1768         }
1769
1770         Commit *newCommit = NULL;
1771
1772         // If there is something to commit
1773         if (speculativeTableTmp->size() != 0) {
1774                 // Create the commit and increment the commit sequence number
1775                 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1776                 localArbitrationSequenceNumber++;
1777
1778                 // Add all the new keys to the commit
1779                 SetIterator<IoTString *, KeyValue *> *spit = getKeyIterator(speculativeTableTmp);
1780                 while (spit->hasNext()) {
1781                         IoTString *string = spit->next();
1782                         KeyValue *kv = speculativeTableTmp->get(string);
1783                         newCommit->addKV(kv);
1784                 }
1785                 delete spit;
1786
1787                 // create the commit parts
1788                 newCommit->createCommitParts();
1789
1790                 // Append all the commit parts to the end of the pending queue
1791                 // waiting for sending to the server
1792                 // Insert the commit so we can process it
1793                 Vector<CommitPart *> *parts = newCommit->getParts();
1794                 uint partsSize = parts->size();
1795                 for (uint i = 0; i < partsSize; i++) {
1796                         CommitPart *commitPart = parts->get(i);
1797                         processEntry(commitPart);
1798                 }
1799         }
1800
1801         if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1802                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1803                 pendingSendArbitrationRounds->add(arbitrationRound);
1804
1805                 if (compactArbitrationData()) {
1806                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1807                         if (newArbitrationRound->getCommit() != NULL) {
1808                                 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1809                                 uint partsSize = parts->size();
1810                                 for (uint i = 0; i < partsSize; i++) {
1811                                         CommitPart *commitPart = parts->get(i);
1812                                         processEntry(commitPart);
1813                                 }
1814                         }
1815                 }
1816         }
1817 }
1818
1819 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1820
1821         // Check if this machine arbitrates for this transaction if not then
1822         // we cant arbitrate this transaction
1823         if (transaction->getArbitrator() != localMachineId) {
1824                 return Pair<bool, bool>(false, false);
1825         }
1826
1827         if (!transaction->isComplete()) {
1828                 // Will arbitrate in incorrect order if we continue so just break
1829                 // Most likely this
1830                 return Pair<bool, bool>(false, false);
1831         }
1832
1833         if (transaction->getMachineId() != localMachineId) {
1834                 // dont do this check for local transactions
1835                 if (lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1836                         if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1837                                 // We've have already seen this from the server
1838                                 return Pair<bool, bool>(false, false);
1839                         }
1840                 }
1841         }
1842
1843         if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1844                 // Guard evaluated as true Create the commit and increment the
1845                 // commit sequence number
1846                 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1847                 localArbitrationSequenceNumber++;
1848
1849                 // Update the local changes so we can make the commit
1850                 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1851                 while (kvit->hasNext()) {
1852                         KeyValue *kv = kvit->next();
1853                         newCommit->addKV(kv);
1854                 }
1855                 delete kvit;
1856
1857                 // create the commit parts
1858                 newCommit->createCommitParts();
1859
1860                 // Append all the commit parts to the end of the pending queue
1861                 // waiting for sending to the server
1862                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1863                 pendingSendArbitrationRounds->add(arbitrationRound);
1864
1865                 if (compactArbitrationData()) {
1866                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1867                         Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1868                         uint partsSize = parts->size();
1869                         for (uint i = 0; i < partsSize; i++) {
1870                                 CommitPart *commitPart = parts->get(i);
1871                                 processEntry(commitPart);
1872                         }
1873                 } else {
1874                         // Insert the commit so we can process it
1875                         Vector<CommitPart *> *parts = newCommit->getParts();
1876                         uint partsSize = parts->size();
1877                         for (uint i = 0; i < partsSize; i++) {
1878                                 CommitPart *commitPart = parts->get(i);
1879                                 processEntry(commitPart);
1880                         }
1881                 }
1882
1883                 if (transaction->getMachineId() == localMachineId) {
1884                         TransactionStatus *status = transaction->getTransactionStatus();
1885                         if (status != NULL) {
1886                                 status->setStatus(TransactionStatus_StatusCommitted);
1887                         }
1888                 }
1889
1890                 updateLiveStateFromLocal();
1891                 return Pair<bool, bool>(true, true);
1892         } else {
1893                 if (transaction->getMachineId() == localMachineId) {
1894                         // For locally created messages update the status
1895                         // Guard evaluated was false so create abort
1896                         TransactionStatus *status = transaction->getTransactionStatus();
1897                         if (status != NULL) {
1898                                 status->setStatus(TransactionStatus_StatusAborted);
1899                         }
1900                 } else {
1901                         Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
1902
1903                         // Create the abort
1904                         Abort *newAbort = new Abort(NULL,
1905                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1906                                                                                                                                         -1,
1907                                                                                                                                         transaction->getMachineId(),
1908                                                                                                                                         transaction->getArbitrator(),
1909                                                                                                                                         localArbitrationSequenceNumber);
1910                         localArbitrationSequenceNumber++;
1911                         addAbortSet->add(newAbort);
1912
1913                         // Append all the commit parts to the end of the pending queue
1914                         // waiting for sending to the server
1915                         ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1916                         pendingSendArbitrationRounds->add(arbitrationRound);
1917
1918                         if (compactArbitrationData()) {
1919                                 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1920
1921                                 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1922                                 uint partsSize = parts->size();
1923                                 for (uint i = 0; i < partsSize; i++) {
1924                                         CommitPart *commitPart = parts->get(i);
1925                                         processEntry(commitPart);
1926                                 }
1927                         }
1928                 }
1929
1930                 updateLiveStateFromLocal();
1931                 return Pair<bool, bool>(true, false);
1932         }
1933 }
1934
1935 /**
1936  * Compacts the arbitration data my merging commits and aggregating
1937  * aborts so that a single large push of commits can be done instead
1938  * of many small updates
1939  */
1940 bool Table::compactArbitrationData() {
1941         if (pendingSendArbitrationRounds->size() < 2) {
1942                 // Nothing to compact so do nothing
1943                 return false;
1944         }
1945
1946         ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1947         if (lastRound->getDidSendPart()) {
1948                 return false;
1949         }
1950
1951         bool hadCommit = (lastRound->getCommit() == NULL);
1952         bool gotNewCommit = false;
1953
1954         uint numberToDelete = 1;
1955         while (numberToDelete < pendingSendArbitrationRounds->size()) {
1956                 ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1957
1958                 if (round->isFull() || round->getDidSendPart()) {
1959                         // Stop since there is a part that cannot be compacted and we
1960                         // need to compact in order
1961                         break;
1962                 }
1963
1964                 if (round->getCommit() == NULL) {
1965                         // Try compacting aborts only
1966                         int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
1967                         if (newSize > ArbitrationRound_MAX_PARTS) {
1968                                 // Cant compact since it would be too large
1969                                 break;
1970                         }
1971                         lastRound->addAborts(round->getAborts());
1972                 } else {
1973                         // Create a new larger commit
1974                         Commit *newCommit = Commit_merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
1975                         localArbitrationSequenceNumber++;
1976
1977                         // Create the commit parts so that we can count them
1978                         newCommit->createCommitParts();
1979
1980                         // Calculate the new size of the parts
1981                         int newSize = newCommit->getNumberOfParts();
1982                         newSize += lastRound->getAbortsCount();
1983                         newSize += round->getAbortsCount();
1984
1985                         if (newSize > ArbitrationRound_MAX_PARTS) {
1986                                 // Cant compact since it would be too large
1987                                 break;
1988                         }
1989
1990                         // Set the new compacted part
1991                         lastRound->setCommit(newCommit);
1992                         lastRound->addAborts(round->getAborts());
1993                         gotNewCommit = true;
1994                 }
1995
1996                 numberToDelete++;
1997         }
1998
1999         if (numberToDelete != 1) {
2000                 // If there is a compaction
2001                 // Delete the previous pieces that are now in the new compacted piece
2002                 if (numberToDelete == pendingSendArbitrationRounds->size()) {
2003                         pendingSendArbitrationRounds->clear();
2004                 } else {
2005                         for (uint i = 0; i < numberToDelete; i++) {
2006                                 pendingSendArbitrationRounds->removeIndex(pendingSendArbitrationRounds->size() - 1);
2007                         }
2008                 }
2009
2010                 // Add the new compacted into the pending to send list
2011                 pendingSendArbitrationRounds->add(lastRound);
2012
2013                 // Should reinsert into the commit processor
2014                 if (hadCommit && gotNewCommit) {
2015                         return true;
2016                 }
2017         }
2018
2019         return false;
2020 }
2021
2022 /**
2023  * Update all the commits and the committed tables, sets dead the dead
2024  * transactions
2025  */
2026 bool Table::updateCommittedTable() {
2027
2028         if (newCommitParts->size() == 0) {
2029                 // Nothing new to process
2030                 return false;
2031         }
2032
2033         // Iterate through all the machine Ids that we received new parts for
2034         SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts);
2035         while (partsit->hasNext()) {
2036                 int64_t machineId = partsit->next();
2037                 Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId);
2038
2039                 // Iterate through all the parts for that machine Id
2040                 SetIterator<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pairit = getKeyIterator(parts);
2041                 while (pairit->hasNext()) {
2042                         Pair<int64_t, int32_t> *partId = pairit->next();
2043                         CommitPart *part = parts->get(partId);
2044
2045                         // Get the transaction object for that sequence number
2046                         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
2047
2048                         if (commitForClientTable == NULL) {
2049                                 // This is the first commit from this device
2050                                 commitForClientTable = new Hashtable<int64_t, Commit *>();
2051                                 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
2052                         }
2053
2054                         Commit *commit = commitForClientTable->get(part->getSequenceNumber());
2055
2056                         if (commit == NULL) {
2057                                 // This is a new commit that we dont have so make a new one
2058                                 commit = new Commit();
2059
2060                                 // Insert this new commit into the live tables
2061                                 commitForClientTable->put(part->getSequenceNumber(), commit);
2062                         }
2063
2064                         // Add that part to the commit
2065                         commit->addPartDecode(part);
2066                 }
2067                 delete pairit;
2068         }
2069         delete partsit;
2070
2071         // Clear all the new commits parts in preparation for the next time
2072         // the server sends slots
2073         newCommitParts->clear();
2074
2075         // If we process a new commit keep track of it for future use
2076         bool didProcessANewCommit = false;
2077
2078         // Process the commits one by one
2079         SetIterator<int64_t, Hashtable<int64_t, Commit *> *> *liveit = getKeyIterator(liveCommitsTable);
2080         while (liveit->hasNext()) {
2081                 int64_t arbitratorId = liveit->next();
2082
2083                 // Get all the commits for a specific arbitrator
2084                 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
2085
2086                 // Sort the commits in order
2087                 Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>();
2088                 {
2089                         SetIterator<int64_t, Commit *> *clientit = getKeyIterator(commitForClientTable);
2090                         while (clientit->hasNext())
2091                                 commitSequenceNumbers->add(clientit->next());
2092                         delete clientit;
2093                 }
2094
2095                 qsort(commitSequenceNumbers->expose(), commitSequenceNumbers->size(), sizeof(int64_t), compareInt64);
2096
2097                 // Get the last commit seen from this arbitrator
2098                 int64_t lastCommitSeenSequenceNumber = -1;
2099                 if (lastCommitSeenSequenceNumberByArbitratorTable->contains(arbitratorId)) {
2100                         lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
2101                 }
2102
2103                 // Go through each new commit one by one
2104                 for (uint i = 0; i < commitSequenceNumbers->size(); i++) {
2105                         int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
2106                         Commit *commit = commitForClientTable->get(commitSequenceNumber);
2107
2108                         // Special processing if a commit is not complete
2109                         if (!commit->isComplete()) {
2110                                 if (i == (commitSequenceNumbers->size() - 1)) {
2111                                         // If there is an incomplete commit and this commit is the
2112                                         // latest one seen then this commit cannot be processed and
2113                                         // there are no other commits
2114                                         break;
2115                                 } else {
2116                                         // This is a commit that was already dead but parts of it
2117                                         // are still in the block chain (not flushed out yet)->
2118                                         // Delete it and move on
2119                                         commit->setDead();
2120                                         commitForClientTable->remove(commit->getSequenceNumber());
2121                                         continue;
2122                                 }
2123                         }
2124
2125                         // Update the last transaction that was updated if we can
2126                         if (commit->getTransactionSequenceNumber() != -1) {
2127                                 // Update the last transaction sequence number that the arbitrator arbitrated on1
2128                                 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2129                                         lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2130                                 }
2131                         }
2132
2133                         // Update the last arbitration data that we have seen so far
2134                         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(commit->getMachineId())) {
2135                                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
2136                                 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
2137                                         // Is larger
2138                                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2139                                 }
2140                         } else {
2141                                 // Never seen any data from this arbitrator so record the first one
2142                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2143                         }
2144
2145                         // We have already seen this commit before so need to do the
2146                         // full processing on this commit
2147                         if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2148
2149                                 // Update the last transaction that was updated if we can
2150                                 if (commit->getTransactionSequenceNumber() != -1) {
2151                                         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
2152                                         if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) ||
2153                                                         lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2154                                                 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2155                                         }
2156                                 }
2157
2158                                 continue;
2159                         }
2160
2161                         // If we got here then this is a brand new commit and needs full
2162                         // processing
2163                         // Get what commits should be edited, these are the commits that
2164                         // have live values for their keys
2165                         Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
2166                         {
2167                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2168                                 while (kvit->hasNext()) {
2169                                         KeyValue *kv = kvit->next();
2170                                         Commit *commit = liveCommitsByKeyTable->get(kv->getKey());
2171                                         if (commit != NULL)
2172                                                 commitsToEdit->add(commit);
2173                                 }
2174                                 delete kvit;
2175                         }
2176
2177                         // Update each previous commit that needs to be updated
2178                         SetIterator<Commit *, Commit *> *commitit = commitsToEdit->iterator();
2179                         while (commitit->hasNext()) {
2180                                 Commit *previousCommit = commitit->next();
2181
2182                                 // Only bother with live commits (TODO: Maybe remove this check)
2183                                 if (previousCommit->isLive()) {
2184
2185                                         // Update which keys in the old commits are still live
2186                                         {
2187                                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2188                                                 while (kvit->hasNext()) {
2189                                                         KeyValue *kv = kvit->next();
2190                                                         previousCommit->invalidateKey(kv->getKey());
2191                                                 }
2192                                                 delete kvit;
2193                                         }
2194
2195                                         // if the commit is now dead then remove it
2196                                         if (!previousCommit->isLive()) {
2197                                                 commitForClientTable->remove(previousCommit->getSequenceNumber());
2198                                         }
2199                                 }
2200                         }
2201                         delete commitit;
2202
2203                         // Update the last seen sequence number from this arbitrator
2204                         if (lastCommitSeenSequenceNumberByArbitratorTable->contains(commit->getMachineId())) {
2205                                 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2206                                         lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2207                                 }
2208                         } else {
2209                                 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2210                         }
2211
2212                         // We processed a new commit that we havent seen before
2213                         didProcessANewCommit = true;
2214
2215                         // Update the committed table of keys and which commit is using which key
2216                         {
2217                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2218                                 while (kvit->hasNext()) {
2219                                         KeyValue *kv = kvit->next();
2220                                         committedKeyValueTable->put(kv->getKey(), kv);
2221                                         liveCommitsByKeyTable->put(kv->getKey(), commit);
2222                                 }
2223                                 delete kvit;
2224                         }
2225                 }
2226         }
2227         delete liveit;
2228
2229         return didProcessANewCommit;
2230 }
2231
2232 /**
2233  * Create the speculative table from transactions that are still live
2234  * and have come from the cloud
2235  */
2236 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2237         if (liveTransactionBySequenceNumberTable->size() == 0) {
2238                 // There is nothing to speculate on
2239                 return false;
2240         }
2241
2242         // Create a list of the transaction sequence numbers and sort them
2243         // from oldest to newest
2244         Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>();
2245         {
2246                 SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
2247                 while (trit->hasNext())
2248                         transactionSequenceNumbersSorted->add(trit->next());
2249                 delete trit;
2250         }
2251
2252         qsort(transactionSequenceNumbersSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64);
2253
2254         bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2255
2256
2257         if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2258                 // If there is a gap in the transaction sequence numbers then
2259                 // there was a commit or an abort of a transaction OR there was a
2260                 // new commit (Could be from offline commit) so a redo the
2261                 // speculation from scratch
2262
2263                 // Start from scratch
2264                 speculatedKeyValueTable->clear();
2265                 lastTransactionSequenceNumberSpeculatedOn = -1;
2266                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2267         }
2268
2269         // Remember the front of the transaction list
2270         oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2271
2272         // Find where to start arbitration from
2273         uint startIndex = 0;
2274
2275         for (; startIndex < transactionSequenceNumbersSorted->size(); startIndex++)
2276                 if (transactionSequenceNumbersSorted->get(startIndex) == lastTransactionSequenceNumberSpeculatedOn)
2277                         break;
2278         startIndex++;
2279
2280         if (startIndex >= transactionSequenceNumbersSorted->size()) {
2281                 // Make sure we are not out of bounds
2282                 return false;           // did not speculate
2283         }
2284
2285         Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
2286         bool didSkip = true;
2287
2288         for (uint i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2289                 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2290                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2291
2292                 if (!transaction->isComplete()) {
2293                         // If there is an incomplete transaction then there is nothing
2294                         // we can do add this transactions arbitrator to the list of
2295                         // arbitrators we should ignore
2296                         incompleteTransactionArbitrator->add(transaction->getArbitrator());
2297                         didSkip = true;
2298                         continue;
2299                 }
2300
2301                 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2302                         continue;
2303                 }
2304
2305                 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2306
2307                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2308                         // Guard evaluated to true so update the speculative table
2309                         {
2310                                 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2311                                 while (kvit->hasNext()) {
2312                                         KeyValue *kv = kvit->next();
2313                                         speculatedKeyValueTable->put(kv->getKey(), kv);
2314                                 }
2315                                 delete kvit;
2316                         }
2317                 }
2318         }
2319
2320         if (didSkip) {
2321                 // Since there was a skip we need to redo the speculation next time around
2322                 lastTransactionSequenceNumberSpeculatedOn = -1;
2323                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2324         }
2325
2326         // We did some speculation
2327         return true;
2328 }
2329
2330 /**
2331  * Create the pending transaction speculative table from transactions
2332  * that are still in the pending transaction buffer
2333  */
2334 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2335         if (pendingTransactionQueue->size() == 0) {
2336                 // There is nothing to speculate on
2337                 return;
2338         }
2339
2340         if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2341                 // need to reset on the pending speculation
2342                 lastPendingTransactionSpeculatedOn = NULL;
2343                 firstPendingTransaction = pendingTransactionQueue->get(0);
2344                 pendingTransactionSpeculatedKeyValueTable->clear();
2345         }
2346
2347         // Find where to start arbitration from
2348         uint startIndex = 0;
2349
2350         for (; startIndex < pendingTransactionQueue->size(); startIndex++)
2351                 if (pendingTransactionQueue->get(startIndex) == firstPendingTransaction)
2352                         break;
2353
2354         if (startIndex >= pendingTransactionQueue->size()) {
2355                 // Make sure we are not out of bounds
2356                 return;
2357         }
2358
2359         for (uint i = startIndex; i < pendingTransactionQueue->size(); i++) {
2360                 Transaction *transaction = pendingTransactionQueue->get(i);
2361
2362                 lastPendingTransactionSpeculatedOn = transaction;
2363
2364                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2365                         // Guard evaluated to true so update the speculative table
2366                         SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2367                         while (kvit->hasNext()) {
2368                                 KeyValue *kv = kvit->next();
2369                                 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2370                         }
2371                         delete kvit;
2372                 }
2373         }
2374 }
2375
2376 /**
2377  * Set dead and remove from the live transaction tables the
2378  * transactions that are dead
2379  */
2380 void Table::updateLiveTransactionsAndStatus() {
2381         // Go through each of the transactions
2382         {
2383                 SetIterator<int64_t, Transaction *> *iter = getKeyIterator(liveTransactionBySequenceNumberTable);
2384                 while (iter->hasNext()) {
2385                         int64_t key = iter->next();
2386                         Transaction *transaction = liveTransactionBySequenceNumberTable->get(key);
2387
2388                         // Check if the transaction is dead
2389                         if (lastArbitratedTransactionNumberByArbitratorTable->contains(transaction->getArbitrator()) && lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator() >= transaction->getSequenceNumber())) {
2390                                 // Set dead the transaction
2391                                 transaction->setDead();
2392
2393                                 // Remove the transaction from the live table
2394                                 iter->remove();
2395                                 liveTransactionByTransactionIdTable->remove(transaction->getId());
2396                         }
2397                 }
2398                 delete iter;
2399         }
2400
2401         // Go through each of the transactions
2402         {
2403                 SetIterator<int64_t, TransactionStatus *> *iter = getKeyIterator(outstandingTransactionStatus);
2404                 while (iter->hasNext()) {
2405                         int64_t key = iter->next();
2406                         TransactionStatus *status = outstandingTransactionStatus->get(key);
2407
2408                         // Check if the transaction is dead
2409                         if (lastArbitratedTransactionNumberByArbitratorTable->contains(status->getTransactionArbitrator()) && (lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()) >= status->getTransactionSequenceNumber())) {
2410
2411                                 // Set committed
2412                                 status->setStatus(TransactionStatus_StatusCommitted);
2413
2414                                 // Remove
2415                                 iter->remove();
2416                         }
2417                 }
2418                 delete iter;
2419         }
2420 }
2421
2422 /**
2423  * Process this slot, entry by entry->  Also update the latest message sent by slot
2424  */
2425 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2426
2427         // Update the last message seen
2428         updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2429
2430         // Process each entry in the slot
2431         Vector<Entry *> *entries = slot->getEntries();
2432         uint eSize = entries->size();
2433         for (uint ei = 0; ei < eSize; ei++) {
2434                 Entry *entry = entries->get(ei);
2435                 switch (entry->getType()) {
2436                 case TypeCommitPart:
2437                         processEntry((CommitPart *)entry);
2438                         break;
2439                 case TypeAbort:
2440                         processEntry((Abort *)entry);
2441                         break;
2442                 case TypeTransactionPart:
2443                         processEntry((TransactionPart *)entry);
2444                         break;
2445                 case TypeNewKey:
2446                         processEntry((NewKey *)entry);
2447                         break;
2448                 case TypeLastMessage:
2449                         processEntry((LastMessage *)entry, machineSet);
2450                         break;
2451                 case TypeRejectedMessage:
2452                         processEntry((RejectedMessage *)entry, indexer);
2453                         break;
2454                 case TypeTableStatus:
2455                         processEntry((TableStatus *)entry, slot->getSequenceNumber());
2456                         break;
2457                 default:
2458                         throw new Error("Unrecognized type: ");
2459                 }
2460         }
2461 }
2462
2463 /**
2464  * Update the last message that was sent for a machine Id
2465  */
2466 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2467         // Update what the last message received by a machine was
2468         updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2469 }
2470
2471 /**
2472  * Add the new key to the arbitrators table and update the set of live
2473  * new keys (in case of a rescued new key message)
2474  */
2475 void Table::processEntry(NewKey *entry) {
2476         // Update the arbitrator table with the new key information
2477         arbitratorTable->put(entry->getKey(), entry->getMachineID());
2478
2479         // Update what the latest live new key is
2480         NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2481         if (oldNewKey != NULL) {
2482                 // Delete the old new key messages
2483                 oldNewKey->setDead();
2484         }
2485 }
2486
2487 /**
2488  * Process new table status entries and set dead the old ones as new
2489  * ones come in-> keeps track of the largest and smallest table status
2490  * seen in this current round of updating the local copy of the block
2491  * chain
2492  */
2493 void Table::processEntry(TableStatus *entry, int64_t seq) {
2494         int newNumSlots = entry->getMaxSlots();
2495         updateCurrMaxSize(newNumSlots);
2496         initExpectedSize(seq, newNumSlots);
2497
2498         if (liveTableStatus != NULL) {
2499                 // We have a larger table status so the old table status is no
2500                 // int64_ter alive
2501                 liveTableStatus->setDead();
2502         }
2503
2504         // Make this new table status the latest alive table status
2505         liveTableStatus = entry;
2506 }
2507
2508 /**
2509  * Check old messages to see if there is a block chain violation->
2510  * Also
2511  */
2512 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
2513         int64_t oldSeqNum = entry->getOldSeqNum();
2514         int64_t newSeqNum = entry->getNewSeqNum();
2515         bool isequal = entry->getEqual();
2516         int64_t machineId = entry->getMachineID();
2517         int64_t seq = entry->getSequenceNumber();
2518
2519         // Check if we have messages that were supposed to be rejected in
2520         // our local block chain
2521         for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2522                 // Get the slot
2523                 Slot *slot = indexer->getSlot(seqNum);
2524
2525                 if (slot != NULL) {
2526                         // If we have this slot make sure that it was not supposed to be
2527                         // a rejected slot
2528                         int64_t slotMachineId = slot->getMachineID();
2529                         if (isequal != (slotMachineId == machineId)) {
2530                                 throw new Error("Server Error: Trying to insert rejected message for slot ");
2531                         }
2532                 }
2533         }
2534
2535         // Create a list of clients to watch until they see this rejected
2536         // message entry->
2537         Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2538         SetIterator<int64_t, Pair<int64_t, Liveness *> *> *iter = getKeyIterator(lastMessageTable);
2539         while (iter->hasNext()) {
2540                 // Machine ID for the last message entry
2541                 int64_t lastMessageEntryMachineId = iter->next();
2542
2543                 // We've seen it, don't need to continue to watch->  Our next
2544                 // message will implicitly acknowledge it->
2545                 if (lastMessageEntryMachineId == localMachineId) {
2546                         continue;
2547                 }
2548
2549                 Pair<int64_t, Liveness *> *lastMessageValue = lastMessageTable->get(lastMessageEntryMachineId);
2550                 int64_t entrySequenceNumber = lastMessageValue->getFirst();
2551
2552                 if (entrySequenceNumber < seq) {
2553                         // Add this rejected message to the set of messages that this
2554                         // machine ID did not see yet
2555                         addWatchVector(lastMessageEntryMachineId, entry);
2556                         // This client did not see this rejected message yet so add it
2557                         // to the watch set to monitor
2558                         deviceWatchSet->add(lastMessageEntryMachineId);
2559                 }
2560         }
2561         delete iter;
2562
2563         if (deviceWatchSet->isEmpty()) {
2564                 // This rejected message has been seen by all the clients so
2565                 entry->setDead();
2566         } else {
2567                 // We need to watch this rejected message
2568                 entry->setWatchSet(deviceWatchSet);
2569         }
2570 }
2571
2572 /**
2573  * Check if this abort is live, if not then save it so we can kill it
2574  * later-> update the last transaction number that was arbitrated on->
2575  */
2576 void Table::processEntry(Abort *entry) {
2577         if (entry->getTransactionSequenceNumber() != -1) {
2578                 // update the transaction status if it was sent to the server
2579                 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2580                 if (status != NULL) {
2581                         status->setStatus(TransactionStatus_StatusAborted);
2582                 }
2583         }
2584
2585         // Abort has not been seen by the client it is for yet so we need to
2586         // keep track of it
2587
2588         Abort *previouslySeenAbort = liveAbortTable->put(new Pair<int64_t, int64_t>(entry->getAbortId()), entry);
2589         if (previouslySeenAbort != NULL) {
2590                 previouslySeenAbort->setDead();         // Delete old version of the abort since we got a rescued newer version
2591         }
2592
2593         if (entry->getTransactionArbitrator() == localMachineId) {
2594                 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
2595         }
2596
2597         if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) {
2598                 // The machine already saw this so it is dead
2599                 entry->setDead();
2600                 Pair<int64_t, int64_t> abortid = entry->getAbortId();
2601                 liveAbortTable->remove(&abortid);
2602
2603                 if (entry->getTransactionArbitrator() == localMachineId) {
2604                         liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2605                 }
2606                 return;
2607         }
2608
2609         // Update the last arbitration data that we have seen so far
2610         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(entry->getTransactionArbitrator())) {
2611                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2612                 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2613                         // Is larger
2614                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2615                 }
2616         } else {
2617                 // Never seen any data from this arbitrator so record the first one
2618                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2619         }
2620
2621         // Set dead a transaction if we can
2622         Pair<int64_t, int64_t> deadPair = Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber());
2623
2624         Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(&deadPair);
2625         if (transactionToSetDead != NULL) {
2626                 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2627         }
2628
2629         // Update the last transaction sequence number that the arbitrator
2630         // arbitrated on
2631         if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getTransactionArbitrator()) ||
2632                         (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator()) < entry->getTransactionSequenceNumber())) {
2633                 // Is a valid one
2634                 if (entry->getTransactionSequenceNumber() != -1) {
2635                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2636                 }
2637         }
2638 }
2639
2640 /**
2641  * Set dead the transaction part if that transaction is dead and keep
2642  * track of all new parts
2643  */
2644 void Table::processEntry(TransactionPart *entry) {
2645         // Check if we have already seen this transaction and set it dead OR
2646         // if it is not alive
2647         if (lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getArbitratorId()) && (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId()) >= entry->getSequenceNumber())) {
2648                 // This transaction is dead, it was already committed or aborted
2649                 entry->setDead();
2650                 return;
2651         }
2652
2653         // This part is still alive
2654         Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId());
2655
2656         if (transactionPart == NULL) {
2657                 // Dont have a table for this machine Id yet so make one
2658                 transactionPart = new Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2659                 newTransactionParts->put(entry->getMachineId(), transactionPart);
2660         }
2661
2662         // Update the part and set dead ones we have already seen (got a
2663         // rescued version)
2664         TransactionPart *previouslySeenPart = transactionPart->put(new Pair<int64_t, int32_t>(entry->getPartId()), entry);
2665         if (previouslySeenPart != NULL) {
2666                 previouslySeenPart->setDead();
2667         }
2668 }
2669
2670 /**
2671  * Process new commit entries and save them for future use->  Delete duplicates
2672  */
2673 void Table::processEntry(CommitPart *entry) {
2674         // Update the last transaction that was updated if we can
2675         if (entry->getTransactionSequenceNumber() != -1) {
2676                 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId() || lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber())) {
2677                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2678                 }
2679         }
2680
2681         Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *commitPart = newCommitParts->get(entry->getMachineId());
2682         if (commitPart == NULL) {
2683                 // Don't have a table for this machine Id yet so make one
2684                 commitPart = new Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2685                 newCommitParts->put(entry->getMachineId(), commitPart);
2686         }
2687         // Update the part and set dead ones we have already seen (got a
2688         // rescued version)
2689         CommitPart *previouslySeenPart = commitPart->put(new Pair<int64_t, int32_t>(entry->getPartId()), entry);
2690         if (previouslySeenPart != NULL) {
2691                 previouslySeenPart->setDead();
2692         }
2693 }
2694
2695 /**
2696  * Update the last message seen table-> Update and set dead the
2697  * appropriate RejectedMessages as clients see them-> Updates the live
2698  * aborts, removes those that are dead and sets them dead-> Check that
2699  * the last message seen is correct and that there is no mismatch of
2700  * our own last message or that other clients have not had a rollback
2701  * on the last message->
2702  */
2703 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2704         // We have seen this machine ID
2705         machineSet->remove(machineId);
2706
2707         // Get the set of rejected messages that this machine Id is has not seen yet
2708         Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2709         // If there is a rejected message that this machine Id has not seen yet
2710         if (watchset != NULL) {
2711                 // Go through each rejected message that this machine Id has not
2712                 // seen yet
2713
2714                 SetIterator<RejectedMessage *, RejectedMessage *> *rmit = watchset->iterator();
2715                 while (rmit->hasNext()) {
2716                         RejectedMessage *rm = rmit->next();
2717                         // If this machine Id has seen this rejected message->->->
2718                         if (rm->getSequenceNumber() <= seqNum) {
2719                                 // Remove it from our watchlist
2720                                 rmit->remove();
2721                                 // Decrement machines that need to see this notification
2722                                 rm->removeWatcher(machineId);
2723                         }
2724                 }
2725                 delete rmit;
2726         }
2727
2728         // Set dead the abort
2729         SetIterator<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals> *abortit = getKeyIterator(liveAbortTable);
2730
2731         while (abortit->hasNext()) {
2732                 Pair<int64_t, int64_t> *key = abortit->next();
2733                 Abort *abort = liveAbortTable->get(key);
2734                 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2735                         abort->setDead();
2736                         abortit->remove();
2737                         if (abort->getTransactionArbitrator() == localMachineId) {
2738                                 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2739                         }
2740                 }
2741         }
2742         delete abortit;
2743         if (machineId == localMachineId) {
2744                 // Our own messages are immediately dead->
2745                 char livenessType = liveness->getType();
2746                 if (livenessType == TypeLastMessage) {
2747                         ((LastMessage *)liveness)->setDead();
2748                 } else if (livenessType == TypeSlot) {
2749                         ((Slot *)liveness)->setDead();
2750                 } else {
2751                         throw new Error("Unrecognized type");
2752                 }
2753         }
2754         // Get the old last message for this device
2755         Pair<int64_t, Liveness *> *lastMessageEntry = lastMessageTable->put(machineId, new Pair<int64_t, Liveness *>(seqNum, liveness));
2756         if (lastMessageEntry == NULL) {
2757                 // If no last message then there is nothing else to process
2758                 return;
2759         }
2760
2761         int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
2762         Liveness *lastEntry = lastMessageEntry->getSecond();
2763         delete lastMessageEntry;
2764
2765         // If it is not our machine Id since we already set ours to dead
2766         if (machineId != localMachineId) {
2767                 char lastEntryType = lastEntry->getType();
2768
2769                 if (lastEntryType == TypeLastMessage) {
2770                         ((LastMessage *)lastEntry)->setDead();
2771                 } else if (lastEntryType == TypeSlot) {
2772                         ((Slot *)lastEntry)->setDead();
2773                 } else {
2774                         throw new Error("Unrecognized type");
2775                 }
2776         }
2777         // Make sure the server is not playing any games
2778         if (machineId == localMachineId) {
2779                 if (hadPartialSendToServer) {
2780                         // We were not making any updates and we had a machine mismatch
2781                         if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2782                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: ");
2783                         }
2784                 } else {
2785                         // We were not making any updates and we had a machine mismatch
2786                         if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2787                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed: ");
2788                         }
2789                 }
2790         } else {
2791                 if (lastMessageSeqNum > seqNum) {
2792                         throw new Error("Server Error: Rollback on remote machine sequence number");
2793                 }
2794         }
2795 }
2796
2797 /**
2798  * Add a rejected message entry to the watch set to keep track of
2799  * which clients have seen that rejected message entry and which have
2800  * not.
2801  */
2802 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
2803         Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2804         if (entries == NULL) {
2805                 // There is no set for this machine ID yet so create one
2806                 entries = new Hashset<RejectedMessage *>();
2807                 rejectedMessageWatchVectorTable->put(machineId, entries);
2808         }
2809         entries->add(entry);
2810 }
2811
2812 /**
2813  * Check if the HMAC chain is not violated
2814  */
2815 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
2816         for (uint i = 0; i < newSlots->length(); i++) {
2817                 Slot *currSlot = newSlots->get(i);
2818                 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
2819                 if (prevSlot != NULL &&
2820                                 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2821                         throw new Error("Server Error: Invalid HMAC Chain");
2822         }
2823 }