152b3778cb81003ece942947f1e20d95227d986a
[iotcloud.git] / version2 / src / C / Table.cc
1 #include "Table.h"
2 #include "CloudComm.h"
3 #include "SlotBuffer.h"
4 #include "NewKey.h"
5 #include "Slot.h"
6 #include "KeyValue.h"
7 #include "Error.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
13 #include "SecureRandom.h"
14 #include "ByteBuffer.h"
15 #include "Abort.h"
16 #include "CommitPart.h"
17 #include "ArbitrationRound.h"
18 #include "TransactionPart.h"
19 #include "Commit.h"
20 #include "RejectedMessage.h"
21 #include "SlotIndexer.h"
22 #include <stdlib.h>
23
24 int compareInt64(const void *a, const void *b) {
25         const int64_t *pa = (const int64_t *) a;
26         const int64_t *pb = (const int64_t *) b;
27         if (*pa < *pb)
28                 return -1;
29         else if (*pa > *pb)
30                 return 1;
31         else
32                 return 0;
33 }
34
35 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
36         buffer(NULL),
37         cloud(new CloudComm(this, baseurl, password, listeningPort)),
38         random(NULL),
39         liveTableStatus(NULL),
40         pendingTransactionBuilder(NULL),
41         lastPendingTransactionSpeculatedOn(NULL),
42         firstPendingTransaction(NULL),
43         numberOfSlots(0),
44         bufferResizeThreshold(0),
45         liveSlotCount(0),
46         oldestLiveSlotSequenceNumver(1),
47         localMachineId(_localMachineId),
48         sequenceNumber(0),
49         localSequenceNumber(0),
50         localTransactionSequenceNumber(0),
51         lastTransactionSequenceNumberSpeculatedOn(0),
52         oldestTransactionSequenceNumberSpeculatedOn(0),
53         localArbitrationSequenceNumber(0),
54         hadPartialSendToServer(false),
55         attemptedToSendToServer(false),
56         expectedsize(0),
57         didFindTableStatus(false),
58         currMaxSize(0),
59         lastSlotAttemptedToSend(NULL),
60         lastIsNewKey(false),
61         lastNewSize(0),
62         lastTransactionPartsSent(NULL),
63         lastPendingSendArbitrationEntriesToDelete(NULL),
64         lastNewKey(NULL),
65         committedKeyValueTable(NULL),
66         speculatedKeyValueTable(NULL),
67         pendingTransactionSpeculatedKeyValueTable(NULL),
68         liveNewKeyTable(NULL),
69         lastMessageTable(NULL),
70         rejectedMessageWatchVectorTable(NULL),
71         arbitratorTable(NULL),
72         liveAbortTable(NULL),
73         newTransactionParts(NULL),
74         newCommitParts(NULL),
75         lastArbitratedTransactionNumberByArbitratorTable(NULL),
76         liveTransactionBySequenceNumberTable(NULL),
77         liveTransactionByTransactionIdTable(NULL),
78         liveCommitsTable(NULL),
79         liveCommitsByKeyTable(NULL),
80         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
81         rejectedSlotVector(NULL),
82         pendingTransactionQueue(NULL),
83         pendingSendArbitrationRounds(NULL),
84         pendingSendArbitrationEntriesToDelete(NULL),
85         transactionPartsSent(NULL),
86         outstandingTransactionStatus(NULL),
87         liveAbortsGeneratedByLocal(NULL),
88         offlineTransactionsCommittedAndAtServer(NULL),
89         localCommunicationTable(NULL),
90         lastTransactionSeenFromMachineFromServer(NULL),
91         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
92         lastInsertedNewKey(false),
93         lastSeqNumArbOn(0)
94 {
95         init();
96 }
97
98 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
99         buffer(NULL),
100         cloud(_cloud),
101         random(NULL),
102         liveTableStatus(NULL),
103         pendingTransactionBuilder(NULL),
104         lastPendingTransactionSpeculatedOn(NULL),
105         firstPendingTransaction(NULL),
106         numberOfSlots(0),
107         bufferResizeThreshold(0),
108         liveSlotCount(0),
109         oldestLiveSlotSequenceNumver(1),
110         localMachineId(_localMachineId),
111         sequenceNumber(0),
112         localSequenceNumber(0),
113         localTransactionSequenceNumber(0),
114         lastTransactionSequenceNumberSpeculatedOn(0),
115         oldestTransactionSequenceNumberSpeculatedOn(0),
116         localArbitrationSequenceNumber(0),
117         hadPartialSendToServer(false),
118         attemptedToSendToServer(false),
119         expectedsize(0),
120         didFindTableStatus(false),
121         currMaxSize(0),
122         lastSlotAttemptedToSend(NULL),
123         lastIsNewKey(false),
124         lastNewSize(0),
125         lastTransactionPartsSent(NULL),
126         lastPendingSendArbitrationEntriesToDelete(NULL),
127         lastNewKey(NULL),
128         committedKeyValueTable(NULL),
129         speculatedKeyValueTable(NULL),
130         pendingTransactionSpeculatedKeyValueTable(NULL),
131         liveNewKeyTable(NULL),
132         lastMessageTable(NULL),
133         rejectedMessageWatchVectorTable(NULL),
134         arbitratorTable(NULL),
135         liveAbortTable(NULL),
136         newTransactionParts(NULL),
137         newCommitParts(NULL),
138         lastArbitratedTransactionNumberByArbitratorTable(NULL),
139         liveTransactionBySequenceNumberTable(NULL),
140         liveTransactionByTransactionIdTable(NULL),
141         liveCommitsTable(NULL),
142         liveCommitsByKeyTable(NULL),
143         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
144         rejectedSlotVector(NULL),
145         pendingTransactionQueue(NULL),
146         pendingSendArbitrationRounds(NULL),
147         pendingSendArbitrationEntriesToDelete(NULL),
148         transactionPartsSent(NULL),
149         outstandingTransactionStatus(NULL),
150         liveAbortsGeneratedByLocal(NULL),
151         offlineTransactionsCommittedAndAtServer(NULL),
152         localCommunicationTable(NULL),
153         lastTransactionSeenFromMachineFromServer(NULL),
154         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
155         lastInsertedNewKey(false),
156         lastSeqNumArbOn(0)
157 {
158         init();
159 }
160
161 Table::~Table() {
162         delete cloud;
163         delete random;
164         delete buffer;
165         // init data structs
166         delete committedKeyValueTable;
167         delete speculatedKeyValueTable;
168         delete pendingTransactionSpeculatedKeyValueTable;
169         delete liveNewKeyTable;
170         delete lastMessageTable;
171         delete rejectedMessageWatchVectorTable;
172         delete arbitratorTable;
173         delete liveAbortTable;
174         delete newTransactionParts;
175         delete newCommitParts;
176         delete lastArbitratedTransactionNumberByArbitratorTable;
177         delete liveTransactionBySequenceNumberTable;
178         delete liveTransactionByTransactionIdTable;
179         delete liveCommitsTable;
180         delete liveCommitsByKeyTable;
181         delete lastCommitSeenSequenceNumberByArbitratorTable;
182         delete rejectedSlotVector;
183         delete pendingTransactionQueue;
184         delete pendingSendArbitrationEntriesToDelete;
185         delete transactionPartsSent;
186         delete outstandingTransactionStatus;
187         delete liveAbortsGeneratedByLocal;
188         delete offlineTransactionsCommittedAndAtServer;
189         delete localCommunicationTable;
190         delete lastTransactionSeenFromMachineFromServer;
191         delete pendingSendArbitrationRounds;
192         delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
193 }
194
195 /**
196  * Init all the stuff needed for for table usage
197  */
198 void Table::init() {
199         // Init helper objects
200         random = new SecureRandom();
201         buffer = new SlotBuffer();
202
203         // init data structs
204         committedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
205         speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
206         pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
207         liveNewKeyTable = new Hashtable<IoTString *, NewKey *>();
208         lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> * >();
209         rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
210         arbitratorTable = new Hashtable<IoTString *, int64_t, uintptr_t, 0, hashString, StringEquals>();
211         liveAbortTable = new Hashtable<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>();
212         newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
213         newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
214         lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
215         liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
216         liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t> *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>();
217         liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> * >();
218         liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *>();
219         lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
220         rejectedSlotVector = new Vector<int64_t>();
221         pendingTransactionQueue = new Vector<Transaction *>();
222         pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
223         transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
224         outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
225         liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
226         offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> *, uintptr_t, 0, pairHashFunction, pairEquals>();
227         localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> *>();
228         lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
229         pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
230         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
231
232         // Other init stuff
233         numberOfSlots = buffer->capacity();
234         setResizeThreshold();
235 }
236
237 /**
238  * Initialize the table by inserting a table status as the first entry
239  * into the table status also initialize the crypto stuff.
240  */
241 void Table::initTable() {
242         cloud->initSecurity();
243
244         // Create the first insertion into the block chain which is the table status
245         Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
246         localSequenceNumber++;
247         TableStatus *status = new TableStatus(s, numberOfSlots);
248         s->addEntry(status);
249         Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
250
251         if (array == NULL) {
252                 array = new Array<Slot *>(1);
253                 array->set(0, s);
254                 // update local block chain
255                 validateAndUpdate(array, true);
256         } else if (array->length() == 1) {
257                 // in case we did push the slot BUT we failed to init it
258                 validateAndUpdate(array, true);
259         } else {
260                 throw new Error("Error on initialization");
261         }
262 }
263
264 /**
265  * Rebuild the table from scratch by pulling the latest block chain
266  * from the server.
267  */
268 void Table::rebuild() {
269         // Just pull the latest slots from the server
270         Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
271         validateAndUpdate(newslots, true);
272         sendToServer(NULL);
273         updateLiveTransactionsAndStatus();
274 }
275
276 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
277         localCommunicationTable->put(arbitrator, new Pair<IoTString *, int32_t>(hostName, portNumber));
278 }
279
280 int64_t Table::getArbitrator(IoTString *key) {
281         return arbitratorTable->get(key);
282 }
283
284 void Table::close() {
285         cloud->closeCloud();
286 }
287
288 IoTString *Table::getCommitted(IoTString *key)  {
289         KeyValue *kv = committedKeyValueTable->get(key);
290
291         if (kv != NULL) {
292                 return kv->getValue();
293         } else {
294                 return NULL;
295         }
296 }
297
298 IoTString *Table::getSpeculative(IoTString *key) {
299         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
300
301         if (kv == NULL) {
302                 kv = speculatedKeyValueTable->get(key);
303         }
304
305         if (kv == NULL) {
306                 kv = committedKeyValueTable->get(key);
307         }
308
309         if (kv != NULL) {
310                 return kv->getValue();
311         } else {
312                 return NULL;
313         }
314 }
315
316 IoTString *Table::getCommittedAtomic(IoTString *key) {
317         KeyValue *kv = committedKeyValueTable->get(key);
318
319         if (!arbitratorTable->contains(key)) {
320                 throw new Error("Key not Found.");
321         }
322
323         // Make sure new key value pair matches the current arbitrator
324         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
325                 // TODO: Maybe not throw en error
326                 throw new Error("Not all Key Values Match Arbitrator.");
327         }
328
329         if (kv != NULL) {
330                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
331                 return kv->getValue();
332         } else {
333                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
334                 return NULL;
335         }
336 }
337
338 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
339         if (!arbitratorTable->contains(key)) {
340                 throw new Error("Key not Found.");
341         }
342
343         // Make sure new key value pair matches the current arbitrator
344         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
345                 // TODO: Maybe not throw en error
346                 throw new Error("Not all Key Values Match Arbitrator.");
347         }
348
349         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
350
351         if (kv == NULL) {
352                 kv = speculatedKeyValueTable->get(key);
353         }
354
355         if (kv == NULL) {
356                 kv = committedKeyValueTable->get(key);
357         }
358
359         if (kv != NULL) {
360                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
361                 return kv->getValue();
362         } else {
363                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
364                 return NULL;
365         }
366 }
367
368 bool Table::update()  {
369         try {
370                 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
371                 validateAndUpdate(newSlots, false);
372                 sendToServer(NULL);
373                 updateLiveTransactionsAndStatus();
374                 return true;
375         } catch (Exception *e) {
376                 SetIterator<int64_t, Pair<IoTString *, int32_t> *> *kit = getKeyIterator(localCommunicationTable);
377                 while (kit->hasNext()) {
378                         int64_t m = kit->next();
379                         updateFromLocal(m);
380                 }
381                 delete kit;
382         }
383
384         return false;
385 }
386
387 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
388         while (true) {
389                 if (arbitratorTable->contains(keyName)) {
390                         // There is already an arbitrator
391                         return false;
392                 }
393                 NewKey *newKey = new NewKey(NULL, keyName, machineId);
394
395                 if (sendToServer(newKey)) {
396                         // If successfully inserted
397                         return true;
398                 }
399         }
400 }
401
402 void Table::startTransaction() {
403         // Create a new transaction, invalidates any old pending transactions.
404         pendingTransactionBuilder = new PendingTransaction(localMachineId);
405 }
406
407 void Table::addKV(IoTString *key, IoTString *value) {
408
409         // Make sure it is a valid key
410         if (!arbitratorTable->contains(key)) {
411                 throw new Error("Key not Found.");
412         }
413
414         // Make sure new key value pair matches the current arbitrator
415         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
416                 // TODO: Maybe not throw en error
417                 throw new Error("Not all Key Values Match Arbitrator.");
418         }
419
420         // Add the key value to this transaction
421         KeyValue *kv = new KeyValue(key, value);
422         pendingTransactionBuilder->addKV(kv);
423 }
424
425 TransactionStatus *Table::commitTransaction() {
426         if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
427                 // transaction with no updates will have no effect on the system
428                 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
429         }
430
431         // Set the local transaction sequence number and increment
432         pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
433         localTransactionSequenceNumber++;
434
435         // Create the transaction status
436         TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
437
438         // Create the new transaction
439         Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
440         newTransaction->setTransactionStatus(transactionStatus);
441
442         if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
443                 // Add it to the queue and invalidate the builder for safety
444                 pendingTransactionQueue->add(newTransaction);
445         } else {
446                 arbitrateOnLocalTransaction(newTransaction);
447                 updateLiveStateFromLocal();
448         }
449
450         pendingTransactionBuilder = new PendingTransaction(localMachineId);
451
452         try {
453                 sendToServer(NULL);
454         } catch (ServerException *e) {
455
456                 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
457                 uint size = pendingTransactionQueue->size();
458                 uint oldindex = 0;
459                 for (uint iter = 0; iter < size; iter++) {
460                         Transaction *transaction = pendingTransactionQueue->get(iter);
461                         pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter));
462
463                         if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
464                                 // Already contacted this client so ignore all attempts to contact this client
465                                 // to preserve ordering for arbitrator
466                                 continue;
467                         }
468
469                         Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
470
471                         if (sendReturn.getFirst()) {
472                                 // Failed to contact over local
473                                 arbitratorTriedAndFailed->add(transaction->getArbitrator());
474                         } else {
475                                 // Successful contact or should not contact
476
477                                 if (sendReturn.getSecond()) {
478                                         // did arbitrate
479                                         oldindex--;
480                                 }
481                         }
482                 }
483                 pendingTransactionQueue->setSize(oldindex);
484         }
485
486         updateLiveStateFromLocal();
487
488         return transactionStatus;
489 }
490
491 /**
492  * Recalculate the new resize threshold
493  */
494 void Table::setResizeThreshold() {
495         int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
496         bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
497 }
498
499 int64_t Table::getLocalSequenceNumber() {
500         return localSequenceNumber;
501 }
502
503 bool Table::sendToServer(NewKey *newKey) {
504         bool fromRetry = false;
505         try {
506                 if (hadPartialSendToServer) {
507                         Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
508                         if (newSlots->length() == 0) {
509                                 fromRetry = true;
510                                 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
511
512                                 if (sendSlotsReturn.getFirst()) {
513                                         if (newKey != NULL) {
514                                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
515                                                         newKey = NULL;
516                                                 }
517                                         }
518
519                                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
520                                         while (trit->hasNext()) {
521                                                 Transaction *transaction = trit->next();
522                                                 transaction->resetServerFailure();
523                                                 // Update which transactions parts still need to be sent
524                                                 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
525                                                 // Add the transaction status to the outstanding list
526                                                 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
527
528                                                 // Update the transaction status
529                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
530
531                                                 // Check if all the transaction parts were successfully
532                                                 // sent and if so then remove it from pending
533                                                 if (transaction->didSendAllParts()) {
534                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
535                                                         pendingTransactionQueue->remove(transaction);
536                                                 }
537                                         }
538                                         delete trit;
539                                 } else {
540                                         newSlots = sendSlotsReturn.getThird();
541                                         bool isInserted = false;
542                                         for (uint si = 0; si < newSlots->length(); si++) {
543                                                 Slot *s = newSlots->get(si);
544                                                 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
545                                                         isInserted = true;
546                                                         break;
547                                                 }
548                                         }
549
550                                         for (uint si = 0; si < newSlots->length(); si++) {
551                                                 Slot *s = newSlots->get(si);
552                                                 if (isInserted) {
553                                                         break;
554                                                 }
555
556                                                 // Process each entry in the slot
557                                                 Vector<Entry *> *ventries = s->getEntries();
558                                                 uint vesize = ventries->size();
559                                                 for (uint vei = 0; vei < vesize; vei++) {
560                                                         Entry *entry = ventries->get(vei);
561                                                         if (entry->getType() == TypeLastMessage) {
562                                                                 LastMessage *lastMessage = (LastMessage *)entry;
563                                                                 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
564                                                                         isInserted = true;
565                                                                         break;
566                                                                 }
567                                                         }
568                                                 }
569                                         }
570
571                                         if (isInserted) {
572                                                 if (newKey != NULL) {
573                                                         if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
574                                                                 newKey = NULL;
575                                                         }
576                                                 }
577
578                                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
579                                                 while (trit->hasNext()) {
580                                                         Transaction *transaction = trit->next();
581                                                         transaction->resetServerFailure();
582
583                                                         // Update which transactions parts still need to be sent
584                                                         transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
585
586                                                         // Add the transaction status to the outstanding list
587                                                         outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
588
589                                                         // Update the transaction status
590                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
591
592                                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
593                                                         if (transaction->didSendAllParts()) {
594                                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
595                                                                 pendingTransactionQueue->remove(transaction);
596                                                         } else {
597                                                                 transaction->resetServerFailure();
598                                                                 // Set the transaction sequence number back to nothing
599                                                                 if (!transaction->didSendAPartToServer()) {
600                                                                         transaction->setSequenceNumber(-1);
601                                                                 }
602                                                         }
603                                                 }
604                                                 delete trit;
605                                         }
606                                 }
607
608                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
609                                 while (trit->hasNext()) {
610                                         Transaction *transaction = trit->next();
611                                         transaction->resetServerFailure();
612                                         // Set the transaction sequence number back to nothing
613                                         if (!transaction->didSendAPartToServer()) {
614                                                 transaction->setSequenceNumber(-1);
615                                         }
616                                 }
617                                 delete trit;
618
619                                 if (sendSlotsReturn.getThird()->length() != 0) {
620                                         // insert into the local block chain
621                                         validateAndUpdate(sendSlotsReturn.getThird(), true);
622                                 }
623                                 // continue;
624                         } else {
625                                 bool isInserted = false;
626                                 for (uint si = 0; si < newSlots->length(); si++) {
627                                         Slot *s = newSlots->get(si);
628                                         if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
629                                                 isInserted = true;
630                                                 break;
631                                         }
632                                 }
633
634                                 for (uint si = 0; si < newSlots->length(); si++) {
635                                         Slot *s = newSlots->get(si);
636                                         if (isInserted) {
637                                                 break;
638                                         }
639
640                                         // Process each entry in the slot
641                                         Vector<Entry *> *entries = s->getEntries();
642                                         uint eSize = entries->size();
643                                         for (uint ei = 0; ei < eSize; ei++) {
644                                                 Entry *entry = entries->get(ei);
645
646                                                 if (entry->getType() == TypeLastMessage) {
647                                                         LastMessage *lastMessage = (LastMessage *)entry;
648                                                         if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
649                                                                 isInserted = true;
650                                                                 break;
651                                                         }
652                                                 }
653                                         }
654                                 }
655
656                                 if (isInserted) {
657                                         if (newKey != NULL) {
658                                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
659                                                         newKey = NULL;
660                                                 }
661                                         }
662
663                                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
664                                         while (trit->hasNext()) {
665                                                 Transaction *transaction = trit->next();
666                                                 transaction->resetServerFailure();
667
668                                                 // Update which transactions parts still need to be sent
669                                                 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
670
671                                                 // Add the transaction status to the outstanding list
672                                                 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
673
674                                                 // Update the transaction status
675                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
676
677                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
678                                                 if (transaction->didSendAllParts()) {
679                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
680                                                         pendingTransactionQueue->remove(transaction);
681                                                 } else {
682                                                         transaction->resetServerFailure();
683                                                         // Set the transaction sequence number back to nothing
684                                                         if (!transaction->didSendAPartToServer()) {
685                                                                 transaction->setSequenceNumber(-1);
686                                                         }
687                                                 }
688                                         }
689                                         delete trit;
690                                 } else {
691                                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
692                                         while (trit->hasNext()) {
693                                                 Transaction *transaction = trit->next();
694                                                 transaction->resetServerFailure();
695                                                 // Set the transaction sequence number back to nothing
696                                                 if (!transaction->didSendAPartToServer()) {
697                                                         transaction->setSequenceNumber(-1);
698                                                 }
699                                         }
700                                         delete trit;
701                                 }
702
703                                 // insert into the local block chain
704                                 validateAndUpdate(newSlots, true);
705                         }
706                 }
707         } catch (ServerException *e) {
708                 throw e;
709         }
710
711
712
713         try {
714                 // While we have stuff that needs inserting into the block chain
715                 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
716
717                         fromRetry = false;
718
719                         if (hadPartialSendToServer) {
720                                 throw new Error("Should Be error free");
721                         }
722
723
724
725                         // If there is a new key with same name then end
726                         if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
727                                 return false;
728                         }
729
730                         // Create the slot
731                         Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber);
732                         localSequenceNumber++;
733
734                         // Try to fill the slot with data
735                         ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
736                         bool needsResize = fillSlotsReturn.getFirst();
737                         int newSize = fillSlotsReturn.getSecond();
738                         bool insertedNewKey = fillSlotsReturn.getThird();
739
740                         if (needsResize) {
741                                 // Reset which transaction to send
742                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
743                                 while (trit->hasNext()) {
744                                         Transaction *transaction = trit->next();
745                                         transaction->resetNextPartToSend();
746
747                                         // Set the transaction sequence number back to nothing
748                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
749                                                 transaction->setSequenceNumber(-1);
750                                         }
751                                 }
752                                 delete trit;
753
754                                 // Clear the sent data since we are trying again
755                                 pendingSendArbitrationEntriesToDelete->clear();
756                                 transactionPartsSent->clear();
757
758                                 // We needed a resize so try again
759                                 fillSlot(slot, true, newKey);
760                         }
761
762                         lastSlotAttemptedToSend = slot;
763                         lastIsNewKey = (newKey != NULL);
764                         lastInsertedNewKey = insertedNewKey;
765                         lastNewSize = newSize;
766                         lastNewKey = newKey;
767                         lastTransactionPartsSent = transactionPartsSent->clone();
768                         lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
769
770                         ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
771
772                         if (sendSlotsReturn.getFirst()) {
773
774                                 // Did insert into the block chain
775
776                                 if (insertedNewKey) {
777                                         // This slot was what was inserted not a previous slot
778
779                                         // New Key was successfully inserted into the block chain so dont want to insert it again
780                                         newKey = NULL;
781                                 }
782
783                                 // Remove the aborts and commit parts that were sent from the pending to send queue
784                                 uint size = pendingSendArbitrationRounds->size();
785                                 uint oldcount = 0;
786                                 for (uint i = 0; i < size; i++) {
787                                         ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
788                                         round->removeParts(pendingSendArbitrationEntriesToDelete);
789
790                                         if (!round->isDoneSending()) {
791                                                 // Sent all the parts
792                                                 pendingSendArbitrationRounds->set(oldcount++,
793                                                                                                                                                                                         pendingSendArbitrationRounds->get(i));
794                                         }
795                                 }
796                                 pendingSendArbitrationRounds->setSize(oldcount);
797
798                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
799                                 while (trit->hasNext()) {
800                                         Transaction *transaction = trit->next();
801                                         transaction->resetServerFailure();
802
803                                         // Update which transactions parts still need to be sent
804                                         transaction->removeSentParts(transactionPartsSent->get(transaction));
805
806                                         // Add the transaction status to the outstanding list
807                                         outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
808
809                                         // Update the transaction status
810                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
811
812                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
813                                         if (transaction->didSendAllParts()) {
814                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
815                                                 pendingTransactionQueue->remove(transaction);
816                                         }
817                                 }
818                                 delete trit;
819                         } else {
820                                 // Reset which transaction to send
821                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
822                                 while (trit->hasNext()) {
823                                         Transaction *transaction = trit->next();
824                                         transaction->resetNextPartToSend();
825
826                                         // Set the transaction sequence number back to nothing
827                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
828                                                 transaction->setSequenceNumber(-1);
829                                         }
830                                 }
831                                 delete trit;
832                         }
833
834                         // Clear the sent data in preparation for next send
835                         pendingSendArbitrationEntriesToDelete->clear();
836                         transactionPartsSent->clear();
837
838                         if (sendSlotsReturn.getThird()->length() != 0) {
839                                 // insert into the local block chain
840                                 validateAndUpdate(sendSlotsReturn.getThird(), true);
841                         }
842                 }
843
844         } catch (ServerException *e) {
845                 if (e->getType() != ServerException_TypeInputTimeout) {
846                         // Nothing was able to be sent to the server so just clear these data structures
847                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
848                         while (trit->hasNext()) {
849                                 Transaction *transaction = trit->next();
850                                 transaction->resetNextPartToSend();
851
852                                 // Set the transaction sequence number back to nothing
853                                 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
854                                         transaction->setSequenceNumber(-1);
855                                 }
856                         }
857                         delete trit;
858                 } else {
859                         // There was a partial send to the server
860                         hadPartialSendToServer = true;
861
862                         // Nothing was able to be sent to the server so just clear these data structures
863                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
864                         while (trit->hasNext()) {
865                                 Transaction *transaction = trit->next();
866                                 transaction->resetNextPartToSend();
867                                 transaction->setServerFailure();
868                         }
869                         delete trit;
870                 }
871
872                 pendingSendArbitrationEntriesToDelete->clear();
873                 transactionPartsSent->clear();
874
875                 throw e;
876         }
877
878         return newKey == NULL;
879 }
880
881 bool Table::updateFromLocal(int64_t machineId) {
882         if (!localCommunicationTable->contains(machineId))
883                 return false;
884
885         Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(machineId);
886
887         // Get the size of the send data
888         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
889
890         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
891         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(machineId)) {
892                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
893         }
894
895         Array<char> *sendData = new Array<char>(sendDataSize);
896         ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
897
898         // Encode the data
899         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
900         bbEncode->putInt(0);
901
902         // Send by local
903         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
904         localSequenceNumber++;
905
906         if (returnData == NULL) {
907                 // Could not contact server
908                 return false;
909         }
910
911         // Decode the data
912         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
913         int numberOfEntries = bbDecode->getInt();
914
915         for (int i = 0; i < numberOfEntries; i++) {
916                 char type = bbDecode->get();
917                 if (type == TypeAbort) {
918                         Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
919                         processEntry(abort);
920                 } else if (type == TypeCommitPart) {
921                         CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
922                         processEntry(commitPart);
923                 }
924         }
925
926         updateLiveStateFromLocal();
927
928         return true;
929 }
930
931 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
932
933         // Get the devices local communications
934         if (!localCommunicationTable->contains(transaction->getArbitrator()))
935                 return Pair<bool, bool>(true, false);
936
937         Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
938
939         // Get the size of the send data
940         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
941         {
942                 Vector<TransactionPart *> *tParts = transaction->getParts();
943                 uint tPartsSize = tParts->size();
944                 for (uint i = 0; i < tPartsSize; i++) {
945                         TransactionPart *part = tParts->get(i);
946                         sendDataSize += part->getSize();
947                 }
948         }
949
950         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
951         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(transaction->getArbitrator())) {
952                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
953         }
954
955         // Make the send data size
956         Array<char> *sendData = new Array<char>(sendDataSize);
957         ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
958
959         // Encode the data
960         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
961         bbEncode->putInt(transaction->getParts()->size());
962         {
963                 Vector<TransactionPart *> *tParts = transaction->getParts();
964                 uint tPartsSize = tParts->size();
965                 for (uint i = 0; i < tPartsSize; i++) {
966                         TransactionPart *part = tParts->get(i);
967                         part->encode(bbEncode);
968                 }
969         }
970
971         // Send by local
972         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
973         localSequenceNumber++;
974
975         if (returnData == NULL) {
976                 // Could not contact server
977                 return Pair<bool, bool>(true, false);
978         }
979
980         // Decode the data
981         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
982         bool didCommit = bbDecode->get() == 1;
983         bool couldArbitrate = bbDecode->get() == 1;
984         int numberOfEntries = bbDecode->getInt();
985         bool foundAbort = false;
986
987         for (int i = 0; i < numberOfEntries; i++) {
988                 char type = bbDecode->get();
989                 if (type == TypeAbort) {
990                         Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
991
992                         if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
993                                 foundAbort = true;
994                         }
995
996                         processEntry(abort);
997                 } else if (type == TypeCommitPart) {
998                         CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
999                         processEntry(commitPart);
1000                 }
1001         }
1002
1003         updateLiveStateFromLocal();
1004
1005         if (couldArbitrate) {
1006                 TransactionStatus *status =  transaction->getTransactionStatus();
1007                 if (didCommit) {
1008                         status->setStatus(TransactionStatus_StatusCommitted);
1009                 } else {
1010                         status->setStatus(TransactionStatus_StatusAborted);
1011                 }
1012         } else {
1013                 TransactionStatus *status =  transaction->getTransactionStatus();
1014                 if (foundAbort) {
1015                         status->setStatus(TransactionStatus_StatusAborted);
1016                 } else {
1017                         status->setStatus(TransactionStatus_StatusCommitted);
1018                 }
1019         }
1020
1021         return Pair<bool, bool>(false, true);
1022 }
1023
1024 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
1025         // Decode the data
1026         ByteBuffer *bbDecode = ByteBuffer_wrap(data);
1027         int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
1028         int numberOfParts = bbDecode->getInt();
1029
1030         // If we did commit a transaction or not
1031         bool didCommit = false;
1032         bool couldArbitrate = false;
1033
1034         if (numberOfParts != 0) {
1035
1036                 // decode the transaction
1037                 Transaction *transaction = new Transaction();
1038                 for (int i = 0; i < numberOfParts; i++) {
1039                         bbDecode->get();
1040                         TransactionPart *newPart = (TransactionPart *)TransactionPart_decode(NULL, bbDecode);
1041                         transaction->addPartDecode(newPart);
1042                 }
1043
1044                 // Arbitrate on transaction and pull relevant return data
1045                 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
1046                 couldArbitrate = localArbitrateReturn.getFirst();
1047                 didCommit = localArbitrateReturn.getSecond();
1048
1049                 updateLiveStateFromLocal();
1050
1051                 // Transaction was sent to the server so keep track of it to prevent double commit
1052                 if (transaction->getSequenceNumber() != -1) {
1053                         offlineTransactionsCommittedAndAtServer->add(new Pair<int64_t, int64_t>(transaction->getId()));
1054                 }
1055         }
1056
1057         // The data to send back
1058         int returnDataSize = 0;
1059         Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
1060
1061         // Get the aborts to send back
1062         Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>();
1063         {
1064                 SetIterator<int64_t, Abort *> *abortit = getKeyIterator(liveAbortsGeneratedByLocal);
1065                 while (abortit->hasNext())
1066                         abortLocalSequenceNumbers->add(abortit->next());
1067                 delete abortit;
1068         }
1069
1070         qsort(abortLocalSequenceNumbers->expose(), abortLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1071
1072         uint asize = abortLocalSequenceNumbers->size();
1073         for (uint i = 0; i < asize; i++) {
1074                 int64_t localSequenceNumber = abortLocalSequenceNumbers->get(i);
1075                 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1076                         continue;
1077                 }
1078
1079                 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
1080                 unseenArbitrations->add(abort);
1081                 returnDataSize += abort->getSize();
1082         }
1083
1084         // Get the commits to send back
1085         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
1086         if (commitForClientTable != NULL) {
1087                 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>();
1088                 {
1089                         SetIterator<int64_t, Commit *> *commitit = getKeyIterator(commitForClientTable);
1090                         while (commitit->hasNext())
1091                                 commitLocalSequenceNumbers->add(commitit->next());
1092                         delete commitit;
1093                 }
1094                 qsort(commitLocalSequenceNumbers->expose(), commitLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1095
1096                 uint clsSize = commitLocalSequenceNumbers->size();
1097                 for (uint clsi = 0; clsi < clsSize; clsi++) {
1098                         int64_t localSequenceNumber = commitLocalSequenceNumbers->get(clsi);
1099                         Commit *commit = commitForClientTable->get(localSequenceNumber);
1100
1101                         if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1102                                 continue;
1103                         }
1104
1105                         {
1106                                 Vector<CommitPart *> *parts = commit->getParts();
1107                                 uint nParts = parts->size();
1108                                 for (uint i = 0; i < nParts; i++) {
1109                                         CommitPart *commitPart = parts->get(i);
1110                                         unseenArbitrations->add(commitPart);
1111                                         returnDataSize += commitPart->getSize();
1112                                 }
1113                         }
1114                 }
1115         }
1116
1117         // Number of arbitration entries to decode
1118         returnDataSize += 2 * sizeof(int32_t);
1119
1120         // bool of did commit or not
1121         if (numberOfParts != 0) {
1122                 returnDataSize += sizeof(char);
1123         }
1124
1125         // Data to send Back
1126         Array<char> *returnData = new Array<char>(returnDataSize);
1127         ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
1128
1129         if (numberOfParts != 0) {
1130                 if (didCommit) {
1131                         bbEncode->put((char)1);
1132                 } else {
1133                         bbEncode->put((char)0);
1134                 }
1135                 if (couldArbitrate) {
1136                         bbEncode->put((char)1);
1137                 } else {
1138                         bbEncode->put((char)0);
1139                 }
1140         }
1141
1142         bbEncode->putInt(unseenArbitrations->size());
1143         uint size = unseenArbitrations->size();
1144         for (uint i = 0; i < size; i++) {
1145                 Entry *entry = unseenArbitrations->get(i);
1146                 entry->encode(bbEncode);
1147         }
1148
1149         localSequenceNumber++;
1150         return returnData;
1151 }
1152
1153 ThreeTuple<bool, bool, Array<Slot *> *> Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey) {
1154         bool attemptedToSendToServerTmp = attemptedToSendToServer;
1155         attemptedToSendToServer = true;
1156
1157         bool inserted = false;
1158         bool lastTryInserted = false;
1159
1160         Array<Slot *> *array = cloud->putSlot(slot, newSize);
1161         if (array == NULL) {
1162                 array = new Array<Slot *>(1);
1163                 array->set(0, slot);
1164                 rejectedSlotVector->clear();
1165                 inserted = true;
1166         } else {
1167                 if (array->length() == 0) {
1168                         throw new Error("Server Error: Did not send any slots");
1169                 }
1170
1171                 // if (attemptedToSendToServerTmp) {
1172                 if (hadPartialSendToServer) {
1173
1174                         bool isInserted = false;
1175                         uint size = array->length();
1176                         for (uint i = 0; i < size; i++) {
1177                                 Slot *s = array->get(i);
1178                                 if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1179                                         isInserted = true;
1180                                         break;
1181                                 }
1182                         }
1183
1184                         for (uint i = 0; i < size; i++) {
1185                                 Slot *s = array->get(i);
1186                                 if (isInserted) {
1187                                         break;
1188                                 }
1189
1190                                 // Process each entry in the slot
1191                                 Vector<Entry *> *entries = s->getEntries();
1192                                 uint eSize = entries->size();
1193                                 for (uint ei = 0; ei < eSize; ei++) {
1194                                         Entry *entry = entries->get(ei);
1195
1196                                         if (entry->getType() == TypeLastMessage) {
1197                                                 LastMessage *lastMessage = (LastMessage *)entry;
1198
1199                                                 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) {
1200                                                         isInserted = true;
1201                                                         break;
1202                                                 }
1203                                         }
1204                                 }
1205                         }
1206
1207                         if (!isInserted) {
1208                                 rejectedSlotVector->add(slot->getSequenceNumber());
1209                                 lastTryInserted = false;
1210                         } else {
1211                                 lastTryInserted = true;
1212                         }
1213                 } else {
1214                         rejectedSlotVector->add(slot->getSequenceNumber());
1215                         lastTryInserted = false;
1216                 }
1217         }
1218
1219         return ThreeTuple<bool, bool, Array<Slot *> *>(inserted, lastTryInserted, array);
1220 }
1221
1222 /**
1223  * Returns false if a resize was needed
1224  */
1225 ThreeTuple<bool, int32_t, bool> Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry) {
1226         int newSize = 0;
1227         if (liveSlotCount > bufferResizeThreshold) {
1228                 resize = true;//Resize is forced
1229         }
1230
1231         if (resize) {
1232                 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1233                 TableStatus *status = new TableStatus(slot, newSize);
1234                 slot->addEntry(status);
1235         }
1236
1237         // Fill with rejected slots first before doing anything else
1238         doRejectedMessages(slot);
1239
1240         // Do mandatory rescue of entries
1241         ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1242
1243         // Extract working variables
1244         bool needsResize = mandatoryRescueReturn.getFirst();
1245         bool seenLiveSlot = mandatoryRescueReturn.getSecond();
1246         int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1247
1248         if (needsResize && !resize) {
1249                 // We need to resize but we are not resizing so return false
1250                 return ThreeTuple<bool, int32_t, bool>(true, NULL, NULL);
1251         }
1252
1253         bool inserted = false;
1254         if (newKeyEntry != NULL) {
1255                 newKeyEntry->setSlot(slot);
1256                 if (slot->hasSpace(newKeyEntry)) {
1257                         slot->addEntry(newKeyEntry);
1258                         inserted = true;
1259                 }
1260         }
1261
1262         // Clear the transactions, aborts and commits that were sent previously
1263         transactionPartsSent->clear();
1264         pendingSendArbitrationEntriesToDelete->clear();
1265         uint size = pendingSendArbitrationRounds->size();
1266         for (uint i = 0; i < size; i++) {
1267                 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
1268                 bool isFull = false;
1269                 round->generateParts();
1270                 Vector<Entry *> *parts = round->getParts();
1271
1272                 // Insert pending arbitration data
1273                 uint vsize = parts->size();
1274                 for (uint vi = 0; vi < vsize; vi++) {
1275                         Entry *arbitrationData = parts->get(vi);
1276
1277                         // If it is an abort then we need to set some information
1278                         if (arbitrationData->getType() == TypeAbort) {
1279                                 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1280                         }
1281
1282                         if (!slot->hasSpace(arbitrationData)) {
1283                                 // No space so cant do anything else with these data entries
1284                                 isFull = true;
1285                                 break;
1286                         }
1287
1288                         // Add to this current slot and add it to entries to delete
1289                         slot->addEntry(arbitrationData);
1290                         pendingSendArbitrationEntriesToDelete->add(arbitrationData);
1291                 }
1292
1293                 if (isFull) {
1294                         break;
1295                 }
1296         }
1297
1298         if (pendingTransactionQueue->size() > 0) {
1299                 Transaction *transaction = pendingTransactionQueue->get(0);
1300                 // Set the transaction sequence number if it has yet to be inserted into the block chain
1301                 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1302                         transaction->setSequenceNumber(slot->getSequenceNumber());
1303                 }
1304
1305                 while (true) {
1306                         TransactionPart *part = transaction->getNextPartToSend();
1307                         if (part == NULL) {
1308                                 // Ran out of parts to send for this transaction so move on
1309                                 break;
1310                         }
1311
1312                         if (slot->hasSpace(part)) {
1313                                 slot->addEntry(part);
1314                                 Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1315                                 if (partsSent == NULL) {
1316                                         partsSent = new Vector<int32_t>();
1317                                         transactionPartsSent->put(transaction, partsSent);
1318                                 }
1319                                 partsSent->add(part->getPartNumber());
1320                                 transactionPartsSent->put(transaction, partsSent);
1321                         } else {
1322                                 break;
1323                         }
1324                 }
1325         }
1326
1327         // Fill the remainder of the slot with rescue data
1328         doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1329
1330         return ThreeTuple<bool, int32_t, bool>(false, newSize, inserted);
1331 }
1332
1333 void Table::doRejectedMessages(Slot *s) {
1334         if (!rejectedSlotVector->isEmpty()) {
1335                 /* TODO: We should avoid generating a rejected message entry if
1336                  * there is already a sufficient entry in the queue (e->g->,
1337                  * equalsto value of true and same sequence number)->  */
1338
1339                 int64_t old_seqn = rejectedSlotVector->get(0);
1340                 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1341                         int64_t new_seqn = rejectedSlotVector->lastElement();
1342                         RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1343                         s->addEntry(rm);
1344                 } else {
1345                         int64_t prev_seqn = -1;
1346                         uint i = 0;
1347                         /* Go through list of missing messages */
1348                         for (; i < rejectedSlotVector->size(); i++) {
1349                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1350                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1351                                 if (s_msg != NULL)
1352                                         break;
1353                                 prev_seqn = curr_seqn;
1354                         }
1355                         /* Generate rejected message entry for missing messages */
1356                         if (prev_seqn != -1) {
1357                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1358                                 s->addEntry(rm);
1359                         }
1360                         /* Generate rejected message entries for present messages */
1361                         for (; i < rejectedSlotVector->size(); i++) {
1362                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1363                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1364                                 int64_t machineid = s_msg->getMachineID();
1365                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1366                                 s->addEntry(rm);
1367                         }
1368                 }
1369         }
1370 }
1371
1372 ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize) {
1373         int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1374         int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
1375         if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1376                 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1377         }
1378
1379         int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1380         bool seenLiveSlot = false;
1381         int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots;         // smallest seq number in the buffer if it is full
1382         int64_t threshold = firstIfFull + Table_FREE_SLOTS;             // we want the buffer to be clear of live entries up to this point
1383
1384
1385         // Mandatory Rescue
1386         for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1387                 Slot *previousSlot = buffer->getSlot(currentSequenceNumber);
1388                 // Push slot number forward
1389                 if (!seenLiveSlot) {
1390                         oldestLiveSlotSequenceNumver = currentSequenceNumber;
1391                 }
1392
1393                 if (!previousSlot->isLive()) {
1394                         continue;
1395                 }
1396
1397                 // We have seen a live slot
1398                 seenLiveSlot = true;
1399
1400                 // Get all the live entries for a slot
1401                 Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1402
1403                 // Iterate over all the live entries and try to rescue them
1404                 uint lESize = liveEntries->size();
1405                 for (uint i = 0; i < lESize; i++) {
1406                         Entry *liveEntry = liveEntries->get(i);
1407                         if (slot->hasSpace(liveEntry)) {
1408                                 // Enough space to rescue the entry
1409                                 slot->addEntry(liveEntry);
1410                         } else if (currentSequenceNumber == firstIfFull) {
1411                                 //if there's no space but the entry is about to fall off the queue
1412                                 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
1413                         }
1414                 }
1415         }
1416
1417         // Did not resize
1418         return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
1419 }
1420
1421 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1422         /* now go through live entries from least to greatest sequence number until
1423          * either all live slots added, or the slot doesn't have enough room
1424          * for SKIP_THRESHOLD consecutive entries*/
1425         int skipcount = 0;
1426         int64_t newestseqnum = buffer->getNewestSeqNum();
1427         for (; seqn <= newestseqnum; seqn++) {
1428                 Slot *prevslot = buffer->getSlot(seqn);
1429                 //Push slot number forward
1430                 if (!seenliveslot)
1431                         oldestLiveSlotSequenceNumver = seqn;
1432
1433                 if (!prevslot->isLive())
1434                         continue;
1435                 seenliveslot = true;
1436                 Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
1437                 uint lESize = liveentries->size();
1438                 for (uint i = 0; i < lESize; i++) {
1439                         Entry *liveentry = liveentries->get(i);
1440                         if (s->hasSpace(liveentry))
1441                                 s->addEntry(liveentry);
1442                         else {
1443                                 skipcount++;
1444                                 if (skipcount > Table_SKIP_THRESHOLD)
1445                                         goto donesearch;
1446                         }
1447                 }
1448         }
1449 donesearch:
1450         ;
1451 }
1452
1453 /**
1454  * Checks for malicious activity and updates the local copy of the block chain->
1455  */
1456 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1457         // The cloud communication layer has checked slot HMACs already
1458         // before decoding
1459         if (newSlots->length() == 0) {
1460                 return;
1461         }
1462
1463         // Make sure all slots are newer than the last largest slot this
1464         // client has seen
1465         int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
1466         if (firstSeqNum <= sequenceNumber) {
1467                 throw new Error("Server Error: Sent older slots!");
1468         }
1469
1470         // Create an object that can access both new slots and slots in our
1471         // local chain without committing slots to our local chain
1472         SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1473
1474         // Check that the HMAC chain is not broken
1475         checkHMACChain(indexer, newSlots);
1476
1477         // Set to keep track of messages from clients
1478         Hashset<int64_t> *machineSet = new Hashset<int64_t>();
1479         {
1480                 SetIterator<int64_t, Pair<int64_t, Liveness *> *> *lmit = getKeyIterator(lastMessageTable);
1481                 while (lmit->hasNext())
1482                         machineSet->add(lmit->next());
1483                 delete lmit;
1484         }
1485
1486         // Process each slots data
1487         {
1488                 uint numSlots = newSlots->length();
1489                 for (uint i = 0; i < numSlots; i++) {
1490                         Slot *slot = newSlots->get(i);
1491                         processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1492                         updateExpectedSize();
1493                 }
1494         }
1495
1496         // If there is a gap, check to see if the server sent us
1497         // everything->
1498         if (firstSeqNum != (sequenceNumber + 1)) {
1499
1500                 // Check the size of the slots that were sent down by the server->
1501                 // Can only check the size if there was a gap
1502                 checkNumSlots(newSlots->length());
1503
1504                 // Since there was a gap every machine must have pushed a slot or
1505                 // must have a last message message-> If not then the server is
1506                 // hiding slots
1507                 if (!machineSet->isEmpty()) {
1508                         throw new Error("Missing record for machines: ");
1509                 }
1510         }
1511
1512         // Update the size of our local block chain->
1513         commitNewMaxSize();
1514
1515         // Commit new to slots to the local block chain->
1516         {
1517                 uint numSlots = newSlots->length();
1518                 for (uint i = 0; i < numSlots; i++) {
1519                         Slot *slot = newSlots->get(i);
1520
1521                         // Insert this slot into our local block chain copy->
1522                         buffer->putSlot(slot);
1523
1524                         // Keep track of how many slots are currently live (have live data
1525                         // in them)->
1526                         liveSlotCount++;
1527                 }
1528         }
1529         // Get the sequence number of the latest slot in the system
1530         sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
1531         updateLiveStateFromServer();
1532
1533         // No Need to remember after we pulled from the server
1534         offlineTransactionsCommittedAndAtServer->clear();
1535
1536         // This is invalidated now
1537         hadPartialSendToServer = false;
1538 }
1539
1540 void Table::updateLiveStateFromServer() {
1541         // Process the new transaction parts
1542         processNewTransactionParts();
1543
1544         // Do arbitration on new transactions that were received
1545         arbitrateFromServer();
1546
1547         // Update all the committed keys
1548         bool didCommitOrSpeculate = updateCommittedTable();
1549
1550         // Delete the transactions that are now dead
1551         updateLiveTransactionsAndStatus();
1552
1553         // Do speculations
1554         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1555         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1556 }
1557
1558 void Table::updateLiveStateFromLocal() {
1559         // Update all the committed keys
1560         bool didCommitOrSpeculate = updateCommittedTable();
1561
1562         // Delete the transactions that are now dead
1563         updateLiveTransactionsAndStatus();
1564
1565         // Do speculations
1566         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1567         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1568 }
1569
1570 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1571         int64_t prevslots = firstSequenceNumber;
1572
1573         if (didFindTableStatus) {
1574         } else {
1575                 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1576         }
1577
1578         didFindTableStatus = true;
1579         currMaxSize = numberOfSlots;
1580 }
1581
1582 void Table::updateExpectedSize() {
1583         expectedsize++;
1584
1585         if (expectedsize > currMaxSize) {
1586                 expectedsize = currMaxSize;
1587         }
1588 }
1589
1590
1591 /**
1592  * Check the size of the block chain to make sure there are enough
1593  * slots sent back by the server-> This is only called when we have a
1594  * gap between the slots that we have locally and the slots sent by
1595  * the server therefore in the slots sent by the server there will be
1596  * at least 1 Table status message
1597  */
1598 void Table::checkNumSlots(int numberOfSlots) {
1599         if (numberOfSlots != expectedsize) {
1600                 throw new Error("Server Error: Server did not send all slots->  Expected: ");
1601         }
1602 }
1603
1604 /**
1605  * Update the size of of the local buffer if it is needed->
1606  */
1607 void Table::commitNewMaxSize() {
1608         didFindTableStatus = false;
1609
1610         // Resize the local slot buffer
1611         if (numberOfSlots != currMaxSize) {
1612                 buffer->resize((int32_t)currMaxSize);
1613         }
1614
1615         // Change the number of local slots to the new size
1616         numberOfSlots = (int32_t)currMaxSize;
1617
1618         // Recalculate the resize threshold since the size of the local
1619         // buffer has changed
1620         setResizeThreshold();
1621 }
1622
1623 /**
1624  * Process the new transaction parts from this latest round of slots
1625  * received from the server
1626  */
1627 void Table::processNewTransactionParts() {
1628
1629         if (newTransactionParts->size() == 0) {
1630                 // Nothing new to process
1631                 return;
1632         }
1633
1634         // Iterate through all the machine Ids that we received new parts
1635         // for
1636         SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *tpit = getKeyIterator(newTransactionParts);
1637         while (tpit->hasNext()) {
1638                 int64_t machineId = tpit->next();
1639                 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
1640
1641                 SetIterator<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *ptit = getKeyIterator(parts);
1642                 // Iterate through all the parts for that machine Id
1643                 while (ptit->hasNext()) {
1644                         Pair<int64_t, int32_t> *partId = ptit->next();
1645                         TransactionPart *part = parts->get(partId);
1646
1647                         if (lastArbitratedTransactionNumberByArbitratorTable->contains(part->getArbitratorId())) {
1648                                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1649                                 if (lastTransactionNumber >= part->getSequenceNumber()) {
1650                                         // Set dead the transaction part
1651                                         part->setDead();
1652                                         continue;
1653                                 }
1654                         }
1655
1656                         // Get the transaction object for that sequence number
1657                         Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1658
1659                         if (transaction == NULL) {
1660                                 // This is a new transaction that we dont have so make a new one
1661                                 transaction = new Transaction();
1662
1663                                 // Insert this new transaction into the live tables
1664                                 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1665                                 liveTransactionByTransactionIdTable->put(new Pair<int64_t, int64_t>(part->getTransactionId()), transaction);
1666                         }
1667
1668                         // Add that part to the transaction
1669                         transaction->addPartDecode(part);
1670                 }
1671                 delete ptit;
1672         }
1673         delete tpit;
1674         // Clear all the new transaction parts in preparation for the next
1675         // time the server sends slots
1676         newTransactionParts->clear();
1677 }
1678
1679 void Table::arbitrateFromServer() {
1680
1681         if (liveTransactionBySequenceNumberTable->size() == 0) {
1682                 // Nothing to arbitrate on so move on
1683                 return;
1684         }
1685
1686         // Get the transaction sequence numbers and sort from oldest to newest
1687         Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>();
1688         {
1689                 SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
1690                 while (trit->hasNext())
1691                         transactionSequenceNumbers->add(trit->next());
1692                 delete trit;
1693         }
1694         qsort(transactionSequenceNumbers->expose(), transactionSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1695
1696         // Collection of key value pairs that are
1697         Hashtable<IoTString *, KeyValue *> *speculativeTableTmp = new Hashtable<IoTString *, KeyValue *>();
1698
1699         // The last transaction arbitrated on
1700         int64_t lastTransactionCommitted = -1;
1701         Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1702         uint tsnSize = transactionSequenceNumbers->size();
1703         for (uint i = 0; i < tsnSize; i++) {
1704                 int64_t transactionSequenceNumber = transactionSequenceNumbers->get(i);
1705                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1706
1707                 // Check if this machine arbitrates for this transaction if not
1708                 // then we cant arbitrate this transaction
1709                 if (transaction->getArbitrator() != localMachineId) {
1710                         continue;
1711                 }
1712
1713                 if (transactionSequenceNumber < lastSeqNumArbOn) {
1714                         continue;
1715                 }
1716
1717                 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1718                         // We have seen this already locally so dont commit again
1719                         continue;
1720                 }
1721
1722
1723                 if (!transaction->isComplete()) {
1724                         // Will arbitrate in incorrect order if we continue so just break
1725                         // Most likely this
1726                         break;
1727                 }
1728
1729
1730                 // update the largest transaction seen by arbitrator from server
1731                 if (!lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1732                         lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1733                 } else {
1734                         int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1735                         if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1736                                 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1737                         }
1738                 }
1739
1740                 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1741                         // Guard evaluated as true
1742
1743                         // Update the local changes so we can make the commit
1744                         SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1745                         while (kvit->hasNext()) {
1746                                 KeyValue *kv = kvit->next();
1747                                 speculativeTableTmp->put(kv->getKey(), kv);
1748                         }
1749                         delete kvit;
1750
1751                         // Update what the last transaction committed was for use in batch commit
1752                         lastTransactionCommitted = transactionSequenceNumber;
1753                 } else {
1754                         // Guard evaluated was false so create abort
1755                         // Create the abort
1756                         Abort *newAbort = new Abort(NULL,
1757                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1758                                                                                                                                         transaction->getSequenceNumber(),
1759                                                                                                                                         transaction->getMachineId(),
1760                                                                                                                                         transaction->getArbitrator(),
1761                                                                                                                                         localArbitrationSequenceNumber);
1762                         localArbitrationSequenceNumber++;
1763                         generatedAborts->add(newAbort);
1764
1765                         // Insert the abort so we can process
1766                         processEntry(newAbort);
1767                 }
1768
1769                 lastSeqNumArbOn = transactionSequenceNumber;
1770         }
1771
1772         Commit *newCommit = NULL;
1773
1774         // If there is something to commit
1775         if (speculativeTableTmp->size() != 0) {
1776                 // Create the commit and increment the commit sequence number
1777                 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1778                 localArbitrationSequenceNumber++;
1779
1780                 // Add all the new keys to the commit
1781                 SetIterator<IoTString *, KeyValue *> *spit = getKeyIterator(speculativeTableTmp);
1782                 while (spit->hasNext()) {
1783                         IoTString *string = spit->next();
1784                         KeyValue *kv = speculativeTableTmp->get(string);
1785                         newCommit->addKV(kv);
1786                 }
1787                 delete spit;
1788
1789                 // create the commit parts
1790                 newCommit->createCommitParts();
1791
1792                 // Append all the commit parts to the end of the pending queue
1793                 // waiting for sending to the server
1794                 // Insert the commit so we can process it
1795                 Vector<CommitPart *> *parts = newCommit->getParts();
1796                 uint partsSize = parts->size();
1797                 for (uint i = 0; i < partsSize; i++) {
1798                         CommitPart *commitPart = parts->get(i);
1799                         processEntry(commitPart);
1800                 }
1801         }
1802
1803         if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1804                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1805                 pendingSendArbitrationRounds->add(arbitrationRound);
1806
1807                 if (compactArbitrationData()) {
1808                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1809                         if (newArbitrationRound->getCommit() != NULL) {
1810                                 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1811                                 uint partsSize = parts->size();
1812                                 for (uint i = 0; i < partsSize; i++) {
1813                                         CommitPart *commitPart = parts->get(i);
1814                                         processEntry(commitPart);
1815                                 }
1816                         }
1817                 }
1818         }
1819 }
1820
1821 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1822
1823         // Check if this machine arbitrates for this transaction if not then
1824         // we cant arbitrate this transaction
1825         if (transaction->getArbitrator() != localMachineId) {
1826                 return Pair<bool, bool>(false, false);
1827         }
1828
1829         if (!transaction->isComplete()) {
1830                 // Will arbitrate in incorrect order if we continue so just break
1831                 // Most likely this
1832                 return Pair<bool, bool>(false, false);
1833         }
1834
1835         if (transaction->getMachineId() != localMachineId) {
1836                 // dont do this check for local transactions
1837                 if (lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1838                         if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1839                                 // We've have already seen this from the server
1840                                 return Pair<bool, bool>(false, false);
1841                         }
1842                 }
1843         }
1844
1845         if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1846                 // Guard evaluated as true Create the commit and increment the
1847                 // commit sequence number
1848                 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1849                 localArbitrationSequenceNumber++;
1850
1851                 // Update the local changes so we can make the commit
1852                 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1853                 while (kvit->hasNext()) {
1854                         KeyValue *kv = kvit->next();
1855                         newCommit->addKV(kv);
1856                 }
1857                 delete kvit;
1858
1859                 // create the commit parts
1860                 newCommit->createCommitParts();
1861
1862                 // Append all the commit parts to the end of the pending queue
1863                 // waiting for sending to the server
1864                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1865                 pendingSendArbitrationRounds->add(arbitrationRound);
1866
1867                 if (compactArbitrationData()) {
1868                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1869                         Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1870                         uint partsSize = parts->size();
1871                         for (uint i = 0; i < partsSize; i++) {
1872                                 CommitPart *commitPart = parts->get(i);
1873                                 processEntry(commitPart);
1874                         }
1875                 } else {
1876                         // Insert the commit so we can process it
1877                         Vector<CommitPart *> *parts = newCommit->getParts();
1878                         uint partsSize = parts->size();
1879                         for (uint i = 0; i < partsSize; i++) {
1880                                 CommitPart *commitPart = parts->get(i);
1881                                 processEntry(commitPart);
1882                         }
1883                 }
1884
1885                 if (transaction->getMachineId() == localMachineId) {
1886                         TransactionStatus *status = transaction->getTransactionStatus();
1887                         if (status != NULL) {
1888                                 status->setStatus(TransactionStatus_StatusCommitted);
1889                         }
1890                 }
1891
1892                 updateLiveStateFromLocal();
1893                 return Pair<bool, bool>(true, true);
1894         } else {
1895                 if (transaction->getMachineId() == localMachineId) {
1896                         // For locally created messages update the status
1897                         // Guard evaluated was false so create abort
1898                         TransactionStatus *status = transaction->getTransactionStatus();
1899                         if (status != NULL) {
1900                                 status->setStatus(TransactionStatus_StatusAborted);
1901                         }
1902                 } else {
1903                         Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
1904
1905                         // Create the abort
1906                         Abort *newAbort = new Abort(NULL,
1907                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1908                                                                                                                                         -1,
1909                                                                                                                                         transaction->getMachineId(),
1910                                                                                                                                         transaction->getArbitrator(),
1911                                                                                                                                         localArbitrationSequenceNumber);
1912                         localArbitrationSequenceNumber++;
1913                         addAbortSet->add(newAbort);
1914
1915                         // Append all the commit parts to the end of the pending queue
1916                         // waiting for sending to the server
1917                         ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1918                         pendingSendArbitrationRounds->add(arbitrationRound);
1919
1920                         if (compactArbitrationData()) {
1921                                 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1922
1923                                 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1924                                 uint partsSize = parts->size();
1925                                 for (uint i = 0; i < partsSize; i++) {
1926                                         CommitPart *commitPart = parts->get(i);
1927                                         processEntry(commitPart);
1928                                 }
1929                         }
1930                 }
1931
1932                 updateLiveStateFromLocal();
1933                 return Pair<bool, bool>(true, false);
1934         }
1935 }
1936
1937 /**
1938  * Compacts the arbitration data my merging commits and aggregating
1939  * aborts so that a single large push of commits can be done instead
1940  * of many small updates
1941  */
1942 bool Table::compactArbitrationData() {
1943         if (pendingSendArbitrationRounds->size() < 2) {
1944                 // Nothing to compact so do nothing
1945                 return false;
1946         }
1947
1948         ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1949         if (lastRound->getDidSendPart()) {
1950                 return false;
1951         }
1952
1953         bool hadCommit = (lastRound->getCommit() == NULL);
1954         bool gotNewCommit = false;
1955
1956         uint numberToDelete = 1;
1957         while (numberToDelete < pendingSendArbitrationRounds->size()) {
1958                 ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1959
1960                 if (round->isFull() || round->getDidSendPart()) {
1961                         // Stop since there is a part that cannot be compacted and we
1962                         // need to compact in order
1963                         break;
1964                 }
1965
1966                 if (round->getCommit() == NULL) {
1967                         // Try compacting aborts only
1968                         int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
1969                         if (newSize > ArbitrationRound_MAX_PARTS) {
1970                                 // Cant compact since it would be too large
1971                                 break;
1972                         }
1973                         lastRound->addAborts(round->getAborts());
1974                 } else {
1975                         // Create a new larger commit
1976                         Commit *newCommit = Commit_merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
1977                         localArbitrationSequenceNumber++;
1978
1979                         // Create the commit parts so that we can count them
1980                         newCommit->createCommitParts();
1981
1982                         // Calculate the new size of the parts
1983                         int newSize = newCommit->getNumberOfParts();
1984                         newSize += lastRound->getAbortsCount();
1985                         newSize += round->getAbortsCount();
1986
1987                         if (newSize > ArbitrationRound_MAX_PARTS) {
1988                                 // Cant compact since it would be too large
1989                                 break;
1990                         }
1991
1992                         // Set the new compacted part
1993                         lastRound->setCommit(newCommit);
1994                         lastRound->addAborts(round->getAborts());
1995                         gotNewCommit = true;
1996                 }
1997
1998                 numberToDelete++;
1999         }
2000
2001         if (numberToDelete != 1) {
2002                 // If there is a compaction
2003                 // Delete the previous pieces that are now in the new compacted piece
2004                 if (numberToDelete == pendingSendArbitrationRounds->size()) {
2005                         pendingSendArbitrationRounds->clear();
2006                 } else {
2007                         for (uint i = 0; i < numberToDelete; i++) {
2008                                 pendingSendArbitrationRounds->removeIndex(pendingSendArbitrationRounds->size() - 1);
2009                         }
2010                 }
2011
2012                 // Add the new compacted into the pending to send list
2013                 pendingSendArbitrationRounds->add(lastRound);
2014
2015                 // Should reinsert into the commit processor
2016                 if (hadCommit && gotNewCommit) {
2017                         return true;
2018                 }
2019         }
2020
2021         return false;
2022 }
2023
2024 /**
2025  * Update all the commits and the committed tables, sets dead the dead
2026  * transactions
2027  */
2028 bool Table::updateCommittedTable() {
2029
2030         if (newCommitParts->size() == 0) {
2031                 // Nothing new to process
2032                 return false;
2033         }
2034
2035         // Iterate through all the machine Ids that we received new parts for
2036         SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts);
2037         while (partsit->hasNext()) {
2038                 int64_t machineId = partsit->next();
2039                 Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId);
2040
2041                 // Iterate through all the parts for that machine Id
2042                 SetIterator<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pairit = getKeyIterator(parts);
2043                 while (pairit->hasNext()) {
2044                         Pair<int64_t, int32_t> *partId = pairit->next();
2045                         CommitPart *part = parts->get(partId);
2046
2047                         // Get the transaction object for that sequence number
2048                         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
2049
2050                         if (commitForClientTable == NULL) {
2051                                 // This is the first commit from this device
2052                                 commitForClientTable = new Hashtable<int64_t, Commit *>();
2053                                 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
2054                         }
2055
2056                         Commit *commit = commitForClientTable->get(part->getSequenceNumber());
2057
2058                         if (commit == NULL) {
2059                                 // This is a new commit that we dont have so make a new one
2060                                 commit = new Commit();
2061
2062                                 // Insert this new commit into the live tables
2063                                 commitForClientTable->put(part->getSequenceNumber(), commit);
2064                         }
2065
2066                         // Add that part to the commit
2067                         commit->addPartDecode(part);
2068                 }
2069                 delete pairit;
2070         }
2071         delete partsit;
2072
2073         // Clear all the new commits parts in preparation for the next time
2074         // the server sends slots
2075         newCommitParts->clear();
2076
2077         // If we process a new commit keep track of it for future use
2078         bool didProcessANewCommit = false;
2079
2080         // Process the commits one by one
2081         SetIterator<int64_t, Hashtable<int64_t, Commit *> *> *liveit = getKeyIterator(liveCommitsTable);
2082         while (liveit->hasNext()) {
2083                 int64_t arbitratorId = liveit->next();
2084
2085                 // Get all the commits for a specific arbitrator
2086                 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
2087
2088                 // Sort the commits in order
2089                 Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>();
2090                 {
2091                         SetIterator<int64_t, Commit *> *clientit = getKeyIterator(commitForClientTable);
2092                         while (clientit->hasNext())
2093                                 commitSequenceNumbers->add(clientit->next());
2094                         delete clientit;
2095                 }
2096
2097                 qsort(commitSequenceNumbers->expose(), commitSequenceNumbers->size(), sizeof(int64_t), compareInt64);
2098
2099                 // Get the last commit seen from this arbitrator
2100                 int64_t lastCommitSeenSequenceNumber = -1;
2101                 if (lastCommitSeenSequenceNumberByArbitratorTable->contains(arbitratorId)) {
2102                         lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
2103                 }
2104
2105                 // Go through each new commit one by one
2106                 for (uint i = 0; i < commitSequenceNumbers->size(); i++) {
2107                         int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
2108                         Commit *commit = commitForClientTable->get(commitSequenceNumber);
2109
2110                         // Special processing if a commit is not complete
2111                         if (!commit->isComplete()) {
2112                                 if (i == (commitSequenceNumbers->size() - 1)) {
2113                                         // If there is an incomplete commit and this commit is the
2114                                         // latest one seen then this commit cannot be processed and
2115                                         // there are no other commits
2116                                         break;
2117                                 } else {
2118                                         // This is a commit that was already dead but parts of it
2119                                         // are still in the block chain (not flushed out yet)->
2120                                         // Delete it and move on
2121                                         commit->setDead();
2122                                         commitForClientTable->remove(commit->getSequenceNumber());
2123                                         continue;
2124                                 }
2125                         }
2126
2127                         // Update the last transaction that was updated if we can
2128                         if (commit->getTransactionSequenceNumber() != -1) {
2129                                 // Update the last transaction sequence number that the arbitrator arbitrated on1
2130                                 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2131                                         lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2132                                 }
2133                         }
2134
2135                         // Update the last arbitration data that we have seen so far
2136                         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(commit->getMachineId())) {
2137                                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
2138                                 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
2139                                         // Is larger
2140                                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2141                                 }
2142                         } else {
2143                                 // Never seen any data from this arbitrator so record the first one
2144                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2145                         }
2146
2147                         // We have already seen this commit before so need to do the
2148                         // full processing on this commit
2149                         if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2150
2151                                 // Update the last transaction that was updated if we can
2152                                 if (commit->getTransactionSequenceNumber() != -1) {
2153                                         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
2154                                         if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) ||
2155                                                         lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2156                                                 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2157                                         }
2158                                 }
2159
2160                                 continue;
2161                         }
2162
2163                         // If we got here then this is a brand new commit and needs full
2164                         // processing
2165                         // Get what commits should be edited, these are the commits that
2166                         // have live values for their keys
2167                         Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
2168                         {
2169                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2170                                 while (kvit->hasNext()) {
2171                                         KeyValue *kv = kvit->next();
2172                                         Commit *commit = liveCommitsByKeyTable->get(kv->getKey());
2173                                         if (commit != NULL)
2174                                                 commitsToEdit->add(commit);
2175                                 }
2176                                 delete kvit;
2177                         }
2178
2179                         // Update each previous commit that needs to be updated
2180                         SetIterator<Commit *, Commit *> *commitit = commitsToEdit->iterator();
2181                         while (commitit->hasNext()) {
2182                                 Commit *previousCommit = commitit->next();
2183
2184                                 // Only bother with live commits (TODO: Maybe remove this check)
2185                                 if (previousCommit->isLive()) {
2186
2187                                         // Update which keys in the old commits are still live
2188                                         {
2189                                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2190                                                 while (kvit->hasNext()) {
2191                                                         KeyValue *kv = kvit->next();
2192                                                         previousCommit->invalidateKey(kv->getKey());
2193                                                 }
2194                                                 delete kvit;
2195                                         }
2196
2197                                         // if the commit is now dead then remove it
2198                                         if (!previousCommit->isLive()) {
2199                                                 commitForClientTable->remove(previousCommit->getSequenceNumber());
2200                                         }
2201                                 }
2202                         }
2203                         delete commitit;
2204
2205                         // Update the last seen sequence number from this arbitrator
2206                         if (lastCommitSeenSequenceNumberByArbitratorTable->contains(commit->getMachineId())) {
2207                                 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2208                                         lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2209                                 }
2210                         } else {
2211                                 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2212                         }
2213
2214                         // We processed a new commit that we havent seen before
2215                         didProcessANewCommit = true;
2216
2217                         // Update the committed table of keys and which commit is using which key
2218                         {
2219                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2220                                 while (kvit->hasNext()) {
2221                                         KeyValue *kv = kvit->next();
2222                                         committedKeyValueTable->put(kv->getKey(), kv);
2223                                         liveCommitsByKeyTable->put(kv->getKey(), commit);
2224                                 }
2225                                 delete kvit;
2226                         }
2227                 }
2228         }
2229         delete liveit;
2230
2231         return didProcessANewCommit;
2232 }
2233
2234 /**
2235  * Create the speculative table from transactions that are still live
2236  * and have come from the cloud
2237  */
2238 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2239         if (liveTransactionBySequenceNumberTable->size() == 0) {
2240                 // There is nothing to speculate on
2241                 return false;
2242         }
2243
2244         // Create a list of the transaction sequence numbers and sort them
2245         // from oldest to newest
2246         Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>();
2247         {
2248                 SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
2249                 while (trit->hasNext())
2250                         transactionSequenceNumbersSorted->add(trit->next());
2251                 delete trit;
2252         }
2253
2254         qsort(transactionSequenceNumbersSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64);
2255
2256         bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2257
2258
2259         if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2260                 // If there is a gap in the transaction sequence numbers then
2261                 // there was a commit or an abort of a transaction OR there was a
2262                 // new commit (Could be from offline commit) so a redo the
2263                 // speculation from scratch
2264
2265                 // Start from scratch
2266                 speculatedKeyValueTable->clear();
2267                 lastTransactionSequenceNumberSpeculatedOn = -1;
2268                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2269         }
2270
2271         // Remember the front of the transaction list
2272         oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2273
2274         // Find where to start arbitration from
2275         uint startIndex = 0;
2276
2277         for (; startIndex < transactionSequenceNumbersSorted->size(); startIndex++)
2278                 if (transactionSequenceNumbersSorted->get(startIndex) == lastTransactionSequenceNumberSpeculatedOn)
2279                         break;
2280         startIndex++;
2281
2282         if (startIndex >= transactionSequenceNumbersSorted->size()) {
2283                 // Make sure we are not out of bounds
2284                 return false;           // did not speculate
2285         }
2286
2287         Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
2288         bool didSkip = true;
2289
2290         for (uint i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2291                 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2292                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2293
2294                 if (!transaction->isComplete()) {
2295                         // If there is an incomplete transaction then there is nothing
2296                         // we can do add this transactions arbitrator to the list of
2297                         // arbitrators we should ignore
2298                         incompleteTransactionArbitrator->add(transaction->getArbitrator());
2299                         didSkip = true;
2300                         continue;
2301                 }
2302
2303                 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2304                         continue;
2305                 }
2306
2307                 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2308
2309                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2310                         // Guard evaluated to true so update the speculative table
2311                         {
2312                                 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2313                                 while (kvit->hasNext()) {
2314                                         KeyValue *kv = kvit->next();
2315                                         speculatedKeyValueTable->put(kv->getKey(), kv);
2316                                 }
2317                                 delete kvit;
2318                         }
2319                 }
2320         }
2321
2322         if (didSkip) {
2323                 // Since there was a skip we need to redo the speculation next time around
2324                 lastTransactionSequenceNumberSpeculatedOn = -1;
2325                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2326         }
2327
2328         // We did some speculation
2329         return true;
2330 }
2331
2332 /**
2333  * Create the pending transaction speculative table from transactions
2334  * that are still in the pending transaction buffer
2335  */
2336 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2337         if (pendingTransactionQueue->size() == 0) {
2338                 // There is nothing to speculate on
2339                 return;
2340         }
2341
2342         if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2343                 // need to reset on the pending speculation
2344                 lastPendingTransactionSpeculatedOn = NULL;
2345                 firstPendingTransaction = pendingTransactionQueue->get(0);
2346                 pendingTransactionSpeculatedKeyValueTable->clear();
2347         }
2348
2349         // Find where to start arbitration from
2350         uint startIndex = 0;
2351
2352         for (; startIndex < pendingTransactionQueue->size(); startIndex++)
2353                 if (pendingTransactionQueue->get(startIndex) == firstPendingTransaction)
2354                         break;
2355
2356         if (startIndex >= pendingTransactionQueue->size()) {
2357                 // Make sure we are not out of bounds
2358                 return;
2359         }
2360
2361         for (uint i = startIndex; i < pendingTransactionQueue->size(); i++) {
2362                 Transaction *transaction = pendingTransactionQueue->get(i);
2363
2364                 lastPendingTransactionSpeculatedOn = transaction;
2365
2366                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2367                         // Guard evaluated to true so update the speculative table
2368                         SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2369                         while (kvit->hasNext()) {
2370                                 KeyValue *kv = kvit->next();
2371                                 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2372                         }
2373                         delete kvit;
2374                 }
2375         }
2376 }
2377
2378 /**
2379  * Set dead and remove from the live transaction tables the
2380  * transactions that are dead
2381  */
2382 void Table::updateLiveTransactionsAndStatus() {
2383         // Go through each of the transactions
2384         {
2385                 SetIterator<int64_t, Transaction *> *iter = getKeyIterator(liveTransactionBySequenceNumberTable);
2386                 while (iter->hasNext()) {
2387                         int64_t key = iter->next();
2388                         Transaction *transaction = liveTransactionBySequenceNumberTable->get(key);
2389
2390                         // Check if the transaction is dead
2391                         if (lastArbitratedTransactionNumberByArbitratorTable->contains(transaction->getArbitrator()) && lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator() >= transaction->getSequenceNumber())) {
2392                                 // Set dead the transaction
2393                                 transaction->setDead();
2394
2395                                 // Remove the transaction from the live table
2396                                 iter->remove();
2397                                 liveTransactionByTransactionIdTable->remove(transaction->getId());
2398                         }
2399                 }
2400                 delete iter;
2401         }
2402
2403         // Go through each of the transactions
2404         {
2405                 SetIterator<int64_t, TransactionStatus *> *iter = getKeyIterator(outstandingTransactionStatus);
2406                 while (iter->hasNext()) {
2407                         int64_t key = iter->next();
2408                         TransactionStatus *status = outstandingTransactionStatus->get(key);
2409
2410                         // Check if the transaction is dead
2411                         if (lastArbitratedTransactionNumberByArbitratorTable->contains(status->getTransactionArbitrator()) && (lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()) >= status->getTransactionSequenceNumber())) {
2412
2413                                 // Set committed
2414                                 status->setStatus(TransactionStatus_StatusCommitted);
2415
2416                                 // Remove
2417                                 iter->remove();
2418                         }
2419                 }
2420                 delete iter;
2421         }
2422 }
2423
2424 /**
2425  * Process this slot, entry by entry->  Also update the latest message sent by slot
2426  */
2427 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2428
2429         // Update the last message seen
2430         updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2431
2432         // Process each entry in the slot
2433         Vector<Entry *> *entries = slot->getEntries();
2434         uint eSize = entries->size();
2435         for (uint ei = 0; ei < eSize; ei++) {
2436                 Entry *entry = entries->get(ei);
2437                 switch (entry->getType()) {
2438                 case TypeCommitPart:
2439                         processEntry((CommitPart *)entry);
2440                         break;
2441                 case TypeAbort:
2442                         processEntry((Abort *)entry);
2443                         break;
2444                 case TypeTransactionPart:
2445                         processEntry((TransactionPart *)entry);
2446                         break;
2447                 case TypeNewKey:
2448                         processEntry((NewKey *)entry);
2449                         break;
2450                 case TypeLastMessage:
2451                         processEntry((LastMessage *)entry, machineSet);
2452                         break;
2453                 case TypeRejectedMessage:
2454                         processEntry((RejectedMessage *)entry, indexer);
2455                         break;
2456                 case TypeTableStatus:
2457                         processEntry((TableStatus *)entry, slot->getSequenceNumber());
2458                         break;
2459                 default:
2460                         throw new Error("Unrecognized type: ");
2461                 }
2462         }
2463 }
2464
2465 /**
2466  * Update the last message that was sent for a machine Id
2467  */
2468 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2469         // Update what the last message received by a machine was
2470         updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2471 }
2472
2473 /**
2474  * Add the new key to the arbitrators table and update the set of live
2475  * new keys (in case of a rescued new key message)
2476  */
2477 void Table::processEntry(NewKey *entry) {
2478         // Update the arbitrator table with the new key information
2479         arbitratorTable->put(entry->getKey(), entry->getMachineID());
2480
2481         // Update what the latest live new key is
2482         NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2483         if (oldNewKey != NULL) {
2484                 // Delete the old new key messages
2485                 oldNewKey->setDead();
2486         }
2487 }
2488
2489 /**
2490  * Process new table status entries and set dead the old ones as new
2491  * ones come in-> keeps track of the largest and smallest table status
2492  * seen in this current round of updating the local copy of the block
2493  * chain
2494  */
2495 void Table::processEntry(TableStatus *entry, int64_t seq) {
2496         int newNumSlots = entry->getMaxSlots();
2497         updateCurrMaxSize(newNumSlots);
2498         initExpectedSize(seq, newNumSlots);
2499
2500         if (liveTableStatus != NULL) {
2501                 // We have a larger table status so the old table status is no
2502                 // int64_ter alive
2503                 liveTableStatus->setDead();
2504         }
2505
2506         // Make this new table status the latest alive table status
2507         liveTableStatus = entry;
2508 }
2509
2510 /**
2511  * Check old messages to see if there is a block chain violation->
2512  * Also
2513  */
2514 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
2515         int64_t oldSeqNum = entry->getOldSeqNum();
2516         int64_t newSeqNum = entry->getNewSeqNum();
2517         bool isequal = entry->getEqual();
2518         int64_t machineId = entry->getMachineID();
2519         int64_t seq = entry->getSequenceNumber();
2520
2521         // Check if we have messages that were supposed to be rejected in
2522         // our local block chain
2523         for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2524                 // Get the slot
2525                 Slot *slot = indexer->getSlot(seqNum);
2526
2527                 if (slot != NULL) {
2528                         // If we have this slot make sure that it was not supposed to be
2529                         // a rejected slot
2530                         int64_t slotMachineId = slot->getMachineID();
2531                         if (isequal != (slotMachineId == machineId)) {
2532                                 throw new Error("Server Error: Trying to insert rejected message for slot ");
2533                         }
2534                 }
2535         }
2536
2537         // Create a list of clients to watch until they see this rejected
2538         // message entry->
2539         Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2540         SetIterator<int64_t, Pair<int64_t, Liveness *> *> *iter = getKeyIterator(lastMessageTable);
2541         while (iter->hasNext()) {
2542                 // Machine ID for the last message entry
2543                 int64_t lastMessageEntryMachineId = iter->next();
2544
2545                 // We've seen it, don't need to continue to watch->  Our next
2546                 // message will implicitly acknowledge it->
2547                 if (lastMessageEntryMachineId == localMachineId) {
2548                         continue;
2549                 }
2550
2551                 Pair<int64_t, Liveness *> *lastMessageValue = lastMessageTable->get(lastMessageEntryMachineId);
2552                 int64_t entrySequenceNumber = lastMessageValue->getFirst();
2553
2554                 if (entrySequenceNumber < seq) {
2555                         // Add this rejected message to the set of messages that this
2556                         // machine ID did not see yet
2557                         addWatchVector(lastMessageEntryMachineId, entry);
2558                         // This client did not see this rejected message yet so add it
2559                         // to the watch set to monitor
2560                         deviceWatchSet->add(lastMessageEntryMachineId);
2561                 }
2562         }
2563         delete iter;
2564
2565         if (deviceWatchSet->isEmpty()) {
2566                 // This rejected message has been seen by all the clients so
2567                 entry->setDead();
2568         } else {
2569                 // We need to watch this rejected message
2570                 entry->setWatchSet(deviceWatchSet);
2571         }
2572 }
2573
2574 /**
2575  * Check if this abort is live, if not then save it so we can kill it
2576  * later-> update the last transaction number that was arbitrated on->
2577  */
2578 void Table::processEntry(Abort *entry) {
2579         if (entry->getTransactionSequenceNumber() != -1) {
2580                 // update the transaction status if it was sent to the server
2581                 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2582                 if (status != NULL) {
2583                         status->setStatus(TransactionStatus_StatusAborted);
2584                 }
2585         }
2586
2587         // Abort has not been seen by the client it is for yet so we need to
2588         // keep track of it
2589
2590         Abort *previouslySeenAbort = liveAbortTable->put(new Pair<int64_t, int64_t>(entry->getAbortId()), entry);
2591         if (previouslySeenAbort != NULL) {
2592                 previouslySeenAbort->setDead();         // Delete old version of the abort since we got a rescued newer version
2593         }
2594
2595         if (entry->getTransactionArbitrator() == localMachineId) {
2596                 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
2597         }
2598
2599         if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) {
2600                 // The machine already saw this so it is dead
2601                 entry->setDead();
2602                 Pair<int64_t, int64_t> abortid = entry->getAbortId();
2603                 liveAbortTable->remove(&abortid);
2604
2605                 if (entry->getTransactionArbitrator() == localMachineId) {
2606                         liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2607                 }
2608                 return;
2609         }
2610
2611         // Update the last arbitration data that we have seen so far
2612         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(entry->getTransactionArbitrator())) {
2613                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2614                 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2615                         // Is larger
2616                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2617                 }
2618         } else {
2619                 // Never seen any data from this arbitrator so record the first one
2620                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2621         }
2622
2623         // Set dead a transaction if we can
2624         Pair<int64_t, int64_t> deadPair = Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber());
2625
2626         Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(&deadPair);
2627         if (transactionToSetDead != NULL) {
2628                 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2629         }
2630
2631         // Update the last transaction sequence number that the arbitrator
2632         // arbitrated on
2633         if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getTransactionArbitrator()) ||
2634                         (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator()) < entry->getTransactionSequenceNumber())) {
2635                 // Is a valid one
2636                 if (entry->getTransactionSequenceNumber() != -1) {
2637                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2638                 }
2639         }
2640 }
2641
2642 /**
2643  * Set dead the transaction part if that transaction is dead and keep
2644  * track of all new parts
2645  */
2646 void Table::processEntry(TransactionPart *entry) {
2647         // Check if we have already seen this transaction and set it dead OR
2648         // if it is not alive
2649         if (lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getArbitratorId()) && (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId()) >= entry->getSequenceNumber())) {
2650                 // This transaction is dead, it was already committed or aborted
2651                 entry->setDead();
2652                 return;
2653         }
2654
2655         // This part is still alive
2656         Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId());
2657
2658         if (transactionPart == NULL) {
2659                 // Dont have a table for this machine Id yet so make one
2660                 transactionPart = new Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2661                 newTransactionParts->put(entry->getMachineId(), transactionPart);
2662         }
2663
2664         // Update the part and set dead ones we have already seen (got a
2665         // rescued version)
2666         TransactionPart *previouslySeenPart = transactionPart->put(new Pair<int64_t, int32_t>(entry->getPartId()), entry);
2667         if (previouslySeenPart != NULL) {
2668                 previouslySeenPart->setDead();
2669         }
2670 }
2671
2672 /**
2673  * Process new commit entries and save them for future use->  Delete duplicates
2674  */
2675 void Table::processEntry(CommitPart *entry) {
2676         // Update the last transaction that was updated if we can
2677         if (entry->getTransactionSequenceNumber() != -1) {
2678                 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId() || lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber())) {
2679                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2680                 }
2681         }
2682
2683         Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *commitPart = newCommitParts->get(entry->getMachineId());
2684         if (commitPart == NULL) {
2685                 // Don't have a table for this machine Id yet so make one
2686                 commitPart = new Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2687                 newCommitParts->put(entry->getMachineId(), commitPart);
2688         }
2689         // Update the part and set dead ones we have already seen (got a
2690         // rescued version)
2691         CommitPart *previouslySeenPart = commitPart->put(new Pair<int64_t, int32_t>(entry->getPartId()), entry);
2692         if (previouslySeenPart != NULL) {
2693                 previouslySeenPart->setDead();
2694         }
2695 }
2696
2697 /**
2698  * Update the last message seen table-> Update and set dead the
2699  * appropriate RejectedMessages as clients see them-> Updates the live
2700  * aborts, removes those that are dead and sets them dead-> Check that
2701  * the last message seen is correct and that there is no mismatch of
2702  * our own last message or that other clients have not had a rollback
2703  * on the last message->
2704  */
2705 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2706         // We have seen this machine ID
2707         machineSet->remove(machineId);
2708
2709         // Get the set of rejected messages that this machine Id is has not seen yet
2710         Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2711         // If there is a rejected message that this machine Id has not seen yet
2712         if (watchset != NULL) {
2713                 // Go through each rejected message that this machine Id has not
2714                 // seen yet
2715
2716                 SetIterator<RejectedMessage *, RejectedMessage *> *rmit = watchset->iterator();
2717                 while (rmit->hasNext()) {
2718                         RejectedMessage *rm = rmit->next();
2719                         // If this machine Id has seen this rejected message->->->
2720                         if (rm->getSequenceNumber() <= seqNum) {
2721                                 // Remove it from our watchlist
2722                                 rmit->remove();
2723                                 // Decrement machines that need to see this notification
2724                                 rm->removeWatcher(machineId);
2725                         }
2726                 }
2727                 delete rmit;
2728         }
2729
2730         // Set dead the abort
2731         SetIterator<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals> *abortit = getKeyIterator(liveAbortTable);
2732
2733         while (abortit->hasNext()) {
2734                 Pair<int64_t, int64_t> *key = abortit->next();
2735                 Abort *abort = liveAbortTable->get(key);
2736                 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2737                         abort->setDead();
2738                         abortit->remove();
2739                         if (abort->getTransactionArbitrator() == localMachineId) {
2740                                 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2741                         }
2742                 }
2743         }
2744         delete abortit;
2745         if (machineId == localMachineId) {
2746                 // Our own messages are immediately dead->
2747                 char livenessType = liveness->getType();
2748                 if (livenessType == TypeLastMessage) {
2749                         ((LastMessage *)liveness)->setDead();
2750                 } else if (livenessType == TypeSlot) {
2751                         ((Slot *)liveness)->setDead();
2752                 } else {
2753                         throw new Error("Unrecognized type");
2754                 }
2755         }
2756         // Get the old last message for this device
2757         Pair<int64_t, Liveness *> *lastMessageEntry = lastMessageTable->put(machineId, new Pair<int64_t, Liveness *>(seqNum, liveness));
2758         if (lastMessageEntry == NULL) {
2759                 // If no last message then there is nothing else to process
2760                 return;
2761         }
2762
2763         int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
2764         Liveness *lastEntry = lastMessageEntry->getSecond();
2765         delete lastMessageEntry;
2766
2767         // If it is not our machine Id since we already set ours to dead
2768         if (machineId != localMachineId) {
2769                 char lastEntryType = lastEntry->getType();
2770
2771                 if (lastEntryType == TypeLastMessage) {
2772                         ((LastMessage *)lastEntry)->setDead();
2773                 } else if (lastEntryType == TypeSlot) {
2774                         ((Slot *)lastEntry)->setDead();
2775                 } else {
2776                         throw new Error("Unrecognized type");
2777                 }
2778         }
2779         // Make sure the server is not playing any games
2780         if (machineId == localMachineId) {
2781                 if (hadPartialSendToServer) {
2782                         // We were not making any updates and we had a machine mismatch
2783                         if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2784                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: ");
2785                         }
2786                 } else {
2787                         // We were not making any updates and we had a machine mismatch
2788                         if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2789                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed: ");
2790                         }
2791                 }
2792         } else {
2793                 if (lastMessageSeqNum > seqNum) {
2794                         throw new Error("Server Error: Rollback on remote machine sequence number");
2795                 }
2796         }
2797 }
2798
2799 /**
2800  * Add a rejected message entry to the watch set to keep track of
2801  * which clients have seen that rejected message entry and which have
2802  * not.
2803  */
2804 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
2805         Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2806         if (entries == NULL) {
2807                 // There is no set for this machine ID yet so create one
2808                 entries = new Hashset<RejectedMessage *>();
2809                 rejectedMessageWatchVectorTable->put(machineId, entries);
2810         }
2811         entries->add(entry);
2812 }
2813
2814 /**
2815  * Check if the HMAC chain is not violated
2816  */
2817 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
2818         for (uint i = 0; i < newSlots->length(); i++) {
2819                 Slot *currSlot = newSlots->get(i);
2820                 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
2821                 if (prevSlot != NULL &&
2822                                 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2823                         throw new Error("Server Error: Invalid HMAC Chain");
2824         }
2825 }