edits
[iotcloud.git] / version2 / src / C / Table.cc
1 #include "Table.h"
2 #include "CloudComm.h"
3 #include "SlotBuffer.h"
4 #include "NewKey.h"
5 #include "Slot.h"
6 #include "KeyValue.h"
7 #include "Error.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
13 #include "SecureRandom.h"
14 #include "ByteBuffer.h"
15 #include "Abort.h"
16 #include "CommitPart.h"
17 #include "ArbitrationRound.h"
18 #include "TransactionPart.h"
19 #include "Commit.h"
20 #include "RejectedMessage.h"
21 #include "SlotIndexer.h"
22 #include <stdlib.h>
23
24 int compareInt64(const void *a, const void *b) {
25         const int64_t *pa = (const int64_t *) a;
26         const int64_t *pb = (const int64_t *) b;
27         if (*pa < *pb)
28                 return -1;
29         else if (*pa > *pb)
30                 return 1;
31         else
32                 return 0;
33 }
34
35 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
36         buffer(NULL),
37         cloud(new CloudComm(this, baseurl, password, listeningPort)),
38         random(NULL),
39         liveTableStatus(NULL),
40         pendingTransactionBuilder(NULL),
41         lastPendingTransactionSpeculatedOn(NULL),
42         firstPendingTransaction(NULL),
43         numberOfSlots(0),
44         bufferResizeThreshold(0),
45         liveSlotCount(0),
46         oldestLiveSlotSequenceNumver(1),
47         localMachineId(_localMachineId),
48         sequenceNumber(0),
49         localSequenceNumber(0),
50         localTransactionSequenceNumber(0),
51         lastTransactionSequenceNumberSpeculatedOn(0),
52         oldestTransactionSequenceNumberSpeculatedOn(0),
53         localArbitrationSequenceNumber(0),
54         hadPartialSendToServer(false),
55         attemptedToSendToServer(false),
56         expectedsize(0),
57         didFindTableStatus(false),
58         currMaxSize(0),
59         lastSlotAttemptedToSend(NULL),
60         lastIsNewKey(false),
61         lastNewSize(0),
62         lastTransactionPartsSent(NULL),
63         lastPendingSendArbitrationEntriesToDelete(NULL),
64         lastNewKey(NULL),
65         committedKeyValueTable(NULL),
66         speculatedKeyValueTable(NULL),
67         pendingTransactionSpeculatedKeyValueTable(NULL),
68         liveNewKeyTable(NULL),
69         lastMessageTable(NULL),
70         rejectedMessageWatchVectorTable(NULL),
71         arbitratorTable(NULL),
72         liveAbortTable(NULL),
73         newTransactionParts(NULL),
74         newCommitParts(NULL),
75         lastArbitratedTransactionNumberByArbitratorTable(NULL),
76         liveTransactionBySequenceNumberTable(NULL),
77         liveTransactionByTransactionIdTable(NULL),
78         liveCommitsTable(NULL),
79         liveCommitsByKeyTable(NULL),
80         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
81         rejectedSlotVector(NULL),
82         pendingTransactionQueue(NULL),
83         pendingSendArbitrationRounds(NULL),
84         pendingSendArbitrationEntriesToDelete(NULL),
85         transactionPartsSent(NULL),
86         outstandingTransactionStatus(NULL),
87         liveAbortsGeneratedByLocal(NULL),
88         offlineTransactionsCommittedAndAtServer(NULL),
89         localCommunicationTable(NULL),
90         lastTransactionSeenFromMachineFromServer(NULL),
91         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
92         lastInsertedNewKey(false),
93         lastSeqNumArbOn(0)
94 {
95         init();
96 }
97
98 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
99         buffer(NULL),
100         cloud(_cloud),
101         random(NULL),
102         liveTableStatus(NULL),
103         pendingTransactionBuilder(NULL),
104         lastPendingTransactionSpeculatedOn(NULL),
105         firstPendingTransaction(NULL),
106         numberOfSlots(0),
107         bufferResizeThreshold(0),
108         liveSlotCount(0),
109         oldestLiveSlotSequenceNumver(1),
110         localMachineId(_localMachineId),
111         sequenceNumber(0),
112         localSequenceNumber(0),
113         localTransactionSequenceNumber(0),
114         lastTransactionSequenceNumberSpeculatedOn(0),
115         oldestTransactionSequenceNumberSpeculatedOn(0),
116         localArbitrationSequenceNumber(0),
117         hadPartialSendToServer(false),
118         attemptedToSendToServer(false),
119         expectedsize(0),
120         didFindTableStatus(false),
121         currMaxSize(0),
122         lastSlotAttemptedToSend(NULL),
123         lastIsNewKey(false),
124         lastNewSize(0),
125         lastTransactionPartsSent(NULL),
126         lastPendingSendArbitrationEntriesToDelete(NULL),
127         lastNewKey(NULL),
128         committedKeyValueTable(NULL),
129         speculatedKeyValueTable(NULL),
130         pendingTransactionSpeculatedKeyValueTable(NULL),
131         liveNewKeyTable(NULL),
132         lastMessageTable(NULL),
133         rejectedMessageWatchVectorTable(NULL),
134         arbitratorTable(NULL),
135         liveAbortTable(NULL),
136         newTransactionParts(NULL),
137         newCommitParts(NULL),
138         lastArbitratedTransactionNumberByArbitratorTable(NULL),
139         liveTransactionBySequenceNumberTable(NULL),
140         liveTransactionByTransactionIdTable(NULL),
141         liveCommitsTable(NULL),
142         liveCommitsByKeyTable(NULL),
143         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
144         rejectedSlotVector(NULL),
145         pendingTransactionQueue(NULL),
146         pendingSendArbitrationRounds(NULL),
147         pendingSendArbitrationEntriesToDelete(NULL),
148         transactionPartsSent(NULL),
149         outstandingTransactionStatus(NULL),
150         liveAbortsGeneratedByLocal(NULL),
151         offlineTransactionsCommittedAndAtServer(NULL),
152         localCommunicationTable(NULL),
153         lastTransactionSeenFromMachineFromServer(NULL),
154         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
155         lastInsertedNewKey(false),
156         lastSeqNumArbOn(0)
157 {
158         init();
159 }
160
161 Table::~Table() {
162         delete cloud;
163         delete random;
164         delete buffer;
165         // init data structs
166         delete committedKeyValueTable;
167         delete speculatedKeyValueTable;
168         delete pendingTransactionSpeculatedKeyValueTable;
169         delete liveNewKeyTable;
170         {
171                 SetIterator<int64_t, Pair<int64_t, Liveness *> *> *lmit = getKeyIterator(lastMessageTable);
172                 while (lmit->hasNext()) {
173                         Pair<int64_t, Liveness *> * pair = lastMessageTable->get(lmit->next());
174                 }
175                 delete lmit;
176                 delete lastMessageTable;
177         }
178         if (pendingTransactionBuilder != NULL)
179                 delete pendingTransactionBuilder;
180         {
181                 SetIterator<int64_t, Hashset<RejectedMessage *> *> *rmit = getKeyIterator(rejectedMessageWatchVectorTable);
182                 while(rmit->hasNext()) {
183                         int64_t machineid = rmit->next();
184                         Hashset<RejectedMessage *> * rmset = rejectedMessageWatchVectorTable->get(machineid);
185                         SetIterator<RejectedMessage *, RejectedMessage *> * mit = rmset->iterator();
186                         while (mit->hasNext()) {
187                                 RejectedMessage * rm = mit->next();
188                                 delete rm;
189                         }
190                         delete mit;
191                         delete rmset;
192                 }
193                 delete rmit;
194                 delete rejectedMessageWatchVectorTable;
195         }
196         delete arbitratorTable;
197         delete liveAbortTable;
198         {
199                 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts);
200                 while (partsit->hasNext()) {
201                         int64_t machineId = partsit->next();
202                         Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
203                         delete parts;
204                 }
205                 delete partsit;
206                 delete newTransactionParts;
207         }
208         {
209                 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts);
210                 while (partsit->hasNext()) {
211                         int64_t machineId = partsit->next();
212                         Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId);
213                         delete parts;
214                 }
215                 delete partsit;
216                 delete newCommitParts;
217         }
218         delete lastArbitratedTransactionNumberByArbitratorTable;
219         delete liveTransactionBySequenceNumberTable;
220         delete liveTransactionByTransactionIdTable;
221         {
222                 SetIterator<int64_t, Hashtable<int64_t, Commit *> *> *liveit = getKeyIterator(liveCommitsTable);
223                 while (liveit->hasNext()) {
224                         int64_t arbitratorId = liveit->next();
225                         
226                         // Get all the commits for a specific arbitrator
227                         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
228                         {
229                                 SetIterator<int64_t, Commit *> *clientit = getKeyIterator(commitForClientTable);
230                                 while (clientit->hasNext()) {
231                                         int64_t id = clientit->next();
232                                         delete commitForClientTable->get(id);
233                                 }
234                                 delete clientit;
235                         }
236                         
237                         delete commitForClientTable;
238                 }
239                 delete liveit;
240                 delete liveCommitsTable;
241         }
242         delete liveCommitsByKeyTable;
243         delete lastCommitSeenSequenceNumberByArbitratorTable;
244         delete rejectedSlotVector;
245         {
246                 uint size = pendingTransactionQueue->size();
247                 for (uint iter = 0; iter < size; iter++) {
248                         delete pendingTransactionQueue->get(iter);
249                 }
250                 delete pendingTransactionQueue;
251         }
252         delete pendingSendArbitrationEntriesToDelete;
253         delete transactionPartsSent;
254         delete outstandingTransactionStatus;
255         delete liveAbortsGeneratedByLocal;
256         delete offlineTransactionsCommittedAndAtServer;
257         delete localCommunicationTable;
258         delete lastTransactionSeenFromMachineFromServer;
259         {
260                 for(uint i = 0; i < pendingSendArbitrationRounds->size(); i++) {
261                         delete pendingSendArbitrationRounds->get(i);
262                 }
263                 delete pendingSendArbitrationRounds;
264         }
265         if (lastTransactionPartsSent != NULL)
266                 delete lastTransactionPartsSent;
267         delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
268 }
269
270 /**
271  * Init all the stuff needed for for table usage
272  */
273 void Table::init() {
274         // Init helper objects
275         random = new SecureRandom();
276         buffer = new SlotBuffer();
277
278         // init data structs
279         committedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
280         speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
281         pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
282         liveNewKeyTable = new Hashtable<IoTString *, NewKey *, uintptr_t, 0, hashString, StringEquals >();
283         lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> * >();
284         rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
285         arbitratorTable = new Hashtable<IoTString *, int64_t, uintptr_t, 0, hashString, StringEquals>();
286         liveAbortTable = new Hashtable<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>();
287         newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
288         newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
289         lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
290         liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
291         liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t> *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>();
292         liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> * >();
293         liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *, uintptr_t, 0, hashString, StringEquals>();
294         lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
295         rejectedSlotVector = new Vector<int64_t>();
296         pendingTransactionQueue = new Vector<Transaction *>();
297         pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
298         transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
299         outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
300         liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
301         offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> *, uintptr_t, 0, pairHashFunction, pairEquals>();
302         localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> *>();
303         lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
304         pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
305         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
306
307         // Other init stuff
308         numberOfSlots = buffer->capacity();
309         setResizeThreshold();
310 }
311
312 /**
313  * Initialize the table by inserting a table status as the first entry
314  * into the table status also initialize the crypto stuff.
315  */
316 void Table::initTable() {
317         cloud->initSecurity();
318
319         // Create the first insertion into the block chain which is the table status
320         Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
321         localSequenceNumber++;
322         TableStatus *status = new TableStatus(s, numberOfSlots);
323         s->addEntry(status);
324         Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
325
326         if (array == NULL) {
327                 array = new Array<Slot *>(1);
328                 array->set(0, s);
329                 // update local block chain
330                 validateAndUpdate(array, true);
331                 delete array;
332         } else if (array->length() == 1) {
333                 // in case we did push the slot BUT we failed to init it
334                 validateAndUpdate(array, true);
335                 delete array;
336         } else {
337                 delete array;
338                 throw new Error("Error on initialization");
339         }
340 }
341
342 /**
343  * Rebuild the table from scratch by pulling the latest block chain
344  * from the server.
345  */
346 void Table::rebuild() {
347         // Just pull the latest slots from the server
348         Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
349         validateAndUpdate(newslots, true);
350         delete newslots;
351         sendToServer(NULL);
352         updateLiveTransactionsAndStatus();
353 }
354
355 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
356         localCommunicationTable->put(arbitrator, new Pair<IoTString *, int32_t>(hostName, portNumber));
357 }
358
359 int64_t Table::getArbitrator(IoTString *key) {
360         return arbitratorTable->get(key);
361 }
362
363 void Table::close() {
364         cloud->closeCloud();
365 }
366
367 IoTString *Table::getCommitted(IoTString *key)  {
368         KeyValue *kv = committedKeyValueTable->get(key);
369
370         if (kv != NULL) {
371                 return new IoTString(kv->getValue());
372         } else {
373                 return NULL;
374         }
375 }
376
377 IoTString *Table::getSpeculative(IoTString *key) {
378         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
379
380         if (kv == NULL) {
381                 kv = speculatedKeyValueTable->get(key);
382         }
383
384         if (kv == NULL) {
385                 kv = committedKeyValueTable->get(key);
386         }
387
388         if (kv != NULL) {
389                 return new IoTString(kv->getValue());
390         } else {
391                 return NULL;
392         }
393 }
394
395 IoTString *Table::getCommittedAtomic(IoTString *key) {
396         KeyValue *kv = committedKeyValueTable->get(key);
397
398         if (!arbitratorTable->contains(key)) {
399                 throw new Error("Key not Found.");
400         }
401
402         // Make sure new key value pair matches the current arbitrator
403         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
404                 // TODO: Maybe not throw en error
405                 throw new Error("Not all Key Values Match Arbitrator.");
406         }
407
408         if (kv != NULL) {
409                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
410                 return new IoTString(kv->getValue());
411         } else {
412                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
413                 return NULL;
414         }
415 }
416
417 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
418         if (!arbitratorTable->contains(key)) {
419                 throw new Error("Key not Found.");
420         }
421
422         // Make sure new key value pair matches the current arbitrator
423         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
424                 // TODO: Maybe not throw en error
425                 throw new Error("Not all Key Values Match Arbitrator.");
426         }
427
428         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
429
430         if (kv == NULL) {
431                 kv = speculatedKeyValueTable->get(key);
432         }
433
434         if (kv == NULL) {
435                 kv = committedKeyValueTable->get(key);
436         }
437
438         if (kv != NULL) {
439                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
440                 return new IoTString(kv->getValue());
441         } else {
442                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
443                 return NULL;
444         }
445 }
446
447 bool Table::update()  {
448         try {
449                 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
450                 validateAndUpdate(newSlots, false);
451                 delete newSlots;
452                 sendToServer(NULL);
453                 updateLiveTransactionsAndStatus();
454                 return true;
455         } catch (Exception *e) {
456                 SetIterator<int64_t, Pair<IoTString *, int32_t> *> *kit = getKeyIterator(localCommunicationTable);
457                 while (kit->hasNext()) {
458                         int64_t m = kit->next();
459                         updateFromLocal(m);
460                 }
461                 delete kit;
462         }
463
464         return false;
465 }
466
467 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
468         while (true) {
469                 if (arbitratorTable->contains(keyName)) {
470                         // There is already an arbitrator
471                         return false;
472                 }
473                 NewKey *newKey = new NewKey(NULL, keyName, machineId);
474
475                 if (sendToServer(newKey)) {
476                         // If successfully inserted
477                         return true;
478                 }
479         }
480 }
481
482 void Table::startTransaction() {
483         // Create a new transaction, invalidates any old pending transactions.
484         if (pendingTransactionBuilder != NULL)
485                 delete pendingTransactionBuilder;
486         pendingTransactionBuilder = new PendingTransaction(localMachineId);
487 }
488
489 void Table::put(IoTString *key, IoTString *value) {
490         // Make sure it is a valid key
491         if (!arbitratorTable->contains(key)) {
492                 throw new Error("Key not Found.");
493         }
494
495         // Make sure new key value pair matches the current arbitrator
496         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
497                 // TODO: Maybe not throw en error
498                 throw new Error("Not all Key Values Match Arbitrator.");
499         }
500
501         // Add the key value to this transaction
502         KeyValue *kv = new KeyValue(new IoTString(key), new IoTString(value));
503         pendingTransactionBuilder->addKV(kv);
504 }
505
506 TransactionStatus *Table::commitTransaction() {
507         if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
508                 // transaction with no updates will have no effect on the system
509                 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
510         }
511
512         // Set the local transaction sequence number and increment
513         pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
514         localTransactionSequenceNumber++;
515
516         // Create the transaction status
517         TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
518
519         // Create the new transaction
520         Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
521         newTransaction->setTransactionStatus(transactionStatus);
522
523         if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
524                 // Add it to the queue and invalidate the builder for safety
525                 pendingTransactionQueue->add(newTransaction);
526         } else {
527                 arbitrateOnLocalTransaction(newTransaction);
528                 delete newTransaction;
529                 updateLiveStateFromLocal();
530         }
531         if (pendingTransactionBuilder != NULL)
532                 delete pendingTransactionBuilder;
533         
534         pendingTransactionBuilder = new PendingTransaction(localMachineId);
535
536         try {
537                 sendToServer(NULL);
538         } catch (ServerException *e) {
539
540                 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
541                 uint size = pendingTransactionQueue->size();
542                 uint oldindex = 0;
543                 for (uint iter = 0; iter < size; iter++) {
544                         Transaction *transaction = pendingTransactionQueue->get(iter);
545                         pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter));
546
547                         if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
548                                 // Already contacted this client so ignore all attempts to contact this client
549                                 // to preserve ordering for arbitrator
550                                 continue;
551                         }
552
553                         Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
554
555                         if (sendReturn.getFirst()) {
556                                 // Failed to contact over local
557                                 arbitratorTriedAndFailed->add(transaction->getArbitrator());
558                         } else {
559                                 // Successful contact or should not contact
560
561                                 if (sendReturn.getSecond()) {
562                                         // did arbitrate
563                                         delete transaction;
564                                         oldindex--;
565                                 }
566                         }
567                 }
568                 pendingTransactionQueue->setSize(oldindex);
569         }
570
571         updateLiveStateFromLocal();
572
573         return transactionStatus;
574 }
575
576 /**
577  * Recalculate the new resize threshold
578  */
579 void Table::setResizeThreshold() {
580         int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
581         bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
582 }
583
584 int64_t Table::getLocalSequenceNumber() {
585         return localSequenceNumber;
586 }
587
588 NewKey * Table::handlePartialSend(NewKey * newKey) {
589         //Didn't receive acknowledgement for last send
590         //See if the server has received a newer slot
591         
592         Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
593         if (newSlots->length() == 0) {
594                 //Retry sending old slot
595                 bool wasInserted = false;
596                 bool sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey, &wasInserted, &newSlots);
597                 
598                 if (sendSlotsReturn) {
599                         if (newKey != NULL) {
600                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
601                                         newKey = NULL;
602                                 }
603                         }
604                         
605                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
606                         while (trit->hasNext()) {
607                                 Transaction *transaction = trit->next();
608                                 transaction->resetServerFailure();
609                                 // Update which transactions parts still need to be sent
610                                 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
611                                 // Add the transaction status to the outstanding list
612                                 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
613                                 
614                                 // Update the transaction status
615                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
616                                 
617                                 // Check if all the transaction parts were successfully
618                                 // sent and if so then remove it from pending
619                                 if (transaction->didSendAllParts()) {
620                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
621                                         pendingTransactionQueue->remove(transaction);
622                                 }
623                         }
624                         delete trit;
625                 } else {
626                         if (checkSend(newSlots, lastSlotAttemptedToSend)) {
627                                 if (newKey != NULL) {
628                                         if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
629                                                 newKey = NULL;
630                                         }
631                                 }
632                                 
633                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
634                                 while (trit->hasNext()) {
635                                         Transaction *transaction = trit->next();
636                                         transaction->resetServerFailure();
637                                         
638                                         // Update which transactions parts still need to be sent
639                                         transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
640                                         
641                                         // Add the transaction status to the outstanding list
642                                         outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
643                                         
644                                         // Update the transaction status
645                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
646                                         
647                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
648                                         if (transaction->didSendAllParts()) {
649                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
650                                                 pendingTransactionQueue->remove(transaction);
651                                         } else {
652                                                 transaction->resetServerFailure();
653                                                 // Set the transaction sequence number back to nothing
654                                                 if (!transaction->didSendAPartToServer()) {
655                                                         transaction->setSequenceNumber(-1);
656                                                 }
657                                         }
658                                 }
659                                 delete trit;
660                         }
661                 }
662                 
663                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
664                 while (trit->hasNext()) {
665                         Transaction *transaction = trit->next();
666                         transaction->resetServerFailure();
667                         // Set the transaction sequence number back to nothing
668                         if (!transaction->didSendAPartToServer()) {
669                                 transaction->setSequenceNumber(-1);
670                         }
671                 }
672                 delete trit;
673                 
674                 if (newSlots->length() != 0) {
675                         // insert into the local block chain
676                         validateAndUpdate(newSlots, true);
677                 }
678         } else {
679                 if (checkSend(newSlots, lastSlotAttemptedToSend)) {
680                         if (newKey != NULL) {
681                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
682                                         newKey = NULL;
683                                 }
684                         }
685                         
686                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
687                         while (trit->hasNext()) {
688                                 Transaction *transaction = trit->next();
689                                 transaction->resetServerFailure();
690                                 
691                                 // Update which transactions parts still need to be sent
692                                 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
693                                 
694                                 // Add the transaction status to the outstanding list
695                                 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
696                                 
697                                 // Update the transaction status
698                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
699                                 
700                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
701                                 if (transaction->didSendAllParts()) {
702                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
703                                         pendingTransactionQueue->remove(transaction);
704                                 } else {
705                                         transaction->resetServerFailure();
706                                         // Set the transaction sequence number back to nothing
707                                         if (!transaction->didSendAPartToServer()) {
708                                                 transaction->setSequenceNumber(-1);
709                                         }
710                                 }
711                         }
712                         delete trit;
713                 } else {
714                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
715                         while (trit->hasNext()) {
716                                 Transaction *transaction = trit->next();
717                                 transaction->resetServerFailure();
718                                 // Set the transaction sequence number back to nothing
719                                 if (!transaction->didSendAPartToServer()) {
720                                         transaction->setSequenceNumber(-1);
721                                 }
722                         }
723                         delete trit;
724                 }
725                 
726                 // insert into the local block chain
727                 validateAndUpdate(newSlots, true);
728         }
729         delete newSlots;
730         return newKey;
731 }
732
733 bool Table::sendToServer(NewKey *newKey) {
734         if (hadPartialSendToServer) {
735                 newKey = handlePartialSend(newKey);
736         }
737
738         try {
739                 // While we have stuff that needs inserting into the block chain
740                 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
741                         if (hadPartialSendToServer) {
742                                 throw new Error("Should Be error free");
743                         }
744                         
745                         // If there is a new key with same name then end
746                         if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
747                                 return false;
748                         }
749
750                         // Create the slot
751                         Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, new Array<char>(buffer->getSlot(sequenceNumber)->getHMAC()), localSequenceNumber);
752                         localSequenceNumber++;
753
754                         // Try to fill the slot with data
755                         int newSize = 0;
756                         bool insertedNewKey = false;
757                         bool needsResize = fillSlot(slot, false, newKey, newSize, insertedNewKey);
758
759                         if (needsResize) {
760                                 // Reset which transaction to send
761                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
762                                 while (trit->hasNext()) {
763                                         Transaction *transaction = trit->next();
764                                         transaction->resetNextPartToSend();
765
766                                         // Set the transaction sequence number back to nothing
767                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
768                                                 transaction->setSequenceNumber(-1);
769                                         }
770                                 }
771                                 delete trit;
772
773                                 // Clear the sent data since we are trying again
774                                 pendingSendArbitrationEntriesToDelete->clear();
775                                 transactionPartsSent->clear();
776
777                                 // We needed a resize so try again
778                                 fillSlot(slot, true, newKey, newSize, insertedNewKey);
779                         }
780
781                         lastSlotAttemptedToSend = slot;
782                         lastIsNewKey = (newKey != NULL);
783                         lastInsertedNewKey = insertedNewKey;
784                         lastNewSize = newSize;
785                         lastNewKey = newKey;
786                         if (lastTransactionPartsSent != NULL)
787                                 delete lastTransactionPartsSent;
788                         lastTransactionPartsSent = transactionPartsSent->clone();
789                         lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
790
791                         Array<Slot *> * newSlots = NULL;
792                         bool wasInserted = false;
793                         bool sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL, &wasInserted, &newSlots);
794
795                         if (sendSlotsReturn) {
796                                 // Did insert into the block chain
797                                 if (insertedNewKey) {
798                                         // This slot was what was inserted not a previous slot
799                                         // New Key was successfully inserted into the block chain so dont want to insert it again
800                                         newKey = NULL;
801                                 }
802
803                                 // Remove the aborts and commit parts that were sent from the pending to send queue
804                                 uint size = pendingSendArbitrationRounds->size();
805                                 uint oldcount = 0;
806                                 for (uint i = 0; i < size; i++) {
807                                         ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
808                                         round->removeParts(pendingSendArbitrationEntriesToDelete);
809
810                                         if (!round->isDoneSending()) {
811                                                 //Add part back in
812                                                 pendingSendArbitrationRounds->set(oldcount++,
813                                                                                                                                                                                         pendingSendArbitrationRounds->get(i));
814                                         } else
815                                                 delete pendingSendArbitrationRounds->get(i);
816                                 }
817                                 pendingSendArbitrationRounds->setSize(oldcount);
818
819                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
820                                 while (trit->hasNext()) {
821                                         Transaction *transaction = trit->next();
822                                         transaction->resetServerFailure();
823
824                                         // Update which transactions parts still need to be sent
825                                         transaction->removeSentParts(transactionPartsSent->get(transaction));
826
827                                         // Add the transaction status to the outstanding list
828                                         outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
829
830                                         // Update the transaction status
831                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
832
833                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
834                                         if (transaction->didSendAllParts()) {
835                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
836                                                 pendingTransactionQueue->remove(transaction);
837                                         }
838                                 }
839                                 delete trit;
840                         } else {
841                                 // Reset which transaction to send
842                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
843                                 while (trit->hasNext()) {
844                                         Transaction *transaction = trit->next();
845                                         transaction->resetNextPartToSend();
846
847                                         // Set the transaction sequence number back to nothing
848                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
849                                                 transaction->setSequenceNumber(-1);
850                                         }
851                                 }
852                                 delete trit;
853                         }
854
855                         // Clear the sent data in preparation for next send
856                         pendingSendArbitrationEntriesToDelete->clear();
857                         transactionPartsSent->clear();
858
859                         if (newSlots->length() != 0) {
860                                 // insert into the local block chain
861                                 validateAndUpdate(newSlots, true);
862                         }
863                         delete newSlots;
864                 }
865         } catch (ServerException *e) {
866                 if (e->getType() != ServerException_TypeInputTimeout) {
867                         // Nothing was able to be sent to the server so just clear these data structures
868                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
869                         while (trit->hasNext()) {
870                                 Transaction *transaction = trit->next();
871                                 transaction->resetNextPartToSend();
872
873                                 // Set the transaction sequence number back to nothing
874                                 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
875                                         transaction->setSequenceNumber(-1);
876                                 }
877                         }
878                         delete trit;
879                 } else {
880                         // There was a partial send to the server
881                         hadPartialSendToServer = true;
882
883                         // Nothing was able to be sent to the server so just clear these data structures
884                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
885                         while (trit->hasNext()) {
886                                 Transaction *transaction = trit->next();
887                                 transaction->resetNextPartToSend();
888                                 transaction->setServerFailure();
889                         }
890                         delete trit;
891                 }
892
893                 pendingSendArbitrationEntriesToDelete->clear();
894                 transactionPartsSent->clear();
895
896                 throw e;
897         }
898
899         return newKey == NULL;
900 }
901
902 bool Table::updateFromLocal(int64_t machineId) {
903         if (!localCommunicationTable->contains(machineId))
904                 return false;
905
906         Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(machineId);
907
908         // Get the size of the send data
909         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
910
911         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
912         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(machineId)) {
913                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
914         }
915
916         Array<char> *sendData = new Array<char>(sendDataSize);
917         ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
918
919         // Encode the data
920         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
921         bbEncode->putInt(0);
922
923         // Send by local
924         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
925         localSequenceNumber++;
926
927         if (returnData == NULL) {
928                 // Could not contact server
929                 return false;
930         }
931
932         // Decode the data
933         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
934         int numberOfEntries = bbDecode->getInt();
935
936         for (int i = 0; i < numberOfEntries; i++) {
937                 char type = bbDecode->get();
938                 if (type == TypeAbort) {
939                         Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
940                         processEntry(abort);
941                 } else if (type == TypeCommitPart) {
942                         CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
943                         processEntry(commitPart);
944                 }
945         }
946
947         updateLiveStateFromLocal();
948
949         return true;
950 }
951
952 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
953
954         // Get the devices local communications
955         if (!localCommunicationTable->contains(transaction->getArbitrator()))
956                 return Pair<bool, bool>(true, false);
957
958         Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
959
960         // Get the size of the send data
961         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
962         {
963                 Vector<TransactionPart *> *tParts = transaction->getParts();
964                 uint tPartsSize = tParts->size();
965                 for (uint i = 0; i < tPartsSize; i++) {
966                         TransactionPart *part = tParts->get(i);
967                         sendDataSize += part->getSize();
968                 }
969         }
970
971         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
972         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(transaction->getArbitrator())) {
973                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
974         }
975
976         // Make the send data size
977         Array<char> *sendData = new Array<char>(sendDataSize);
978         ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
979
980         // Encode the data
981         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
982         bbEncode->putInt(transaction->getParts()->size());
983         {
984                 Vector<TransactionPart *> *tParts = transaction->getParts();
985                 uint tPartsSize = tParts->size();
986                 for (uint i = 0; i < tPartsSize; i++) {
987                         TransactionPart *part = tParts->get(i);
988                         part->encode(bbEncode);
989                 }
990         }
991
992         // Send by local
993         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
994         localSequenceNumber++;
995
996         if (returnData == NULL) {
997                 // Could not contact server
998                 return Pair<bool, bool>(true, false);
999         }
1000
1001         // Decode the data
1002         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
1003         bool didCommit = bbDecode->get() == 1;
1004         bool couldArbitrate = bbDecode->get() == 1;
1005         int numberOfEntries = bbDecode->getInt();
1006         bool foundAbort = false;
1007
1008         for (int i = 0; i < numberOfEntries; i++) {
1009                 char type = bbDecode->get();
1010                 if (type == TypeAbort) {
1011                         Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
1012
1013                         if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
1014                                 foundAbort = true;
1015                         }
1016
1017                         processEntry(abort);
1018                 } else if (type == TypeCommitPart) {
1019                         CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
1020                         processEntry(commitPart);
1021                 }
1022         }
1023
1024         updateLiveStateFromLocal();
1025
1026         if (couldArbitrate) {
1027                 TransactionStatus *status =  transaction->getTransactionStatus();
1028                 if (didCommit) {
1029                         status->setStatus(TransactionStatus_StatusCommitted);
1030                 } else {
1031                         status->setStatus(TransactionStatus_StatusAborted);
1032                 }
1033         } else {
1034                 TransactionStatus *status =  transaction->getTransactionStatus();
1035                 if (foundAbort) {
1036                         status->setStatus(TransactionStatus_StatusAborted);
1037                 } else {
1038                         status->setStatus(TransactionStatus_StatusCommitted);
1039                 }
1040         }
1041
1042         return Pair<bool, bool>(false, true);
1043 }
1044
1045 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
1046         // Decode the data
1047         ByteBuffer *bbDecode = ByteBuffer_wrap(data);
1048         int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
1049         int numberOfParts = bbDecode->getInt();
1050
1051         // If we did commit a transaction or not
1052         bool didCommit = false;
1053         bool couldArbitrate = false;
1054
1055         if (numberOfParts != 0) {
1056
1057                 // decode the transaction
1058                 Transaction *transaction = new Transaction();
1059                 for (int i = 0; i < numberOfParts; i++) {
1060                         bbDecode->get();
1061                         TransactionPart *newPart = (TransactionPart *)TransactionPart_decode(NULL, bbDecode);
1062                         transaction->addPartDecode(newPart);
1063                 }
1064
1065                 // Arbitrate on transaction and pull relevant return data
1066                 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
1067                 couldArbitrate = localArbitrateReturn.getFirst();
1068                 didCommit = localArbitrateReturn.getSecond();
1069
1070                 updateLiveStateFromLocal();
1071
1072                 // Transaction was sent to the server so keep track of it to prevent double commit
1073                 if (transaction->getSequenceNumber() != -1) {
1074                         offlineTransactionsCommittedAndAtServer->add(new Pair<int64_t, int64_t>(transaction->getId()));
1075                 }
1076         }
1077
1078         // The data to send back
1079         int returnDataSize = 0;
1080         Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
1081
1082         // Get the aborts to send back
1083         Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>();
1084         {
1085                 SetIterator<int64_t, Abort *> *abortit = getKeyIterator(liveAbortsGeneratedByLocal);
1086                 while (abortit->hasNext())
1087                         abortLocalSequenceNumbers->add(abortit->next());
1088                 delete abortit;
1089         }
1090
1091         qsort(abortLocalSequenceNumbers->expose(), abortLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1092
1093         uint asize = abortLocalSequenceNumbers->size();
1094         for (uint i = 0; i < asize; i++) {
1095                 int64_t localSequenceNumber = abortLocalSequenceNumbers->get(i);
1096                 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1097                         continue;
1098                 }
1099
1100                 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
1101                 unseenArbitrations->add(abort);
1102                 returnDataSize += abort->getSize();
1103         }
1104
1105         // Get the commits to send back
1106         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
1107         if (commitForClientTable != NULL) {
1108                 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>();
1109                 {
1110                         SetIterator<int64_t, Commit *> *commitit = getKeyIterator(commitForClientTable);
1111                         while (commitit->hasNext())
1112                                 commitLocalSequenceNumbers->add(commitit->next());
1113                         delete commitit;
1114                 }
1115                 qsort(commitLocalSequenceNumbers->expose(), commitLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1116
1117                 uint clsSize = commitLocalSequenceNumbers->size();
1118                 for (uint clsi = 0; clsi < clsSize; clsi++) {
1119                         int64_t localSequenceNumber = commitLocalSequenceNumbers->get(clsi);
1120                         Commit *commit = commitForClientTable->get(localSequenceNumber);
1121
1122                         if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1123                                 continue;
1124                         }
1125
1126                         {
1127                                 Vector<CommitPart *> *parts = commit->getParts();
1128                                 uint nParts = parts->size();
1129                                 for (uint i = 0; i < nParts; i++) {
1130                                         CommitPart *commitPart = parts->get(i);
1131                                         unseenArbitrations->add(commitPart);
1132                                         returnDataSize += commitPart->getSize();
1133                                 }
1134                         }
1135                 }
1136         }
1137
1138         // Number of arbitration entries to decode
1139         returnDataSize += 2 * sizeof(int32_t);
1140
1141         // bool of did commit or not
1142         if (numberOfParts != 0) {
1143                 returnDataSize += sizeof(char);
1144         }
1145
1146         // Data to send Back
1147         Array<char> *returnData = new Array<char>(returnDataSize);
1148         ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
1149
1150         if (numberOfParts != 0) {
1151                 if (didCommit) {
1152                         bbEncode->put((char)1);
1153                 } else {
1154                         bbEncode->put((char)0);
1155                 }
1156                 if (couldArbitrate) {
1157                         bbEncode->put((char)1);
1158                 } else {
1159                         bbEncode->put((char)0);
1160                 }
1161         }
1162
1163         bbEncode->putInt(unseenArbitrations->size());
1164         uint size = unseenArbitrations->size();
1165         for (uint i = 0; i < size; i++) {
1166                 Entry *entry = unseenArbitrations->get(i);
1167                 entry->encode(bbEncode);
1168         }
1169
1170         localSequenceNumber++;
1171         return returnData;
1172 }
1173
1174 /** Checks whether a given slot was sent using new slots in
1175                 array. Returns true if sent and false otherwise.  */
1176
1177 bool Table::checkSend(Array<Slot *> * array, Slot *checkSlot) {
1178         uint size = array->length();
1179         for (uint i = 0; i < size; i++) {
1180                 Slot *s = array->get(i);
1181                 if ((s->getSequenceNumber() == checkSlot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1182                         return true;
1183                 }
1184         }
1185         
1186         //Also need to see if other machines acknowledged our message
1187         for (uint i = 0; i < size; i++) {
1188                 Slot *s = array->get(i);
1189                 
1190                 // Process each entry in the slot
1191                 Vector<Entry *> *entries = s->getEntries();
1192                 uint eSize = entries->size();
1193                 for (uint ei = 0; ei < eSize; ei++) {
1194                         Entry *entry = entries->get(ei);
1195                         
1196                         if (entry->getType() == TypeLastMessage) {
1197                                 LastMessage *lastMessage = (LastMessage *)entry;
1198                                 
1199                                 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == checkSlot->getSequenceNumber())) {
1200                                         return true;
1201                                 }
1202                         }
1203                 }
1204         }
1205         //Not found
1206         return false;
1207 }
1208
1209 /** Method tries to send slot to server.  Returns status in tuple.
1210                 isInserted returns whether last un-acked send (if any) was
1211                 successful.  Returns whether send was confirmed.x
1212  */
1213
1214 bool Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey, bool *isInserted, Array<Slot *> **array) {
1215         attemptedToSendToServer = true;
1216
1217         *array = cloud->putSlot(slot, newSize);
1218         if (*array == NULL) {
1219                 *array = new Array<Slot *>(1);
1220                 (*array)->set(0, slot);
1221                 rejectedSlotVector->clear();
1222                 *isInserted = false;
1223                 return true;
1224         } else {
1225                 if ((*array)->length() == 0) {
1226                         throw new Error("Server Error: Did not send any slots");
1227                 }
1228
1229                 if (hadPartialSendToServer) {
1230                         *isInserted = checkSend(*array, slot);
1231
1232                         if (!(*isInserted)) {
1233                                 rejectedSlotVector->add(slot->getSequenceNumber());
1234                         }
1235                         
1236                         return false;
1237                 } else {
1238                         rejectedSlotVector->add(slot->getSequenceNumber());
1239                         *isInserted = false;
1240                         return false;
1241                 }
1242         }
1243 }
1244
1245 /**
1246  * Returns true if a resize was needed but not done.
1247  */
1248 bool Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry, int & newSize, bool & insertedKey) {
1249         newSize = 0;//special value to indicate no resize
1250         if (liveSlotCount > bufferResizeThreshold) {
1251                 resize = true;//Resize is forced
1252         }
1253
1254         if (resize) {
1255                 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1256                 TableStatus *status = new TableStatus(slot, newSize);
1257                 slot->addEntry(status);
1258         }
1259
1260         // Fill with rejected slots first before doing anything else
1261         doRejectedMessages(slot);
1262
1263         // Do mandatory rescue of entries
1264         ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1265
1266         // Extract working variables
1267         bool needsResize = mandatoryRescueReturn.getFirst();
1268         bool seenLiveSlot = mandatoryRescueReturn.getSecond();
1269         int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1270
1271         if (needsResize && !resize) {
1272                 // We need to resize but we are not resizing so return true to force on retry
1273                 return true;
1274         }
1275
1276         insertedKey = false;
1277         if (newKeyEntry != NULL) {
1278                 newKeyEntry->setSlot(slot);
1279                 if (slot->hasSpace(newKeyEntry)) {
1280                         slot->addEntry(newKeyEntry);
1281                         insertedKey = true;
1282                 }
1283         }
1284
1285         // Clear the transactions, aborts and commits that were sent previously
1286         transactionPartsSent->clear();
1287         pendingSendArbitrationEntriesToDelete->clear();
1288         uint size = pendingSendArbitrationRounds->size();
1289         for (uint i = 0; i < size; i++) {
1290                 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
1291                 bool isFull = false;
1292                 round->generateParts();
1293                 Vector<Entry *> *parts = round->getParts();
1294
1295                 // Insert pending arbitration data
1296                 uint vsize = parts->size();
1297                 for (uint vi = 0; vi < vsize; vi++) {
1298                         Entry *arbitrationData = parts->get(vi);
1299
1300                         // If it is an abort then we need to set some information
1301                         if (arbitrationData->getType() == TypeAbort) {
1302                                 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1303                         }
1304
1305                         if (!slot->hasSpace(arbitrationData)) {
1306                                 // No space so cant do anything else with these data entries
1307                                 isFull = true;
1308                                 break;
1309                         }
1310
1311                         // Add to this current slot and add it to entries to delete
1312                         slot->addEntry(arbitrationData);
1313                         pendingSendArbitrationEntriesToDelete->add(arbitrationData);
1314                 }
1315
1316                 if (isFull) {
1317                         break;
1318                 }
1319         }
1320
1321         if (pendingTransactionQueue->size() > 0) {
1322                 Transaction *transaction = pendingTransactionQueue->get(0);
1323                 // Set the transaction sequence number if it has yet to be inserted into the block chain
1324                 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1325                         transaction->setSequenceNumber(slot->getSequenceNumber());
1326                 }
1327
1328                 while (true) {
1329                         TransactionPart *part = transaction->getNextPartToSend();
1330                         if (part == NULL) {
1331                                 // Ran out of parts to send for this transaction so move on
1332                                 break;
1333                         }
1334
1335                         if (slot->hasSpace(part)) {
1336                                 slot->addEntry(part);
1337                                 Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1338                                 if (partsSent == NULL) {
1339                                         partsSent = new Vector<int32_t>();
1340                                         transactionPartsSent->put(transaction, partsSent);
1341                                 }
1342                                 partsSent->add(part->getPartNumber());
1343                                 transactionPartsSent->put(transaction, partsSent);
1344                         } else {
1345                                 break;
1346                         }
1347                 }
1348         }
1349
1350         // Fill the remainder of the slot with rescue data
1351         doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1352
1353         return false;
1354 }
1355
1356 void Table::doRejectedMessages(Slot *s) {
1357         if (!rejectedSlotVector->isEmpty()) {
1358                 /* TODO: We should avoid generating a rejected message entry if
1359                  * there is already a sufficient entry in the queue (e->g->,
1360                  * equalsto value of true and same sequence number)->  */
1361
1362                 int64_t old_seqn = rejectedSlotVector->get(0);
1363                 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1364                         int64_t new_seqn = rejectedSlotVector->lastElement();
1365                         RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1366                         s->addEntry(rm);
1367                 } else {
1368                         int64_t prev_seqn = -1;
1369                         uint i = 0;
1370                         /* Go through list of missing messages */
1371                         for (; i < rejectedSlotVector->size(); i++) {
1372                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1373                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1374                                 if (s_msg != NULL)
1375                                         break;
1376                                 prev_seqn = curr_seqn;
1377                         }
1378                         /* Generate rejected message entry for missing messages */
1379                         if (prev_seqn != -1) {
1380                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1381                                 s->addEntry(rm);
1382                         }
1383                         /* Generate rejected message entries for present messages */
1384                         for (; i < rejectedSlotVector->size(); i++) {
1385                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1386                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1387                                 int64_t machineid = s_msg->getMachineID();
1388                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1389                                 s->addEntry(rm);
1390                         }
1391                 }
1392         }
1393 }
1394
1395 ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize) {
1396         int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1397         int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
1398         if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1399                 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1400         }
1401
1402         int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1403         bool seenLiveSlot = false;
1404         int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots;         // smallest seq number in the buffer if it is full
1405         int64_t threshold = firstIfFull + Table_FREE_SLOTS;             // we want the buffer to be clear of live entries up to this point
1406
1407
1408         // Mandatory Rescue
1409         for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1410                 Slot *previousSlot = buffer->getSlot(currentSequenceNumber);
1411                 // Push slot number forward
1412                 if (!seenLiveSlot) {
1413                         oldestLiveSlotSequenceNumver = currentSequenceNumber;
1414                 }
1415
1416                 if (!previousSlot->isLive()) {
1417                         continue;
1418                 }
1419
1420                 // We have seen a live slot
1421                 seenLiveSlot = true;
1422
1423                 // Get all the live entries for a slot
1424                 Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1425
1426                 // Iterate over all the live entries and try to rescue them
1427                 uint lESize = liveEntries->size();
1428                 for (uint i = 0; i < lESize; i++) {
1429                         Entry *liveEntry = liveEntries->get(i);
1430                         if (slot->hasSpace(liveEntry)) {
1431                                 // Enough space to rescue the entry
1432                                 slot->addEntry(liveEntry);
1433                         } else if (currentSequenceNumber == firstIfFull) {
1434                                 //if there's no space but the entry is about to fall off the queue
1435                                 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
1436                         }
1437                 }
1438         }
1439
1440         // Did not resize
1441         return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
1442 }
1443
1444 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1445         /* now go through live entries from least to greatest sequence number until
1446          * either all live slots added, or the slot doesn't have enough room
1447          * for SKIP_THRESHOLD consecutive entries*/
1448         int skipcount = 0;
1449         int64_t newestseqnum = buffer->getNewestSeqNum();
1450         for (; seqn <= newestseqnum; seqn++) {
1451                 Slot *prevslot = buffer->getSlot(seqn);
1452                 //Push slot number forward
1453                 if (!seenliveslot)
1454                         oldestLiveSlotSequenceNumver = seqn;
1455
1456                 if (!prevslot->isLive())
1457                         continue;
1458                 seenliveslot = true;
1459                 Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
1460                 uint lESize = liveentries->size();
1461                 for (uint i = 0; i < lESize; i++) {
1462                         Entry *liveentry = liveentries->get(i);
1463                         if (s->hasSpace(liveentry))
1464                                 s->addEntry(liveentry);
1465                         else {
1466                                 skipcount++;
1467                                 if (skipcount > Table_SKIP_THRESHOLD) {
1468                                         delete liveentries;
1469                                         goto donesearch;
1470                                 }
1471                         }
1472                 }
1473                 delete liveentries;
1474         }
1475 donesearch:
1476         ;
1477 }
1478
1479 /**
1480  * Checks for malicious activity and updates the local copy of the block chain->
1481  */
1482 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1483         // The cloud communication layer has checked slot HMACs already
1484         // before decoding
1485         if (newSlots->length() == 0) {
1486                 return;
1487         }
1488
1489         // Make sure all slots are newer than the last largest slot this
1490         // client has seen
1491         int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
1492         if (firstSeqNum <= sequenceNumber) {
1493                 throw new Error("Server Error: Sent older slots!");
1494         }
1495
1496         // Create an object that can access both new slots and slots in our
1497         // local chain without committing slots to our local chain
1498         SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1499
1500         // Check that the HMAC chain is not broken
1501         checkHMACChain(indexer, newSlots);
1502
1503         // Set to keep track of messages from clients
1504         Hashset<int64_t> *machineSet = new Hashset<int64_t>();
1505         {
1506                 SetIterator<int64_t, Pair<int64_t, Liveness *> *> *lmit = getKeyIterator(lastMessageTable);
1507                 while (lmit->hasNext())
1508                         machineSet->add(lmit->next());
1509                 delete lmit;
1510         }
1511
1512         // Process each slots data
1513         {
1514                 uint numSlots = newSlots->length();
1515                 for (uint i = 0; i < numSlots; i++) {
1516                         Slot *slot = newSlots->get(i);
1517                         processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1518                         updateExpectedSize();
1519                 }
1520         }
1521         delete indexer;
1522         
1523         // If there is a gap, check to see if the server sent us
1524         // everything->
1525         if (firstSeqNum != (sequenceNumber + 1)) {
1526
1527                 // Check the size of the slots that were sent down by the server->
1528                 // Can only check the size if there was a gap
1529                 checkNumSlots(newSlots->length());
1530
1531                 // Since there was a gap every machine must have pushed a slot or
1532                 // must have a last message message-> If not then the server is
1533                 // hiding slots
1534                 if (!machineSet->isEmpty()) {
1535                         delete machineSet;
1536                         throw new Error("Missing record for machines: ");
1537                 }
1538         }
1539         delete machineSet;
1540         // Update the size of our local block chain->
1541         commitNewMaxSize();
1542
1543         // Commit new to slots to the local block chain->
1544         {
1545                 uint numSlots = newSlots->length();
1546                 for (uint i = 0; i < numSlots; i++) {
1547                         Slot *slot = newSlots->get(i);
1548
1549                         // Insert this slot into our local block chain copy->
1550                         buffer->putSlot(slot);
1551
1552                         // Keep track of how many slots are currently live (have live data
1553                         // in them)->
1554                         liveSlotCount++;
1555                 }
1556         }
1557         // Get the sequence number of the latest slot in the system
1558         sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
1559         updateLiveStateFromServer();
1560
1561         // No Need to remember after we pulled from the server
1562         offlineTransactionsCommittedAndAtServer->clear();
1563
1564         // This is invalidated now
1565         hadPartialSendToServer = false;
1566 }
1567
1568 void Table::updateLiveStateFromServer() {
1569         // Process the new transaction parts
1570         processNewTransactionParts();
1571
1572         // Do arbitration on new transactions that were received
1573         arbitrateFromServer();
1574
1575         // Update all the committed keys
1576         bool didCommitOrSpeculate = updateCommittedTable();
1577
1578         // Delete the transactions that are now dead
1579         updateLiveTransactionsAndStatus();
1580
1581         // Do speculations
1582         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1583         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1584 }
1585
1586 void Table::updateLiveStateFromLocal() {
1587         // Update all the committed keys
1588         bool didCommitOrSpeculate = updateCommittedTable();
1589
1590         // Delete the transactions that are now dead
1591         updateLiveTransactionsAndStatus();
1592
1593         // Do speculations
1594         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1595         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1596 }
1597
1598 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1599         int64_t prevslots = firstSequenceNumber;
1600
1601         if (didFindTableStatus) {
1602         } else {
1603                 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1604         }
1605
1606         didFindTableStatus = true;
1607         currMaxSize = numberOfSlots;
1608 }
1609
1610 void Table::updateExpectedSize() {
1611         expectedsize++;
1612
1613         if (expectedsize > currMaxSize) {
1614                 expectedsize = currMaxSize;
1615         }
1616 }
1617
1618
1619 /**
1620  * Check the size of the block chain to make sure there are enough
1621  * slots sent back by the server-> This is only called when we have a
1622  * gap between the slots that we have locally and the slots sent by
1623  * the server therefore in the slots sent by the server there will be
1624  * at least 1 Table status message
1625  */
1626 void Table::checkNumSlots(int numberOfSlots) {
1627         if (numberOfSlots != expectedsize) {
1628                 throw new Error("Server Error: Server did not send all slots->  Expected: ");
1629         }
1630 }
1631
1632 /**
1633  * Update the size of of the local buffer if it is needed->
1634  */
1635 void Table::commitNewMaxSize() {
1636         didFindTableStatus = false;
1637
1638         // Resize the local slot buffer
1639         if (numberOfSlots != currMaxSize) {
1640                 buffer->resize((int32_t)currMaxSize);
1641         }
1642
1643         // Change the number of local slots to the new size
1644         numberOfSlots = (int32_t)currMaxSize;
1645
1646         // Recalculate the resize threshold since the size of the local
1647         // buffer has changed
1648         setResizeThreshold();
1649 }
1650
1651 /**
1652  * Process the new transaction parts from this latest round of slots
1653  * received from the server
1654  */
1655 void Table::processNewTransactionParts() {
1656
1657         if (newTransactionParts->size() == 0) {
1658                 // Nothing new to process
1659                 return;
1660         }
1661
1662         // Iterate through all the machine Ids that we received new parts
1663         // for
1664         SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *tpit = getKeyIterator(newTransactionParts);
1665         while (tpit->hasNext()) {
1666                 int64_t machineId = tpit->next();
1667                 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
1668
1669                 SetIterator<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *ptit = getKeyIterator(parts);
1670                 // Iterate through all the parts for that machine Id
1671                 while (ptit->hasNext()) {
1672                         Pair<int64_t, int32_t> *partId = ptit->next();
1673                         TransactionPart *part = parts->get(partId);
1674
1675                         if (lastArbitratedTransactionNumberByArbitratorTable->contains(part->getArbitratorId())) {
1676                                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1677                                 if (lastTransactionNumber >= part->getSequenceNumber()) {
1678                                         // Set dead the transaction part
1679                                         part->setDead();
1680                                         continue;
1681                                 }
1682                         }
1683
1684                         // Get the transaction object for that sequence number
1685                         Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1686
1687                         if (transaction == NULL) {
1688                                 // This is a new transaction that we dont have so make a new one
1689                                 transaction = new Transaction();
1690                                 
1691                                 // Add that part to the transaction
1692                                 transaction->addPartDecode(part);
1693
1694                                 // Insert this new transaction into the live tables
1695                                 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1696                                 liveTransactionByTransactionIdTable->put(transaction->getId(), transaction);
1697                         }
1698                 }
1699                 delete ptit;
1700         }
1701         delete tpit;
1702         // Clear all the new transaction parts in preparation for the next
1703         // time the server sends slots
1704         {
1705                 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts);
1706                 while (partsit->hasNext()) {
1707                         int64_t machineId = partsit->next();
1708                         Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
1709                         delete parts;
1710                 }
1711                 delete partsit;
1712                 newTransactionParts->clear();
1713         }
1714 }
1715
1716 void Table::arbitrateFromServer() {
1717         if (liveTransactionBySequenceNumberTable->size() == 0) {
1718                 // Nothing to arbitrate on so move on
1719                 return;
1720         }
1721
1722         // Get the transaction sequence numbers and sort from oldest to newest
1723         Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>();
1724         {
1725                 SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
1726                 while (trit->hasNext())
1727                         transactionSequenceNumbers->add(trit->next());
1728                 delete trit;
1729         }
1730         qsort(transactionSequenceNumbers->expose(), transactionSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1731
1732         // Collection of key value pairs that are
1733         Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *speculativeTableTmp = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
1734
1735         // The last transaction arbitrated on
1736         int64_t lastTransactionCommitted = -1;
1737         Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1738         uint tsnSize = transactionSequenceNumbers->size();
1739         for (uint i = 0; i < tsnSize; i++) {
1740                 int64_t transactionSequenceNumber = transactionSequenceNumbers->get(i);
1741                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1742
1743                 // Check if this machine arbitrates for this transaction if not
1744                 // then we cant arbitrate this transaction
1745                 if (transaction->getArbitrator() != localMachineId) {
1746                         continue;
1747                 }
1748
1749                 if (transactionSequenceNumber < lastSeqNumArbOn) {
1750                         continue;
1751                 }
1752
1753                 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1754                         // We have seen this already locally so dont commit again
1755                         continue;
1756                 }
1757
1758                 if (!transaction->isComplete()) {
1759                         // Will arbitrate in incorrect order if we continue so just break
1760                         // Most likely this
1761                         break;
1762                 }
1763
1764                 // update the largest transaction seen by arbitrator from server
1765                 if (!lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1766                         lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1767                 } else {
1768                         int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1769                         if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1770                                 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1771                         }
1772                 }
1773
1774                 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1775                         // Guard evaluated as true
1776                         // Update the local changes so we can make the commit
1777                         SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1778                         while (kvit->hasNext()) {
1779                                 KeyValue *kv = kvit->next();
1780                                 speculativeTableTmp->put(kv->getKey(), kv);
1781                         }
1782                         delete kvit;
1783
1784                         // Update what the last transaction committed was for use in batch commit
1785                         lastTransactionCommitted = transactionSequenceNumber;
1786                 } else {
1787                         // Guard evaluated was false so create abort
1788                         // Create the abort
1789                         Abort *newAbort = new Abort(NULL,
1790                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1791                                                                                                                                         transaction->getSequenceNumber(),
1792                                                                                                                                         transaction->getMachineId(),
1793                                                                                                                                         transaction->getArbitrator(),
1794                                                                                                                                         localArbitrationSequenceNumber);
1795                         localArbitrationSequenceNumber++;
1796                         generatedAborts->add(newAbort);
1797
1798                         // Insert the abort so we can process
1799                         processEntry(newAbort);
1800                 }
1801
1802                 lastSeqNumArbOn = transactionSequenceNumber;
1803         }
1804
1805         Commit *newCommit = NULL;
1806
1807         // If there is something to commit
1808         if (speculativeTableTmp->size() != 0) {
1809                 // Create the commit and increment the commit sequence number
1810                 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1811                 localArbitrationSequenceNumber++;
1812
1813                 // Add all the new keys to the commit
1814                 SetIterator<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *spit = getKeyIterator(speculativeTableTmp);
1815                 while (spit->hasNext()) {
1816                         IoTString *string = spit->next();
1817                         KeyValue *kv = speculativeTableTmp->get(string);
1818                         newCommit->addKV(kv);
1819                 }
1820                 delete spit;
1821                 
1822                 // create the commit parts
1823                 newCommit->createCommitParts();
1824
1825                 // Append all the commit parts to the end of the pending queue
1826                 // waiting for sending to the server
1827                 // Insert the commit so we can process it
1828                 Vector<CommitPart *> *parts = newCommit->getParts();
1829                 uint partsSize = parts->size();
1830                 for (uint i = 0; i < partsSize; i++) {
1831                         CommitPart *commitPart = parts->get(i);
1832                         processEntry(commitPart);
1833                 }
1834         }
1835         delete speculativeTableTmp;
1836
1837         if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1838                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1839                 pendingSendArbitrationRounds->add(arbitrationRound);
1840
1841                 if (compactArbitrationData()) {
1842                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1843                         if (newArbitrationRound->getCommit() != NULL) {
1844                                 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1845                                 uint partsSize = parts->size();
1846                                 for (uint i = 0; i < partsSize; i++) {
1847                                         CommitPart *commitPart = parts->get(i);
1848                                         processEntry(commitPart);
1849                                 }
1850                         }
1851                 }
1852         }
1853 }
1854
1855 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1856
1857         // Check if this machine arbitrates for this transaction if not then
1858         // we cant arbitrate this transaction
1859         if (transaction->getArbitrator() != localMachineId) {
1860                 return Pair<bool, bool>(false, false);
1861         }
1862
1863         if (!transaction->isComplete()) {
1864                 // Will arbitrate in incorrect order if we continue so just break
1865                 // Most likely this
1866                 return Pair<bool, bool>(false, false);
1867         }
1868
1869         if (transaction->getMachineId() != localMachineId) {
1870                 // dont do this check for local transactions
1871                 if (lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1872                         if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1873                                 // We've have already seen this from the server
1874                                 return Pair<bool, bool>(false, false);
1875                         }
1876                 }
1877         }
1878
1879         if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1880                 // Guard evaluated as true Create the commit and increment the
1881                 // commit sequence number
1882                 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1883                 localArbitrationSequenceNumber++;
1884
1885                 // Update the local changes so we can make the commit
1886                 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1887                 while (kvit->hasNext()) {
1888                         KeyValue *kv = kvit->next();
1889                         newCommit->addKV(kv);
1890                 }
1891                 delete kvit;
1892
1893                 // create the commit parts
1894                 newCommit->createCommitParts();
1895
1896                 // Append all the commit parts to the end of the pending queue
1897                 // waiting for sending to the server
1898                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1899                 pendingSendArbitrationRounds->add(arbitrationRound);
1900
1901                 if (compactArbitrationData()) {
1902                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1903                         Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1904                         uint partsSize = parts->size();
1905                         for (uint i = 0; i < partsSize; i++) {
1906                                 CommitPart *commitPart = parts->get(i);
1907                                 processEntry(commitPart);
1908                         }
1909                 } else {
1910                         // Insert the commit so we can process it
1911                         Vector<CommitPart *> *parts = newCommit->getParts();
1912                         uint partsSize = parts->size();
1913                         for (uint i = 0; i < partsSize; i++) {
1914                                 CommitPart *commitPart = parts->get(i);
1915                                 processEntry(commitPart);
1916                         }
1917                 }
1918
1919                 if (transaction->getMachineId() == localMachineId) {
1920                         TransactionStatus *status = transaction->getTransactionStatus();
1921                         if (status != NULL) {
1922                                 status->setStatus(TransactionStatus_StatusCommitted);
1923                         }
1924                 }
1925
1926                 updateLiveStateFromLocal();
1927                 return Pair<bool, bool>(true, true);
1928         } else {
1929                 if (transaction->getMachineId() == localMachineId) {
1930                         // For locally created messages update the status
1931                         // Guard evaluated was false so create abort
1932                         TransactionStatus *status = transaction->getTransactionStatus();
1933                         if (status != NULL) {
1934                                 status->setStatus(TransactionStatus_StatusAborted);
1935                         }
1936                 } else {
1937                         Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
1938
1939                         // Create the abort
1940                         Abort *newAbort = new Abort(NULL,
1941                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1942                                                                                                                                         -1,
1943                                                                                                                                         transaction->getMachineId(),
1944                                                                                                                                         transaction->getArbitrator(),
1945                                                                                                                                         localArbitrationSequenceNumber);
1946                         localArbitrationSequenceNumber++;
1947                         addAbortSet->add(newAbort);
1948
1949                         // Append all the commit parts to the end of the pending queue
1950                         // waiting for sending to the server
1951                         ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1952                         pendingSendArbitrationRounds->add(arbitrationRound);
1953
1954                         if (compactArbitrationData()) {
1955                                 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1956
1957                                 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1958                                 uint partsSize = parts->size();
1959                                 for (uint i = 0; i < partsSize; i++) {
1960                                         CommitPart *commitPart = parts->get(i);
1961                                         processEntry(commitPart);
1962                                 }
1963                         }
1964                 }
1965
1966                 updateLiveStateFromLocal();
1967                 return Pair<bool, bool>(true, false);
1968         }
1969 }
1970
1971 /**
1972  * Compacts the arbitration data by merging commits and aggregating
1973  * aborts so that a single large push of commits can be done instead
1974  * of many small updates
1975  */
1976 bool Table::compactArbitrationData() {
1977         if (pendingSendArbitrationRounds->size() < 2) {
1978                 // Nothing to compact so do nothing
1979                 return false;
1980         }
1981
1982         ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1983         if (lastRound->getDidSendPart()) {
1984                 return false;
1985         }
1986
1987         bool hadCommit = (lastRound->getCommit() == NULL);
1988         bool gotNewCommit = false;
1989
1990         uint numberToDelete = 1;
1991         
1992         while (numberToDelete < pendingSendArbitrationRounds->size()) {
1993                 ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1994
1995                 if (round->isFull() || round->getDidSendPart()) {
1996                         // Stop since there is a part that cannot be compacted and we
1997                         // need to compact in order
1998                         break;
1999                 }
2000
2001                 if (round->getCommit() == NULL) {
2002                         // Try compacting aborts only
2003                         int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
2004                         if (newSize > ArbitrationRound_MAX_PARTS) {
2005                                 // Cant compact since it would be too large
2006                                 break;
2007                         }
2008                         lastRound->addAborts(round->getAborts());
2009                 } else {
2010                         // Create a new larger commit
2011                         Commit *newCommit = Commit_merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
2012                         localArbitrationSequenceNumber++;
2013
2014                         // Create the commit parts so that we can count them
2015                         newCommit->createCommitParts();
2016
2017                         // Calculate the new size of the parts
2018                         int newSize = newCommit->getNumberOfParts();
2019                         newSize += lastRound->getAbortsCount();
2020                         newSize += round->getAbortsCount();
2021
2022                         if (newSize > ArbitrationRound_MAX_PARTS) {
2023                                 // Can't compact since it would be too large
2024                                 if (lastRound->getCommit() != newCommit &&
2025                                                 round->getCommit() != newCommit)
2026                                         delete newCommit;
2027                                 break;
2028                         }
2029                         // Set the new compacted part
2030                         if (lastRound->getCommit() == newCommit)
2031                                 lastRound->setCommit(NULL);
2032                         if (round->getCommit() == newCommit)
2033                                 round->setCommit(NULL);
2034                         
2035                         if (lastRound->getCommit() != NULL) {
2036                                 Commit * oldcommit = lastRound->getCommit();
2037                                 lastRound->setCommit(NULL);
2038                                 delete oldcommit;
2039                         }
2040                         lastRound->setCommit(newCommit);
2041                         lastRound->addAborts(round->getAborts());
2042                         gotNewCommit = true;
2043                 }
2044
2045                 numberToDelete++;
2046         }
2047
2048         if (numberToDelete != 1) {
2049                 // If there is a compaction
2050                 // Delete the previous pieces that are now in the new compacted piece
2051                 for (uint i = 2; i <= numberToDelete; i++) {
2052                         delete pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size()-i);
2053                 }
2054                 pendingSendArbitrationRounds->setSize(pendingSendArbitrationRounds->size() - numberToDelete);
2055
2056                 pendingSendArbitrationRounds->add(lastRound);
2057
2058                 // Should reinsert into the commit processor
2059                 if (hadCommit && gotNewCommit) {
2060                         return true;
2061                 }
2062         }
2063
2064         return false;
2065 }
2066
2067 /**
2068  * Update all the commits and the committed tables, sets dead the dead
2069  * transactions
2070  */
2071 bool Table::updateCommittedTable() {
2072         if (newCommitParts->size() == 0) {
2073                 // Nothing new to process
2074                 return false;
2075         }
2076
2077         // Iterate through all the machine Ids that we received new parts for
2078         SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts);
2079         while (partsit->hasNext()) {
2080                 int64_t machineId = partsit->next();
2081                 Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId);
2082
2083                 // Iterate through all the parts for that machine Id
2084                 SetIterator<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pairit = getKeyIterator(parts);
2085                 while (pairit->hasNext()) {
2086                         Pair<int64_t, int32_t> *partId = pairit->next();
2087                         CommitPart *part = parts->get(partId);
2088
2089                         // Get the transaction object for that sequence number
2090                         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
2091
2092                         if (commitForClientTable == NULL) {
2093                                 // This is the first commit from this device
2094                                 commitForClientTable = new Hashtable<int64_t, Commit *>();
2095                                 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
2096                         }
2097
2098                         Commit *commit = commitForClientTable->get(part->getSequenceNumber());
2099
2100                         if (commit == NULL) {
2101                                 // This is a new commit that we dont have so make a new one
2102                                 commit = new Commit();
2103
2104                                 // Insert this new commit into the live tables
2105                                 commitForClientTable->put(part->getSequenceNumber(), commit);
2106                         }
2107
2108                         // Add that part to the commit
2109                         commit->addPartDecode(part);
2110                 }
2111                 delete pairit;
2112                 delete parts;
2113         }
2114         delete partsit;
2115
2116         // Clear all the new commits parts in preparation for the next time
2117         // the server sends slots
2118         newCommitParts->clear();
2119
2120         // If we process a new commit keep track of it for future use
2121         bool didProcessANewCommit = false;
2122
2123         // Process the commits one by one
2124         SetIterator<int64_t, Hashtable<int64_t, Commit *> *> *liveit = getKeyIterator(liveCommitsTable);
2125         while (liveit->hasNext()) {
2126                 int64_t arbitratorId = liveit->next();
2127                 // Get all the commits for a specific arbitrator
2128                 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
2129
2130                 // Sort the commits in order
2131                 Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>();
2132                 {
2133                         SetIterator<int64_t, Commit *> *clientit = getKeyIterator(commitForClientTable);
2134                         while (clientit->hasNext())
2135                                 commitSequenceNumbers->add(clientit->next());
2136                         delete clientit;
2137                 }
2138
2139                 qsort(commitSequenceNumbers->expose(), commitSequenceNumbers->size(), sizeof(int64_t), compareInt64);
2140
2141                 // Get the last commit seen from this arbitrator
2142                 int64_t lastCommitSeenSequenceNumber = -1;
2143                 if (lastCommitSeenSequenceNumberByArbitratorTable->contains(arbitratorId)) {
2144                         lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
2145                 }
2146
2147                 // Go through each new commit one by one
2148                 for (uint i = 0; i < commitSequenceNumbers->size(); i++) {
2149                         int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
2150                         Commit *commit = commitForClientTable->get(commitSequenceNumber);
2151                         // Special processing if a commit is not complete
2152                         if (!commit->isComplete()) {
2153                                 if (i == (commitSequenceNumbers->size() - 1)) {
2154                                         // If there is an incomplete commit and this commit is the
2155                                         // latest one seen then this commit cannot be processed and
2156                                         // there are no other commits
2157                                         break;
2158                                 } else {
2159                                         // This is a commit that was already dead but parts of it
2160                                         // are still in the block chain (not flushed out yet)->
2161                                         // Delete it and move on
2162                                         commit->setDead();
2163                                         commitForClientTable->remove(commit->getSequenceNumber());
2164                                         delete commit;
2165                                         continue;
2166                                 }
2167                         }
2168
2169                         // Update the last transaction that was updated if we can
2170                         if (commit->getTransactionSequenceNumber() != -1) {
2171                                 // Update the last transaction sequence number that the arbitrator arbitrated on1
2172                                 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2173                                         lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2174                                 }
2175                         }
2176
2177                         // Update the last arbitration data that we have seen so far
2178                         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(commit->getMachineId())) {
2179                                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
2180                                 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
2181                                         // Is larger
2182                                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2183                                 }
2184                         } else {
2185                                 // Never seen any data from this arbitrator so record the first one
2186                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2187                         }
2188
2189                         // We have already seen this commit before so need to do the
2190                         // full processing on this commit
2191                         if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2192                                 // Update the last transaction that was updated if we can
2193                                 if (commit->getTransactionSequenceNumber() != -1) {
2194                                         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
2195                                         if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) ||
2196                                                         lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2197                                                 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2198                                         }
2199                                 }
2200                                 continue;
2201                         }
2202
2203                         // If we got here then this is a brand new commit and needs full
2204                         // processing
2205                         // Get what commits should be edited, these are the commits that
2206                         // have live values for their keys
2207                         Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
2208                         {
2209                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2210                                 while (kvit->hasNext()) {
2211                                         KeyValue *kv = kvit->next();
2212                                         Commit *commit = liveCommitsByKeyTable->get(kv->getKey());
2213                                         if (commit != NULL)
2214                                                 commitsToEdit->add(commit);
2215                                 }
2216                                 delete kvit;
2217                         }
2218
2219                         // Update each previous commit that needs to be updated
2220                         SetIterator<Commit *, Commit *> *commitit = commitsToEdit->iterator();
2221                         while (commitit->hasNext()) {
2222                                 Commit *previousCommit = commitit->next();
2223
2224                                 // Only bother with live commits (TODO: Maybe remove this check)
2225                                 if (previousCommit->isLive()) {
2226
2227                                         // Update which keys in the old commits are still live
2228                                         {
2229                                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2230                                                 while (kvit->hasNext()) {
2231                                                         KeyValue *kv = kvit->next();
2232                                                         previousCommit->invalidateKey(kv->getKey());
2233                                                 }
2234                                                 delete kvit;
2235                                         }
2236
2237                                         // if the commit is now dead then remove it
2238                                         if (!previousCommit->isLive()) {
2239                                                 commitForClientTable->remove(previousCommit->getSequenceNumber());
2240                                                 delete previousCommit;
2241                                         }
2242                                 }
2243                         }
2244                         delete commitit;
2245                         delete commitsToEdit;
2246
2247                         // Update the last seen sequence number from this arbitrator
2248                         if (lastCommitSeenSequenceNumberByArbitratorTable->contains(commit->getMachineId())) {
2249                                 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2250                                         lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2251                                 }
2252                         } else {
2253                                 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2254                         }
2255
2256                         // We processed a new commit that we havent seen before
2257                         didProcessANewCommit = true;
2258
2259                         // Update the committed table of keys and which commit is using which key
2260                         {
2261                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2262                                 while (kvit->hasNext()) {
2263                                         KeyValue *kv = kvit->next();
2264                                         committedKeyValueTable->put(kv->getKey(), kv);
2265                                         liveCommitsByKeyTable->put(kv->getKey(), commit);
2266                                 }
2267                                 delete kvit;
2268                         }
2269                 }
2270         }
2271         delete liveit;
2272
2273         return didProcessANewCommit;
2274 }
2275
2276 /**
2277  * Create the speculative table from transactions that are still live
2278  * and have come from the cloud
2279  */
2280 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2281         if (liveTransactionBySequenceNumberTable->size() == 0) {
2282                 // There is nothing to speculate on
2283                 return false;
2284         }
2285
2286         // Create a list of the transaction sequence numbers and sort them
2287         // from oldest to newest
2288         Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>();
2289         {
2290                 SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
2291                 while (trit->hasNext())
2292                         transactionSequenceNumbersSorted->add(trit->next());
2293                 delete trit;
2294         }
2295
2296         qsort(transactionSequenceNumbersSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64);
2297
2298         bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2299
2300
2301         if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2302                 // If there is a gap in the transaction sequence numbers then
2303                 // there was a commit or an abort of a transaction OR there was a
2304                 // new commit (Could be from offline commit) so a redo the
2305                 // speculation from scratch
2306
2307                 // Start from scratch
2308                 speculatedKeyValueTable->clear();
2309                 lastTransactionSequenceNumberSpeculatedOn = -1;
2310                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2311         }
2312
2313         // Remember the front of the transaction list
2314         oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2315
2316         // Find where to start arbitration from
2317         uint startIndex = 0;
2318
2319         for (; startIndex < transactionSequenceNumbersSorted->size(); startIndex++)
2320                 if (transactionSequenceNumbersSorted->get(startIndex) == lastTransactionSequenceNumberSpeculatedOn)
2321                         break;
2322         startIndex++;
2323
2324         if (startIndex >= transactionSequenceNumbersSorted->size()) {
2325                 // Make sure we are not out of bounds
2326                 return false;           // did not speculate
2327         }
2328
2329         Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
2330         bool didSkip = true;
2331
2332         for (uint i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2333                 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2334                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2335
2336                 if (!transaction->isComplete()) {
2337                         // If there is an incomplete transaction then there is nothing
2338                         // we can do add this transactions arbitrator to the list of
2339                         // arbitrators we should ignore
2340                         incompleteTransactionArbitrator->add(transaction->getArbitrator());
2341                         didSkip = true;
2342                         continue;
2343                 }
2344
2345                 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2346                         continue;
2347                 }
2348
2349                 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2350
2351                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2352                         // Guard evaluated to true so update the speculative table
2353                         {
2354                                 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2355                                 while (kvit->hasNext()) {
2356                                         KeyValue *kv = kvit->next();
2357                                         speculatedKeyValueTable->put(kv->getKey(), kv);
2358                                 }
2359                                 delete kvit;
2360                         }
2361                 }
2362         }
2363
2364         if (didSkip) {
2365                 // Since there was a skip we need to redo the speculation next time around
2366                 lastTransactionSequenceNumberSpeculatedOn = -1;
2367                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2368         }
2369
2370         // We did some speculation
2371         return true;
2372 }
2373
2374 /**
2375  * Create the pending transaction speculative table from transactions
2376  * that are still in the pending transaction buffer
2377  */
2378 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2379         if (pendingTransactionQueue->size() == 0) {
2380                 // There is nothing to speculate on
2381                 return;
2382         }
2383
2384         if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2385                 // need to reset on the pending speculation
2386                 lastPendingTransactionSpeculatedOn = NULL;
2387                 firstPendingTransaction = pendingTransactionQueue->get(0);
2388                 pendingTransactionSpeculatedKeyValueTable->clear();
2389         }
2390
2391         // Find where to start arbitration from
2392         uint startIndex = 0;
2393
2394         for (; startIndex < pendingTransactionQueue->size(); startIndex++)
2395                 if (pendingTransactionQueue->get(startIndex) == firstPendingTransaction)
2396                         break;
2397
2398         if (startIndex >= pendingTransactionQueue->size()) {
2399                 // Make sure we are not out of bounds
2400                 return;
2401         }
2402
2403         for (uint i = startIndex; i < pendingTransactionQueue->size(); i++) {
2404                 Transaction *transaction = pendingTransactionQueue->get(i);
2405
2406                 lastPendingTransactionSpeculatedOn = transaction;
2407
2408                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2409                         // Guard evaluated to true so update the speculative table
2410                         SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2411                         while (kvit->hasNext()) {
2412                                 KeyValue *kv = kvit->next();
2413                                 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2414                         }
2415                         delete kvit;
2416                 }
2417         }
2418 }
2419
2420 /**
2421  * Set dead and remove from the live transaction tables the
2422  * transactions that are dead
2423  */
2424 void Table::updateLiveTransactionsAndStatus() {
2425         // Go through each of the transactions
2426         {
2427                 SetIterator<int64_t, Transaction *> *iter = getKeyIterator(liveTransactionBySequenceNumberTable);
2428                 while (iter->hasNext()) {
2429                         int64_t key = iter->next();
2430                         Transaction *transaction = liveTransactionBySequenceNumberTable->get(key);
2431
2432                         // Check if the transaction is dead
2433                         if (lastArbitratedTransactionNumberByArbitratorTable->contains(transaction->getArbitrator())
2434                                         && lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator()) >= transaction->getSequenceNumber()) {
2435                                 // Set dead the transaction
2436                                 transaction->setDead();
2437
2438                                 // Remove the transaction from the live table
2439                                 iter->remove();
2440                                 liveTransactionByTransactionIdTable->remove(transaction->getId());
2441                                 delete transaction;
2442                         }
2443                 }
2444                 delete iter;
2445         }
2446
2447         // Go through each of the transactions
2448         {
2449                 SetIterator<int64_t, TransactionStatus *> *iter = getKeyIterator(outstandingTransactionStatus);
2450                 while (iter->hasNext()) {
2451                         int64_t key = iter->next();
2452                         TransactionStatus *status = outstandingTransactionStatus->get(key);
2453
2454                         // Check if the transaction is dead
2455                         if (lastArbitratedTransactionNumberByArbitratorTable->contains(status->getTransactionArbitrator())
2456                                         && (lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()) >= status->getTransactionSequenceNumber())) {
2457                                 // Set committed
2458                                 status->setStatus(TransactionStatus_StatusCommitted);
2459
2460                                 // Remove
2461                                 iter->remove();
2462                         }
2463                 }
2464                 delete iter;
2465         }
2466 }
2467
2468 /**
2469  * Process this slot, entry by entry->  Also update the latest message sent by slot
2470  */
2471 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2472
2473         // Update the last message seen
2474         updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2475
2476         // Process each entry in the slot
2477         Vector<Entry *> *entries = slot->getEntries();
2478         uint eSize = entries->size();
2479         for (uint ei = 0; ei < eSize; ei++) {
2480                 Entry *entry = entries->get(ei);
2481                 switch (entry->getType()) {
2482                 case TypeCommitPart:
2483                         processEntry((CommitPart *)entry);
2484                         break;
2485                 case TypeAbort:
2486                         processEntry((Abort *)entry);
2487                         break;
2488                 case TypeTransactionPart:
2489                         processEntry((TransactionPart *)entry);
2490                         break;
2491                 case TypeNewKey:
2492                         processEntry((NewKey *)entry);
2493                         break;
2494                 case TypeLastMessage:
2495                         processEntry((LastMessage *)entry, machineSet);
2496                         break;
2497                 case TypeRejectedMessage:
2498                         processEntry((RejectedMessage *)entry, indexer);
2499                         break;
2500                 case TypeTableStatus:
2501                         processEntry((TableStatus *)entry, slot->getSequenceNumber());
2502                         break;
2503                 default:
2504                         throw new Error("Unrecognized type: ");
2505                 }
2506         }
2507 }
2508
2509 /**
2510  * Update the last message that was sent for a machine Id
2511  */
2512 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2513         // Update what the last message received by a machine was
2514         updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2515 }
2516
2517 /**
2518  * Add the new key to the arbitrators table and update the set of live
2519  * new keys (in case of a rescued new key message)
2520  */
2521 void Table::processEntry(NewKey *entry) {
2522         // Update the arbitrator table with the new key information
2523         arbitratorTable->put(entry->getKey(), entry->getMachineID());
2524
2525         // Update what the latest live new key is
2526         NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2527         if (oldNewKey != NULL) {
2528                 // Delete the old new key messages
2529                 oldNewKey->setDead();
2530         }
2531 }
2532
2533 /**
2534  * Process new table status entries and set dead the old ones as new
2535  * ones come in-> keeps track of the largest and smallest table status
2536  * seen in this current round of updating the local copy of the block
2537  * chain
2538  */
2539 void Table::processEntry(TableStatus *entry, int64_t seq) {
2540         int newNumSlots = entry->getMaxSlots();
2541         updateCurrMaxSize(newNumSlots);
2542         initExpectedSize(seq, newNumSlots);
2543
2544         if (liveTableStatus != NULL) {
2545                 // We have a larger table status so the old table status is no
2546                 // int64_ter alive
2547                 liveTableStatus->setDead();
2548         }
2549
2550         // Make this new table status the latest alive table status
2551         liveTableStatus = entry;
2552 }
2553
2554 /**
2555  * Check old messages to see if there is a block chain violation->
2556  * Also
2557  */
2558 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
2559         int64_t oldSeqNum = entry->getOldSeqNum();
2560         int64_t newSeqNum = entry->getNewSeqNum();
2561         bool isequal = entry->getEqual();
2562         int64_t machineId = entry->getMachineID();
2563         int64_t seq = entry->getSequenceNumber();
2564
2565         // Check if we have messages that were supposed to be rejected in
2566         // our local block chain
2567         for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2568                 // Get the slot
2569                 Slot *slot = indexer->getSlot(seqNum);
2570
2571                 if (slot != NULL) {
2572                         // If we have this slot make sure that it was not supposed to be
2573                         // a rejected slot
2574                         int64_t slotMachineId = slot->getMachineID();
2575                         if (isequal != (slotMachineId == machineId)) {
2576                                 throw new Error("Server Error: Trying to insert rejected message for slot ");
2577                         }
2578                 }
2579         }
2580
2581         // Create a list of clients to watch until they see this rejected
2582         // message entry->
2583         Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2584         SetIterator<int64_t, Pair<int64_t, Liveness *> *> *iter = getKeyIterator(lastMessageTable);
2585         while (iter->hasNext()) {
2586                 // Machine ID for the last message entry
2587                 int64_t lastMessageEntryMachineId = iter->next();
2588
2589                 // We've seen it, don't need to continue to watch->  Our next
2590                 // message will implicitly acknowledge it->
2591                 if (lastMessageEntryMachineId == localMachineId) {
2592                         continue;
2593                 }
2594
2595                 Pair<int64_t, Liveness *> *lastMessageValue = lastMessageTable->get(lastMessageEntryMachineId);
2596                 int64_t entrySequenceNumber = lastMessageValue->getFirst();
2597
2598                 if (entrySequenceNumber < seq) {
2599                         // Add this rejected message to the set of messages that this
2600                         // machine ID did not see yet
2601                         addWatchVector(lastMessageEntryMachineId, entry);
2602                         // This client did not see this rejected message yet so add it
2603                         // to the watch set to monitor
2604                         deviceWatchSet->add(lastMessageEntryMachineId);
2605                 }
2606         }
2607         delete iter;
2608
2609         if (deviceWatchSet->isEmpty()) {
2610                 // This rejected message has been seen by all the clients so
2611                 entry->setDead();
2612                 delete deviceWatchSet;
2613         } else {
2614                 // We need to watch this rejected message
2615                 entry->setWatchSet(deviceWatchSet);
2616         }
2617 }
2618
2619 /**
2620  * Check if this abort is live, if not then save it so we can kill it
2621  * later-> update the last transaction number that was arbitrated on->
2622  */
2623 void Table::processEntry(Abort *entry) {
2624         if (entry->getTransactionSequenceNumber() != -1) {
2625                 // update the transaction status if it was sent to the server
2626                 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2627                 if (status != NULL) {
2628                         status->setStatus(TransactionStatus_StatusAborted);
2629                 }
2630         }
2631
2632         // Abort has not been seen by the client it is for yet so we need to
2633         // keep track of it
2634
2635         Abort *previouslySeenAbort = liveAbortTable->put(new Pair<int64_t, int64_t>(entry->getAbortId()), entry);
2636         if (previouslySeenAbort != NULL) {
2637                 previouslySeenAbort->setDead();         // Delete old version of the abort since we got a rescued newer version
2638         }
2639
2640         if (entry->getTransactionArbitrator() == localMachineId) {
2641                 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
2642         }
2643
2644         if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) {
2645                 // The machine already saw this so it is dead
2646                 entry->setDead();
2647                 Pair<int64_t, int64_t> abortid = entry->getAbortId();
2648                 liveAbortTable->remove(&abortid);
2649
2650                 if (entry->getTransactionArbitrator() == localMachineId) {
2651                         liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2652                 }
2653                 return;
2654         }
2655
2656         // Update the last arbitration data that we have seen so far
2657         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(entry->getTransactionArbitrator())) {
2658                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2659                 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2660                         // Is larger
2661                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2662                 }
2663         } else {
2664                 // Never seen any data from this arbitrator so record the first one
2665                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2666         }
2667
2668         // Set dead a transaction if we can
2669         Pair<int64_t, int64_t> deadPair = Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber());
2670
2671         Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(&deadPair);
2672         if (transactionToSetDead != NULL) {
2673                 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2674         }
2675
2676         // Update the last transaction sequence number that the arbitrator
2677         // arbitrated on
2678         if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getTransactionArbitrator()) ||
2679                         (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator()) < entry->getTransactionSequenceNumber())) {
2680                 // Is a valid one
2681                 if (entry->getTransactionSequenceNumber() != -1) {
2682                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2683                 }
2684         }
2685 }
2686
2687 /**
2688  * Set dead the transaction part if that transaction is dead and keep
2689  * track of all new parts
2690  */
2691 void Table::processEntry(TransactionPart *entry) {
2692         // Check if we have already seen this transaction and set it dead OR
2693         // if it is not alive
2694         if (lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getArbitratorId()) && (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId()) >= entry->getSequenceNumber())) {
2695                 // This transaction is dead, it was already committed or aborted
2696                 entry->setDead();
2697                 return;
2698         }
2699
2700         // This part is still alive
2701         Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId());
2702
2703         if (transactionPart == NULL) {
2704                 // Dont have a table for this machine Id yet so make one
2705                 transactionPart = new Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2706                 newTransactionParts->put(entry->getMachineId(), transactionPart);
2707         }
2708
2709         // Update the part and set dead ones we have already seen (got a
2710         // rescued version)
2711         TransactionPart *previouslySeenPart = transactionPart->put(new Pair<int64_t, int32_t>(entry->getPartId()), entry);
2712         if (previouslySeenPart != NULL) {
2713                 previouslySeenPart->setDead();
2714         }
2715 }
2716
2717 /**
2718  * Process new commit entries and save them for future use->  Delete duplicates
2719  */
2720 void Table::processEntry(CommitPart *entry) {
2721         // Update the last transaction that was updated if we can
2722         if (entry->getTransactionSequenceNumber() != -1) {
2723                 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId()) ||
2724                                 lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber()) {
2725                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2726                 }
2727         }
2728
2729         Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *commitPart = newCommitParts->get(entry->getMachineId());
2730         if (commitPart == NULL) {
2731                 // Don't have a table for this machine Id yet so make one
2732                 commitPart = new Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2733                 newCommitParts->put(entry->getMachineId(), commitPart);
2734         }
2735         // Update the part and set dead ones we have already seen (got a
2736         // rescued version)
2737         CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry);
2738         if (previouslySeenPart != NULL) {
2739                 previouslySeenPart->setDead();
2740         }
2741 }
2742
2743 /**
2744  * Update the last message seen table-> Update and set dead the
2745  * appropriate RejectedMessages as clients see them-> Updates the live
2746  * aborts, removes those that are dead and sets them dead-> Check that
2747  * the last message seen is correct and that there is no mismatch of
2748  * our own last message or that other clients have not had a rollback
2749  * on the last message->
2750  */
2751 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2752         // We have seen this machine ID
2753         machineSet->remove(machineId);
2754
2755         // Get the set of rejected messages that this machine Id is has not seen yet
2756         Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2757         // If there is a rejected message that this machine Id has not seen yet
2758         if (watchset != NULL) {
2759                 // Go through each rejected message that this machine Id has not
2760                 // seen yet
2761
2762                 SetIterator<RejectedMessage *, RejectedMessage *> *rmit = watchset->iterator();
2763                 while (rmit->hasNext()) {
2764                         RejectedMessage *rm = rmit->next();
2765                         // If this machine Id has seen this rejected message->->->
2766                         if (rm->getSequenceNumber() <= seqNum) {
2767                                 // Remove it from our watchlist
2768                                 rmit->remove();
2769                                 // Decrement machines that need to see this notification
2770                                 rm->removeWatcher(machineId);
2771                         }
2772                 }
2773                 delete rmit;
2774         }
2775
2776         // Set dead the abort
2777         SetIterator<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals> *abortit = getKeyIterator(liveAbortTable);
2778
2779         while (abortit->hasNext()) {
2780                 Pair<int64_t, int64_t> *key = abortit->next();
2781                 Abort *abort = liveAbortTable->get(key);
2782                 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2783                         abort->setDead();
2784                         abortit->remove();
2785                         if (abort->getTransactionArbitrator() == localMachineId) {
2786                                 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2787                         }
2788                 }
2789         }
2790         delete abortit;
2791         if (machineId == localMachineId) {
2792                 // Our own messages are immediately dead->
2793                 char livenessType = liveness->getType();
2794                 if (livenessType == TypeLastMessage) {
2795                         ((LastMessage *)liveness)->setDead();
2796                 } else if (livenessType == TypeSlot) {
2797                         ((Slot *)liveness)->setDead();
2798                 } else {
2799                         throw new Error("Unrecognized type");
2800                 }
2801         }
2802         // Get the old last message for this device
2803         Pair<int64_t, Liveness *> *lastMessageEntry = lastMessageTable->put(machineId, new Pair<int64_t, Liveness *>(seqNum, liveness));
2804         if (lastMessageEntry == NULL) {
2805                 // If no last message then there is nothing else to process
2806                 return;
2807         }
2808
2809         int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
2810         Liveness *lastEntry = lastMessageEntry->getSecond();
2811         delete lastMessageEntry;
2812
2813         // If it is not our machine Id since we already set ours to dead
2814         if (machineId != localMachineId) {
2815                 char lastEntryType = lastEntry->getType();
2816
2817                 if (lastEntryType == TypeLastMessage) {
2818                         ((LastMessage *)lastEntry)->setDead();
2819                 } else if (lastEntryType == TypeSlot) {
2820                         ((Slot *)lastEntry)->setDead();
2821                 } else {
2822                         throw new Error("Unrecognized type");
2823                 }
2824         }
2825         // Make sure the server is not playing any games
2826         if (machineId == localMachineId) {
2827                 if (hadPartialSendToServer) {
2828                         // We were not making any updates and we had a machine mismatch
2829                         if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2830                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: ");
2831                         }
2832                 } else {
2833                         // We were not making any updates and we had a machine mismatch
2834                         if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2835                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed: ");
2836                         }
2837                 }
2838         } else {
2839                 if (lastMessageSeqNum > seqNum) {
2840                         throw new Error("Server Error: Rollback on remote machine sequence number");
2841                 }
2842         }
2843 }
2844
2845 /**
2846  * Add a rejected message entry to the watch set to keep track of
2847  * which clients have seen that rejected message entry and which have
2848  * not.
2849  */
2850 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
2851         Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2852         if (entries == NULL) {
2853                 // There is no set for this machine ID yet so create one
2854                 entries = new Hashset<RejectedMessage *>();
2855                 rejectedMessageWatchVectorTable->put(machineId, entries);
2856         }
2857         entries->add(entry);
2858 }
2859
2860 /**
2861  * Check if the HMAC chain is not violated
2862  */
2863 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
2864         for (uint i = 0; i < newSlots->length(); i++) {
2865                 Slot *currSlot = newSlots->get(i);
2866                 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
2867                 if (prevSlot != NULL &&
2868                                 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2869                         throw new Error("Server Error: Invalid HMAC Chain");
2870         }
2871 }