edits
[iotcloud.git] / version2 / src / C / Table.cc
1 #include "Table.h"
2 #include "CloudComm.h"
3 #include "SlotBuffer.h"
4 #include "NewKey.h"
5 #include "Slot.h"
6 #include "KeyValue.h"
7 #include "Error.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
13 #include "SecureRandom.h"
14 #include "ByteBuffer.h"
15 #include "Abort.h"
16 #include "CommitPart.h"
17 #include "ArbitrationRound.h"
18 #include "TransactionPart.h"
19 #include "Commit.h"
20 #include "RejectedMessage.h"
21 #include "SlotIndexer.h"
22 #include <stdlib.h>
23
24 int compareInt64(const void *a, const void *b) {
25         const int64_t *pa = (const int64_t *) a;
26         const int64_t *pb = (const int64_t *) b;
27         if (*pa < *pb)
28                 return -1;
29         else if (*pa > *pb)
30                 return 1;
31         else
32                 return 0;
33 }
34
35 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
36         buffer(NULL),
37         cloud(new CloudComm(this, baseurl, password, listeningPort)),
38         random(NULL),
39         liveTableStatus(NULL),
40         pendingTransactionBuilder(NULL),
41         lastPendingTransactionSpeculatedOn(NULL),
42         firstPendingTransaction(NULL),
43         numberOfSlots(0),
44         bufferResizeThreshold(0),
45         liveSlotCount(0),
46         oldestLiveSlotSequenceNumver(1),
47         localMachineId(_localMachineId),
48         sequenceNumber(0),
49         localSequenceNumber(0),
50         localTransactionSequenceNumber(0),
51         lastTransactionSequenceNumberSpeculatedOn(0),
52         oldestTransactionSequenceNumberSpeculatedOn(0),
53         localArbitrationSequenceNumber(0),
54         hadPartialSendToServer(false),
55         attemptedToSendToServer(false),
56         expectedsize(0),
57         didFindTableStatus(false),
58         currMaxSize(0),
59         lastSlotAttemptedToSend(NULL),
60         lastIsNewKey(false),
61         lastNewSize(0),
62         lastTransactionPartsSent(NULL),
63         lastPendingSendArbitrationEntriesToDelete(NULL),
64         lastNewKey(NULL),
65         committedKeyValueTable(NULL),
66         speculatedKeyValueTable(NULL),
67         pendingTransactionSpeculatedKeyValueTable(NULL),
68         liveNewKeyTable(NULL),
69         lastMessageTable(NULL),
70         rejectedMessageWatchVectorTable(NULL),
71         arbitratorTable(NULL),
72         liveAbortTable(NULL),
73         newTransactionParts(NULL),
74         newCommitParts(NULL),
75         lastArbitratedTransactionNumberByArbitratorTable(NULL),
76         liveTransactionBySequenceNumberTable(NULL),
77         liveTransactionByTransactionIdTable(NULL),
78         liveCommitsTable(NULL),
79         liveCommitsByKeyTable(NULL),
80         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
81         rejectedSlotVector(NULL),
82         pendingTransactionQueue(NULL),
83         pendingSendArbitrationRounds(NULL),
84         pendingSendArbitrationEntriesToDelete(NULL),
85         transactionPartsSent(NULL),
86         outstandingTransactionStatus(NULL),
87         liveAbortsGeneratedByLocal(NULL),
88         offlineTransactionsCommittedAndAtServer(NULL),
89         localCommunicationTable(NULL),
90         lastTransactionSeenFromMachineFromServer(NULL),
91         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
92         lastInsertedNewKey(false),
93         lastSeqNumArbOn(0)
94 {
95         init();
96 }
97
98 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
99         buffer(NULL),
100         cloud(_cloud),
101         random(NULL),
102         liveTableStatus(NULL),
103         pendingTransactionBuilder(NULL),
104         lastPendingTransactionSpeculatedOn(NULL),
105         firstPendingTransaction(NULL),
106         numberOfSlots(0),
107         bufferResizeThreshold(0),
108         liveSlotCount(0),
109         oldestLiveSlotSequenceNumver(1),
110         localMachineId(_localMachineId),
111         sequenceNumber(0),
112         localSequenceNumber(0),
113         localTransactionSequenceNumber(0),
114         lastTransactionSequenceNumberSpeculatedOn(0),
115         oldestTransactionSequenceNumberSpeculatedOn(0),
116         localArbitrationSequenceNumber(0),
117         hadPartialSendToServer(false),
118         attemptedToSendToServer(false),
119         expectedsize(0),
120         didFindTableStatus(false),
121         currMaxSize(0),
122         lastSlotAttemptedToSend(NULL),
123         lastIsNewKey(false),
124         lastNewSize(0),
125         lastTransactionPartsSent(NULL),
126         lastPendingSendArbitrationEntriesToDelete(NULL),
127         lastNewKey(NULL),
128         committedKeyValueTable(NULL),
129         speculatedKeyValueTable(NULL),
130         pendingTransactionSpeculatedKeyValueTable(NULL),
131         liveNewKeyTable(NULL),
132         lastMessageTable(NULL),
133         rejectedMessageWatchVectorTable(NULL),
134         arbitratorTable(NULL),
135         liveAbortTable(NULL),
136         newTransactionParts(NULL),
137         newCommitParts(NULL),
138         lastArbitratedTransactionNumberByArbitratorTable(NULL),
139         liveTransactionBySequenceNumberTable(NULL),
140         liveTransactionByTransactionIdTable(NULL),
141         liveCommitsTable(NULL),
142         liveCommitsByKeyTable(NULL),
143         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
144         rejectedSlotVector(NULL),
145         pendingTransactionQueue(NULL),
146         pendingSendArbitrationRounds(NULL),
147         pendingSendArbitrationEntriesToDelete(NULL),
148         transactionPartsSent(NULL),
149         outstandingTransactionStatus(NULL),
150         liveAbortsGeneratedByLocal(NULL),
151         offlineTransactionsCommittedAndAtServer(NULL),
152         localCommunicationTable(NULL),
153         lastTransactionSeenFromMachineFromServer(NULL),
154         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
155         lastInsertedNewKey(false),
156         lastSeqNumArbOn(0)
157 {
158         init();
159 }
160
161 Table::~Table() {
162         delete cloud;
163         delete random;
164         delete buffer;
165         // init data structs
166         delete committedKeyValueTable;
167         delete speculatedKeyValueTable;
168         delete pendingTransactionSpeculatedKeyValueTable;
169         delete liveNewKeyTable;
170         {
171                 SetIterator<int64_t, Pair<int64_t, Liveness *> *> *lmit = getKeyIterator(lastMessageTable);
172                 while (lmit->hasNext()) {
173                         Pair<int64_t, Liveness *> * pair = lastMessageTable->get(lmit->next());
174                 }
175                 delete lmit;
176                 delete lastMessageTable;
177         }
178         if (pendingTransactionBuilder != NULL)
179                 delete pendingTransactionBuilder;
180         {
181                 SetIterator<int64_t, Hashset<RejectedMessage *> *> *rmit = getKeyIterator(rejectedMessageWatchVectorTable);
182                 while(rmit->hasNext()) {
183                         int64_t machineid = rmit->next();
184                         Hashset<RejectedMessage *> * rmset = rejectedMessageWatchVectorTable->get(machineid);
185                         SetIterator<RejectedMessage *, RejectedMessage *> * mit = rmset->iterator();
186                         while (mit->hasNext()) {
187                                 RejectedMessage * rm = mit->next();
188                                 delete rm;
189                         }
190                         delete mit;
191                         delete rmset;
192                 }
193                 delete rmit;
194                 delete rejectedMessageWatchVectorTable;
195         }
196         delete arbitratorTable;
197         delete liveAbortTable;
198         {
199                 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts);
200                 while (partsit->hasNext()) {
201                         int64_t machineId = partsit->next();
202                         Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
203                         delete parts;
204                 }
205                 delete partsit;
206                 delete newTransactionParts;
207         }
208         {
209                 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts);
210                 while (partsit->hasNext()) {
211                         int64_t machineId = partsit->next();
212                         Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId);
213                         delete parts;
214                 }
215                 delete partsit;
216                 delete newCommitParts;
217         }
218         delete lastArbitratedTransactionNumberByArbitratorTable;
219         delete liveTransactionBySequenceNumberTable;
220         delete liveTransactionByTransactionIdTable;
221         {
222                 SetIterator<int64_t, Hashtable<int64_t, Commit *> *> *liveit = getKeyIterator(liveCommitsTable);
223                 while (liveit->hasNext()) {
224                         int64_t arbitratorId = liveit->next();
225                         
226                         // Get all the commits for a specific arbitrator
227                         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
228                         {
229                                 SetIterator<int64_t, Commit *> *clientit = getKeyIterator(commitForClientTable);
230                                 while (clientit->hasNext()) {
231                                         int64_t id = clientit->next();
232                                         delete commitForClientTable->get(id);
233                                 }
234                                 delete clientit;
235                         }
236                         
237                         delete commitForClientTable;
238                 }
239                 delete liveit;
240                 delete liveCommitsTable;
241         }
242         delete liveCommitsByKeyTable;
243         delete lastCommitSeenSequenceNumberByArbitratorTable;
244         delete rejectedSlotVector;
245         {
246                 uint size = pendingTransactionQueue->size();
247                 for (uint iter = 0; iter < size; iter++) {
248                         delete pendingTransactionQueue->get(iter);
249                 }
250                 delete pendingTransactionQueue;
251         }
252         delete pendingSendArbitrationEntriesToDelete;
253         delete transactionPartsSent;
254         delete outstandingTransactionStatus;
255         delete liveAbortsGeneratedByLocal;
256         delete offlineTransactionsCommittedAndAtServer;
257         delete localCommunicationTable;
258         delete lastTransactionSeenFromMachineFromServer;
259         {
260                 for(uint i = 0; i < pendingSendArbitrationRounds->size(); i++) {
261                         delete pendingSendArbitrationRounds->get(i);
262                 }
263                 delete pendingSendArbitrationRounds;
264         }
265         if (lastTransactionPartsSent != NULL)
266                 delete lastTransactionPartsSent;
267         delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
268 }
269
270 /**
271  * Init all the stuff needed for for table usage
272  */
273 void Table::init() {
274         // Init helper objects
275         random = new SecureRandom();
276         buffer = new SlotBuffer();
277
278         // init data structs
279         committedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
280         speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
281         pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
282         liveNewKeyTable = new Hashtable<IoTString *, NewKey *, uintptr_t, 0, hashString, StringEquals >();
283         lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> * >();
284         rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
285         arbitratorTable = new Hashtable<IoTString *, int64_t, uintptr_t, 0, hashString, StringEquals>();
286         liveAbortTable = new Hashtable<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>();
287         newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
288         newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
289         lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
290         liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
291         liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t> *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>();
292         liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> * >();
293         liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *, uintptr_t, 0, hashString, StringEquals>();
294         lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
295         rejectedSlotVector = new Vector<int64_t>();
296         pendingTransactionQueue = new Vector<Transaction *>();
297         pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
298         transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
299         outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
300         liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
301         offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> *, uintptr_t, 0, pairHashFunction, pairEquals>();
302         localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> *>();
303         lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
304         pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
305         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
306
307         // Other init stuff
308         numberOfSlots = buffer->capacity();
309         setResizeThreshold();
310 }
311
312 /**
313  * Initialize the table by inserting a table status as the first entry
314  * into the table status also initialize the crypto stuff.
315  */
316 void Table::initTable() {
317         cloud->initSecurity();
318
319         // Create the first insertion into the block chain which is the table status
320         Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
321         localSequenceNumber++;
322         TableStatus *status = new TableStatus(s, numberOfSlots);
323         s->addEntry(status);
324         Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
325
326         if (array == NULL) {
327                 array = new Array<Slot *>(1);
328                 array->set(0, s);
329                 // update local block chain
330                 validateAndUpdate(array, true);
331                 delete array;
332         } else if (array->length() == 1) {
333                 // in case we did push the slot BUT we failed to init it
334                 validateAndUpdate(array, true);
335                 delete array;
336         } else {
337                 delete array;
338                 throw new Error("Error on initialization");
339         }
340 }
341
342 /**
343  * Rebuild the table from scratch by pulling the latest block chain
344  * from the server.
345  */
346 void Table::rebuild() {
347         // Just pull the latest slots from the server
348         Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
349         validateAndUpdate(newslots, true);
350         delete newslots;
351         sendToServer(NULL);
352         updateLiveTransactionsAndStatus();
353 }
354
355 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
356         localCommunicationTable->put(arbitrator, new Pair<IoTString *, int32_t>(hostName, portNumber));
357 }
358
359 int64_t Table::getArbitrator(IoTString *key) {
360         return arbitratorTable->get(key);
361 }
362
363 void Table::close() {
364         cloud->closeCloud();
365 }
366
367 IoTString *Table::getCommitted(IoTString *key)  {
368         KeyValue *kv = committedKeyValueTable->get(key);
369
370         if (kv != NULL) {
371                 return new IoTString(kv->getValue());
372         } else {
373                 return NULL;
374         }
375 }
376
377 IoTString *Table::getSpeculative(IoTString *key) {
378         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
379
380         if (kv == NULL) {
381                 kv = speculatedKeyValueTable->get(key);
382         }
383
384         if (kv == NULL) {
385                 kv = committedKeyValueTable->get(key);
386         }
387
388         if (kv != NULL) {
389                 return new IoTString(kv->getValue());
390         } else {
391                 return NULL;
392         }
393 }
394
395 IoTString *Table::getCommittedAtomic(IoTString *key) {
396         KeyValue *kv = committedKeyValueTable->get(key);
397
398         if (!arbitratorTable->contains(key)) {
399                 throw new Error("Key not Found.");
400         }
401
402         // Make sure new key value pair matches the current arbitrator
403         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
404                 // TODO: Maybe not throw en error
405                 throw new Error("Not all Key Values Match Arbitrator.");
406         }
407
408         if (kv != NULL) {
409                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
410                 return new IoTString(kv->getValue());
411         } else {
412                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
413                 return NULL;
414         }
415 }
416
417 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
418         if (!arbitratorTable->contains(key)) {
419                 throw new Error("Key not Found.");
420         }
421
422         // Make sure new key value pair matches the current arbitrator
423         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
424                 // TODO: Maybe not throw en error
425                 throw new Error("Not all Key Values Match Arbitrator.");
426         }
427
428         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
429
430         if (kv == NULL) {
431                 kv = speculatedKeyValueTable->get(key);
432         }
433
434         if (kv == NULL) {
435                 kv = committedKeyValueTable->get(key);
436         }
437
438         if (kv != NULL) {
439                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
440                 return new IoTString(kv->getValue());
441         } else {
442                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
443                 return NULL;
444         }
445 }
446
447 bool Table::update()  {
448         try {
449                 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
450                 validateAndUpdate(newSlots, false);
451                 delete newSlots;
452                 sendToServer(NULL);
453                 updateLiveTransactionsAndStatus();
454                 return true;
455         } catch (Exception *e) {
456                 SetIterator<int64_t, Pair<IoTString *, int32_t> *> *kit = getKeyIterator(localCommunicationTable);
457                 while (kit->hasNext()) {
458                         int64_t m = kit->next();
459                         updateFromLocal(m);
460                 }
461                 delete kit;
462         }
463
464         return false;
465 }
466
467 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
468         while (true) {
469                 if (arbitratorTable->contains(keyName)) {
470                         // There is already an arbitrator
471                         return false;
472                 }
473                 NewKey *newKey = new NewKey(NULL, keyName, machineId);
474
475                 if (sendToServer(newKey)) {
476                         // If successfully inserted
477                         return true;
478                 }
479         }
480 }
481
482 void Table::startTransaction() {
483         // Create a new transaction, invalidates any old pending transactions.
484         if (pendingTransactionBuilder != NULL)
485                 delete pendingTransactionBuilder;
486         pendingTransactionBuilder = new PendingTransaction(localMachineId);
487 }
488
489 void Table::put(IoTString *key, IoTString *value) {
490         // Make sure it is a valid key
491         if (!arbitratorTable->contains(key)) {
492                 throw new Error("Key not Found.");
493         }
494
495         // Make sure new key value pair matches the current arbitrator
496         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
497                 // TODO: Maybe not throw en error
498                 throw new Error("Not all Key Values Match Arbitrator.");
499         }
500
501         // Add the key value to this transaction
502         KeyValue *kv = new KeyValue(new IoTString(key), new IoTString(value));
503         pendingTransactionBuilder->addKV(kv);
504 }
505
506 TransactionStatus *Table::commitTransaction() {
507         if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
508                 // transaction with no updates will have no effect on the system
509                 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
510         }
511
512         // Set the local transaction sequence number and increment
513         pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
514         localTransactionSequenceNumber++;
515
516         // Create the transaction status
517         TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
518
519         // Create the new transaction
520         Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
521         newTransaction->setTransactionStatus(transactionStatus);
522
523         if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
524                 // Add it to the queue and invalidate the builder for safety
525                 pendingTransactionQueue->add(newTransaction);
526         } else {
527                 arbitrateOnLocalTransaction(newTransaction);
528                 delete newTransaction;
529                 updateLiveStateFromLocal();
530         }
531         if (pendingTransactionBuilder != NULL)
532                 delete pendingTransactionBuilder;
533         
534         pendingTransactionBuilder = new PendingTransaction(localMachineId);
535
536         try {
537                 sendToServer(NULL);
538         } catch (ServerException *e) {
539
540                 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
541                 uint size = pendingTransactionQueue->size();
542                 uint oldindex = 0;
543                 for (uint iter = 0; iter < size; iter++) {
544                         Transaction *transaction = pendingTransactionQueue->get(iter);
545                         pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter));
546
547                         if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
548                                 // Already contacted this client so ignore all attempts to contact this client
549                                 // to preserve ordering for arbitrator
550                                 continue;
551                         }
552
553                         Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
554
555                         if (sendReturn.getFirst()) {
556                                 // Failed to contact over local
557                                 arbitratorTriedAndFailed->add(transaction->getArbitrator());
558                         } else {
559                                 // Successful contact or should not contact
560
561                                 if (sendReturn.getSecond()) {
562                                         // did arbitrate
563                                         delete transaction;
564                                         oldindex--;
565                                 }
566                         }
567                 }
568                 pendingTransactionQueue->setSize(oldindex);
569         }
570
571         updateLiveStateFromLocal();
572
573         return transactionStatus;
574 }
575
576 /**
577  * Recalculate the new resize threshold
578  */
579 void Table::setResizeThreshold() {
580         int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
581         bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
582 }
583
584 int64_t Table::getLocalSequenceNumber() {
585         return localSequenceNumber;
586 }
587
588 NewKey * Table::handlePartialSend(NewKey * newKey) {
589         //Didn't receive acknowledgement for last send
590         //See if the server has received a newer slot
591         
592         Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
593         if (newSlots->length() == 0) {
594                 //Retry sending old slot
595                 bool wasInserted = false;
596                 bool sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey, &wasInserted, &newSlots);
597                 
598                 if (sendSlotsReturn) {
599                         if (newKey != NULL) {
600                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
601                                         newKey = NULL;
602                                 }
603                         }
604                         
605                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
606                         while (trit->hasNext()) {
607                                 Transaction *transaction = trit->next();
608                                 transaction->resetServerFailure();
609                                 // Update which transactions parts still need to be sent
610                                 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
611                                 // Add the transaction status to the outstanding list
612                                 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
613                                 
614                                 // Update the transaction status
615                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
616                                 
617                                 // Check if all the transaction parts were successfully
618                                 // sent and if so then remove it from pending
619                                 if (transaction->didSendAllParts()) {
620                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
621                                         pendingTransactionQueue->remove(transaction);
622                                 }
623                         }
624                         delete trit;
625                 } else {
626                         if (checkSend(newSlots, lastSlotAttemptedToSend)) {
627                                 if (newKey != NULL) {
628                                         if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
629                                                 newKey = NULL;
630                                         }
631                                 }
632                                 
633                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
634                                 while (trit->hasNext()) {
635                                         Transaction *transaction = trit->next();
636                                         transaction->resetServerFailure();
637                                         
638                                         // Update which transactions parts still need to be sent
639                                         transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
640                                         
641                                         // Add the transaction status to the outstanding list
642                                         outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
643                                         
644                                         // Update the transaction status
645                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
646                                         
647                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
648                                         if (transaction->didSendAllParts()) {
649                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
650                                                 pendingTransactionQueue->remove(transaction);
651                                         } else {
652                                                 transaction->resetServerFailure();
653                                                 // Set the transaction sequence number back to nothing
654                                                 if (!transaction->didSendAPartToServer()) {
655                                                         transaction->setSequenceNumber(-1);
656                                                 }
657                                         }
658                                 }
659                                 delete trit;
660                         }
661                 }
662                 
663                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
664                 while (trit->hasNext()) {
665                         Transaction *transaction = trit->next();
666                         transaction->resetServerFailure();
667                         // Set the transaction sequence number back to nothing
668                         if (!transaction->didSendAPartToServer()) {
669                                 transaction->setSequenceNumber(-1);
670                         }
671                 }
672                 delete trit;
673                 
674                 if (newSlots->length() != 0) {
675                         // insert into the local block chain
676                         validateAndUpdate(newSlots, true);
677                 }
678         } else {
679                 if (checkSend(newSlots, lastSlotAttemptedToSend)) {
680                         if (newKey != NULL) {
681                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
682                                         newKey = NULL;
683                                 }
684                         }
685                         
686                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
687                         while (trit->hasNext()) {
688                                 Transaction *transaction = trit->next();
689                                 transaction->resetServerFailure();
690                                 
691                                 // Update which transactions parts still need to be sent
692                                 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
693                                 
694                                 // Add the transaction status to the outstanding list
695                                 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
696                                 
697                                 // Update the transaction status
698                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
699                                 
700                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
701                                 if (transaction->didSendAllParts()) {
702                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
703                                         pendingTransactionQueue->remove(transaction);
704                                 } else {
705                                         transaction->resetServerFailure();
706                                         // Set the transaction sequence number back to nothing
707                                         if (!transaction->didSendAPartToServer()) {
708                                                 transaction->setSequenceNumber(-1);
709                                         }
710                                 }
711                         }
712                         delete trit;
713                 } else {
714                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
715                         while (trit->hasNext()) {
716                                 Transaction *transaction = trit->next();
717                                 transaction->resetServerFailure();
718                                 // Set the transaction sequence number back to nothing
719                                 if (!transaction->didSendAPartToServer()) {
720                                         transaction->setSequenceNumber(-1);
721                                 }
722                         }
723                         delete trit;
724                 }
725                 
726                 // insert into the local block chain
727                 validateAndUpdate(newSlots, true);
728         }
729         delete newSlots;
730         return newKey;
731 }
732
733 bool Table::sendToServer(NewKey *newKey) {
734         if (hadPartialSendToServer) {
735                 newKey = handlePartialSend(newKey);
736         }
737
738         try {
739                 // While we have stuff that needs inserting into the block chain
740                 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
741                         if (hadPartialSendToServer) {
742                                 throw new Error("Should Be error free");
743                         }
744                         
745                         // If there is a new key with same name then end
746                         if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
747                                 return false;
748                         }
749
750                         // Create the slot
751                         Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, new Array<char>(buffer->getSlot(sequenceNumber)->getHMAC()), localSequenceNumber);
752                         localSequenceNumber++;
753
754                         // Try to fill the slot with data
755                         int newSize = 0;
756                         bool insertedNewKey = false;
757                         bool needsResize = fillSlot(slot, false, newKey, newSize, insertedNewKey);
758
759                         if (needsResize) {
760                                 // Reset which transaction to send
761                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
762                                 while (trit->hasNext()) {
763                                         Transaction *transaction = trit->next();
764                                         transaction->resetNextPartToSend();
765
766                                         // Set the transaction sequence number back to nothing
767                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
768                                                 transaction->setSequenceNumber(-1);
769                                         }
770                                 }
771                                 delete trit;
772
773                                 // Clear the sent data since we are trying again
774                                 pendingSendArbitrationEntriesToDelete->clear();
775                                 transactionPartsSent->clear();
776
777                                 // We needed a resize so try again
778                                 fillSlot(slot, true, newKey, newSize, insertedNewKey);
779                         }
780
781                         lastSlotAttemptedToSend = slot;
782                         lastIsNewKey = (newKey != NULL);
783                         lastInsertedNewKey = insertedNewKey;
784                         lastNewSize = newSize;
785                         lastNewKey = newKey;
786                         if (lastTransactionPartsSent != NULL)
787                                 delete lastTransactionPartsSent;
788                         lastTransactionPartsSent = transactionPartsSent->clone();
789                         lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
790
791                         Array<Slot *> * newSlots = NULL;
792                         bool wasInserted = false;
793                         bool sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL, &wasInserted, &newSlots);
794
795                         if (sendSlotsReturn) {
796                                 // Did insert into the block chain
797                                 if (insertedNewKey) {
798                                         // This slot was what was inserted not a previous slot
799                                         // New Key was successfully inserted into the block chain so dont want to insert it again
800                                         newKey = NULL;
801                                 }
802
803                                 // Remove the aborts and commit parts that were sent from the pending to send queue
804                                 uint size = pendingSendArbitrationRounds->size();
805                                 uint oldcount = 0;
806                                 for (uint i = 0; i < size; i++) {
807                                         ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
808                                         round->removeParts(pendingSendArbitrationEntriesToDelete);
809
810                                         if (!round->isDoneSending()) {
811                                                 //Add part back in
812                                                 pendingSendArbitrationRounds->set(oldcount++,
813                                                                                                                                                                                         pendingSendArbitrationRounds->get(i));
814                                         } else
815                                                 delete pendingSendArbitrationRounds->get(i);
816                                 }
817                                 pendingSendArbitrationRounds->setSize(oldcount);
818
819                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
820                                 while (trit->hasNext()) {
821                                         Transaction *transaction = trit->next();
822                                         transaction->resetServerFailure();
823
824                                         // Update which transactions parts still need to be sent
825                                         transaction->removeSentParts(transactionPartsSent->get(transaction));
826
827                                         // Add the transaction status to the outstanding list
828                                         outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
829
830                                         // Update the transaction status
831                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
832
833                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
834                                         if (transaction->didSendAllParts()) {
835                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
836                                                 pendingTransactionQueue->remove(transaction);
837                                         }
838                                 }
839                                 delete trit;
840                         } else {
841                                 // Reset which transaction to send
842                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
843                                 while (trit->hasNext()) {
844                                         Transaction *transaction = trit->next();
845                                         transaction->resetNextPartToSend();
846
847                                         // Set the transaction sequence number back to nothing
848                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
849                                                 transaction->setSequenceNumber(-1);
850                                         }
851                                 }
852                                 delete trit;
853                         }
854
855                         // Clear the sent data in preparation for next send
856                         pendingSendArbitrationEntriesToDelete->clear();
857                         transactionPartsSent->clear();
858
859                         if (newSlots->length() != 0) {
860                                 // insert into the local block chain
861                                 validateAndUpdate(newSlots, true);
862                         }
863                         delete newSlots;
864                 }
865         } catch (ServerException *e) {
866                 if (e->getType() != ServerException_TypeInputTimeout) {
867                         // Nothing was able to be sent to the server so just clear these data structures
868                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
869                         while (trit->hasNext()) {
870                                 Transaction *transaction = trit->next();
871                                 transaction->resetNextPartToSend();
872
873                                 // Set the transaction sequence number back to nothing
874                                 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
875                                         transaction->setSequenceNumber(-1);
876                                 }
877                         }
878                         delete trit;
879                 } else {
880                         // There was a partial send to the server
881                         hadPartialSendToServer = true;
882
883                         // Nothing was able to be sent to the server so just clear these data structures
884                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
885                         while (trit->hasNext()) {
886                                 Transaction *transaction = trit->next();
887                                 transaction->resetNextPartToSend();
888                                 transaction->setServerFailure();
889                         }
890                         delete trit;
891                 }
892
893                 pendingSendArbitrationEntriesToDelete->clear();
894                 transactionPartsSent->clear();
895
896                 throw e;
897         }
898
899         return newKey == NULL;
900 }
901
902 bool Table::updateFromLocal(int64_t machineId) {
903         if (!localCommunicationTable->contains(machineId))
904                 return false;
905
906         Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(machineId);
907
908         // Get the size of the send data
909         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
910
911         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
912         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(machineId)) {
913                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
914         }
915
916         Array<char> *sendData = new Array<char>(sendDataSize);
917         ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
918
919         // Encode the data
920         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
921         bbEncode->putInt(0);
922
923         // Send by local
924         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
925         localSequenceNumber++;
926
927         if (returnData == NULL) {
928                 // Could not contact server
929                 return false;
930         }
931
932         // Decode the data
933         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
934         int numberOfEntries = bbDecode->getInt();
935
936         for (int i = 0; i < numberOfEntries; i++) {
937                 char type = bbDecode->get();
938                 if (type == TypeAbort) {
939                         Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
940                         processEntry(abort);
941                 } else if (type == TypeCommitPart) {
942                         CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
943                         processEntry(commitPart);
944                 }
945         }
946
947         updateLiveStateFromLocal();
948
949         return true;
950 }
951
952 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
953
954         // Get the devices local communications
955         if (!localCommunicationTable->contains(transaction->getArbitrator()))
956                 return Pair<bool, bool>(true, false);
957
958         Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
959
960         // Get the size of the send data
961         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
962         {
963                 Vector<TransactionPart *> *tParts = transaction->getParts();
964                 uint tPartsSize = tParts->size();
965                 for (uint i = 0; i < tPartsSize; i++) {
966                         TransactionPart *part = tParts->get(i);
967                         sendDataSize += part->getSize();
968                 }
969         }
970
971         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
972         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(transaction->getArbitrator())) {
973                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
974         }
975
976         // Make the send data size
977         Array<char> *sendData = new Array<char>(sendDataSize);
978         ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
979
980         // Encode the data
981         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
982         bbEncode->putInt(transaction->getParts()->size());
983         {
984                 Vector<TransactionPart *> *tParts = transaction->getParts();
985                 uint tPartsSize = tParts->size();
986                 for (uint i = 0; i < tPartsSize; i++) {
987                         TransactionPart *part = tParts->get(i);
988                         part->encode(bbEncode);
989                 }
990         }
991
992         // Send by local
993         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
994         localSequenceNumber++;
995
996         if (returnData == NULL) {
997                 // Could not contact server
998                 return Pair<bool, bool>(true, false);
999         }
1000
1001         // Decode the data
1002         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
1003         bool didCommit = bbDecode->get() == 1;
1004         bool couldArbitrate = bbDecode->get() == 1;
1005         int numberOfEntries = bbDecode->getInt();
1006         bool foundAbort = false;
1007
1008         for (int i = 0; i < numberOfEntries; i++) {
1009                 char type = bbDecode->get();
1010                 if (type == TypeAbort) {
1011                         Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
1012
1013                         if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
1014                                 foundAbort = true;
1015                         }
1016
1017                         processEntry(abort);
1018                 } else if (type == TypeCommitPart) {
1019                         CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
1020                         processEntry(commitPart);
1021                 }
1022         }
1023
1024         updateLiveStateFromLocal();
1025
1026         if (couldArbitrate) {
1027                 TransactionStatus *status =  transaction->getTransactionStatus();
1028                 if (didCommit) {
1029                         status->setStatus(TransactionStatus_StatusCommitted);
1030                 } else {
1031                         status->setStatus(TransactionStatus_StatusAborted);
1032                 }
1033         } else {
1034                 TransactionStatus *status =  transaction->getTransactionStatus();
1035                 if (foundAbort) {
1036                         status->setStatus(TransactionStatus_StatusAborted);
1037                 } else {
1038                         status->setStatus(TransactionStatus_StatusCommitted);
1039                 }
1040         }
1041
1042         return Pair<bool, bool>(false, true);
1043 }
1044
1045 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
1046         // Decode the data
1047         ByteBuffer *bbDecode = ByteBuffer_wrap(data);
1048         int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
1049         int numberOfParts = bbDecode->getInt();
1050
1051         // If we did commit a transaction or not
1052         bool didCommit = false;
1053         bool couldArbitrate = false;
1054
1055         if (numberOfParts != 0) {
1056
1057                 // decode the transaction
1058                 Transaction *transaction = new Transaction();
1059                 for (int i = 0; i < numberOfParts; i++) {
1060                         bbDecode->get();
1061                         TransactionPart *newPart = (TransactionPart *)TransactionPart_decode(NULL, bbDecode);
1062                         transaction->addPartDecode(newPart);
1063                 }
1064
1065                 // Arbitrate on transaction and pull relevant return data
1066                 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
1067                 couldArbitrate = localArbitrateReturn.getFirst();
1068                 didCommit = localArbitrateReturn.getSecond();
1069
1070                 updateLiveStateFromLocal();
1071
1072                 // Transaction was sent to the server so keep track of it to prevent double commit
1073                 if (transaction->getSequenceNumber() != -1) {
1074                         offlineTransactionsCommittedAndAtServer->add(new Pair<int64_t, int64_t>(transaction->getId()));
1075                 }
1076         }
1077
1078         // The data to send back
1079         int returnDataSize = 0;
1080         Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
1081
1082         // Get the aborts to send back
1083         Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>();
1084         {
1085                 SetIterator<int64_t, Abort *> *abortit = getKeyIterator(liveAbortsGeneratedByLocal);
1086                 while (abortit->hasNext())
1087                         abortLocalSequenceNumbers->add(abortit->next());
1088                 delete abortit;
1089         }
1090
1091         qsort(abortLocalSequenceNumbers->expose(), abortLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1092
1093         uint asize = abortLocalSequenceNumbers->size();
1094         for (uint i = 0; i < asize; i++) {
1095                 int64_t localSequenceNumber = abortLocalSequenceNumbers->get(i);
1096                 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1097                         continue;
1098                 }
1099
1100                 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
1101                 unseenArbitrations->add(abort);
1102                 returnDataSize += abort->getSize();
1103         }
1104
1105         // Get the commits to send back
1106         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
1107         if (commitForClientTable != NULL) {
1108                 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>();
1109                 {
1110                         SetIterator<int64_t, Commit *> *commitit = getKeyIterator(commitForClientTable);
1111                         while (commitit->hasNext())
1112                                 commitLocalSequenceNumbers->add(commitit->next());
1113                         delete commitit;
1114                 }
1115                 qsort(commitLocalSequenceNumbers->expose(), commitLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1116
1117                 uint clsSize = commitLocalSequenceNumbers->size();
1118                 for (uint clsi = 0; clsi < clsSize; clsi++) {
1119                         int64_t localSequenceNumber = commitLocalSequenceNumbers->get(clsi);
1120                         Commit *commit = commitForClientTable->get(localSequenceNumber);
1121
1122                         if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1123                                 continue;
1124                         }
1125
1126                         {
1127                                 Vector<CommitPart *> *parts = commit->getParts();
1128                                 uint nParts = parts->size();
1129                                 for (uint i = 0; i < nParts; i++) {
1130                                         CommitPart *commitPart = parts->get(i);
1131                                         unseenArbitrations->add(commitPart);
1132                                         returnDataSize += commitPart->getSize();
1133                                 }
1134                         }
1135                 }
1136         }
1137
1138         // Number of arbitration entries to decode
1139         returnDataSize += 2 * sizeof(int32_t);
1140
1141         // bool of did commit or not
1142         if (numberOfParts != 0) {
1143                 returnDataSize += sizeof(char);
1144         }
1145
1146         // Data to send Back
1147         Array<char> *returnData = new Array<char>(returnDataSize);
1148         ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
1149
1150         if (numberOfParts != 0) {
1151                 if (didCommit) {
1152                         bbEncode->put((char)1);
1153                 } else {
1154                         bbEncode->put((char)0);
1155                 }
1156                 if (couldArbitrate) {
1157                         bbEncode->put((char)1);
1158                 } else {
1159                         bbEncode->put((char)0);
1160                 }
1161         }
1162
1163         bbEncode->putInt(unseenArbitrations->size());
1164         uint size = unseenArbitrations->size();
1165         for (uint i = 0; i < size; i++) {
1166                 Entry *entry = unseenArbitrations->get(i);
1167                 entry->encode(bbEncode);
1168         }
1169
1170         localSequenceNumber++;
1171         return returnData;
1172 }
1173
1174 /** Checks whether a given slot was sent using new slots in
1175                 array. Returns true if sent and false otherwise.  */
1176
1177 bool Table::checkSend(Array<Slot *> * array, Slot *checkSlot) {
1178         uint size = array->length();
1179         for (uint i = 0; i < size; i++) {
1180                 Slot *s = array->get(i);
1181                 if ((s->getSequenceNumber() == checkSlot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1182                         return true;
1183                 }
1184         }
1185         
1186         //Also need to see if other machines acknowledged our message
1187         for (uint i = 0; i < size; i++) {
1188                 Slot *s = array->get(i);
1189                 
1190                 // Process each entry in the slot
1191                 Vector<Entry *> *entries = s->getEntries();
1192                 uint eSize = entries->size();
1193                 for (uint ei = 0; ei < eSize; ei++) {
1194                         Entry *entry = entries->get(ei);
1195                         
1196                         if (entry->getType() == TypeLastMessage) {
1197                                 LastMessage *lastMessage = (LastMessage *)entry;
1198                                 
1199                                 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == checkSlot->getSequenceNumber())) {
1200                                         return true;
1201                                 }
1202                         }
1203                 }
1204         }
1205         //Not found
1206         return false;
1207 }
1208
1209 /** Method tries to send slot to server.  Returns status in tuple.
1210                 isInserted returns whether last un-acked send (if any) was
1211                 successful.  Returns whether send was confirmed.x
1212  */
1213
1214 bool Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey, bool *isInserted, Array<Slot *> **array) {
1215         attemptedToSendToServer = true;
1216
1217         *array = cloud->putSlot(slot, newSize);
1218         if (*array == NULL) {
1219                 *array = new Array<Slot *>(1);
1220                 (*array)->set(0, slot);
1221                 rejectedSlotVector->clear();
1222                 *isInserted = false;
1223                 return true;
1224         } else {
1225                 if ((*array)->length() == 0) {
1226                         throw new Error("Server Error: Did not send any slots");
1227                 }
1228
1229                 if (hadPartialSendToServer) {
1230                         *isInserted = checkSend(*array, slot);
1231
1232                         if (!(*isInserted)) {
1233                                 rejectedSlotVector->add(slot->getSequenceNumber());
1234                         }
1235                         
1236                         return false;
1237                 } else {
1238                         rejectedSlotVector->add(slot->getSequenceNumber());
1239                         *isInserted = false;
1240                         return false;
1241                 }
1242         }
1243 }
1244
1245 /**
1246  * Returns true if a resize was needed but not done.
1247  */
1248 bool Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry, int & newSize, bool & insertedKey) {
1249         newSize = 0;//special value to indicate no resize
1250         if (liveSlotCount > bufferResizeThreshold) {
1251                 resize = true;//Resize is forced
1252         }
1253
1254         if (resize) {
1255                 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1256                 TableStatus *status = new TableStatus(slot, newSize);
1257                 slot->addEntry(status);
1258         }
1259
1260         // Fill with rejected slots first before doing anything else
1261         doRejectedMessages(slot);
1262
1263         // Do mandatory rescue of entries
1264         ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1265
1266         // Extract working variables
1267         bool needsResize = mandatoryRescueReturn.getFirst();
1268         bool seenLiveSlot = mandatoryRescueReturn.getSecond();
1269         int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1270
1271         if (needsResize && !resize) {
1272                 // We need to resize but we are not resizing so return true to force on retry
1273                 return true;
1274         }
1275
1276         insertedKey = false;
1277         if (newKeyEntry != NULL) {
1278                 newKeyEntry->setSlot(slot);
1279                 if (slot->hasSpace(newKeyEntry)) {
1280                         slot->addEntry(newKeyEntry);
1281                         insertedKey = true;
1282                 }
1283         }
1284
1285         // Clear the transactions, aborts and commits that were sent previously
1286         transactionPartsSent->clear();
1287         pendingSendArbitrationEntriesToDelete->clear();
1288         uint size = pendingSendArbitrationRounds->size();
1289         for (uint i = 0; i < size; i++) {
1290                 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
1291                 bool isFull = false;
1292                 round->generateParts();
1293                 Vector<Entry *> *parts = round->getParts();
1294
1295                 // Insert pending arbitration data
1296                 uint vsize = parts->size();
1297                 for (uint vi = 0; vi < vsize; vi++) {
1298                         Entry *arbitrationData = parts->get(vi);
1299
1300                         // If it is an abort then we need to set some information
1301                         if (arbitrationData->getType() == TypeAbort) {
1302                                 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1303                         }
1304
1305                         if (!slot->hasSpace(arbitrationData)) {
1306                                 // No space so cant do anything else with these data entries
1307                                 isFull = true;
1308                                 break;
1309                         }
1310
1311                         // Add to this current slot and add it to entries to delete
1312                         slot->addEntry(arbitrationData);
1313                         pendingSendArbitrationEntriesToDelete->add(arbitrationData);
1314                 }
1315
1316                 if (isFull) {
1317                         break;
1318                 }
1319         }
1320
1321         if (pendingTransactionQueue->size() > 0) {
1322                 Transaction *transaction = pendingTransactionQueue->get(0);
1323                 // Set the transaction sequence number if it has yet to be inserted into the block chain
1324                 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1325                         transaction->setSequenceNumber(slot->getSequenceNumber());
1326                 }
1327
1328                 while (true) {
1329                         TransactionPart *part = transaction->getNextPartToSend();
1330                         if (part == NULL) {
1331                                 // Ran out of parts to send for this transaction so move on
1332                                 break;
1333                         }
1334
1335                         if (slot->hasSpace(part)) {
1336                                 slot->addEntry(part);
1337                                 Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1338                                 if (partsSent == NULL) {
1339                                         partsSent = new Vector<int32_t>();
1340                                         transactionPartsSent->put(transaction, partsSent);
1341                                 }
1342                                 partsSent->add(part->getPartNumber());
1343                                 transactionPartsSent->put(transaction, partsSent);
1344                         } else {
1345                                 break;
1346                         }
1347                 }
1348         }
1349
1350         // Fill the remainder of the slot with rescue data
1351         doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1352
1353         return false;
1354 }
1355
1356 void Table::doRejectedMessages(Slot *s) {
1357         if (!rejectedSlotVector->isEmpty()) {
1358                 /* TODO: We should avoid generating a rejected message entry if
1359                  * there is already a sufficient entry in the queue (e->g->,
1360                  * equalsto value of true and same sequence number)->  */
1361
1362                 int64_t old_seqn = rejectedSlotVector->get(0);
1363                 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1364                         int64_t new_seqn = rejectedSlotVector->lastElement();
1365                         RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1366                         s->addEntry(rm);
1367                 } else {
1368                         int64_t prev_seqn = -1;
1369                         uint i = 0;
1370                         /* Go through list of missing messages */
1371                         for (; i < rejectedSlotVector->size(); i++) {
1372                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1373                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1374                                 if (s_msg != NULL)
1375                                         break;
1376                                 prev_seqn = curr_seqn;
1377                         }
1378                         /* Generate rejected message entry for missing messages */
1379                         if (prev_seqn != -1) {
1380                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1381                                 s->addEntry(rm);
1382                         }
1383                         /* Generate rejected message entries for present messages */
1384                         for (; i < rejectedSlotVector->size(); i++) {
1385                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1386                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1387                                 int64_t machineid = s_msg->getMachineID();
1388                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1389                                 s->addEntry(rm);
1390                         }
1391                 }
1392         }
1393 }
1394
1395 ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize) {
1396         int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1397         int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
1398         if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1399                 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1400         }
1401
1402         int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1403         bool seenLiveSlot = false;
1404         int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots;         // smallest seq number in the buffer if it is full
1405         int64_t threshold = firstIfFull + Table_FREE_SLOTS;             // we want the buffer to be clear of live entries up to this point
1406
1407
1408         // Mandatory Rescue
1409         for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1410                 Slot *previousSlot = buffer->getSlot(currentSequenceNumber);
1411                 // Push slot number forward
1412                 if (!seenLiveSlot) {
1413                         oldestLiveSlotSequenceNumver = currentSequenceNumber;
1414                 }
1415
1416                 if (!previousSlot->isLive()) {
1417                         continue;
1418                 }
1419
1420                 // We have seen a live slot
1421                 seenLiveSlot = true;
1422
1423                 // Get all the live entries for a slot
1424                 Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1425
1426                 // Iterate over all the live entries and try to rescue them
1427                 uint lESize = liveEntries->size();
1428                 for (uint i = 0; i < lESize; i++) {
1429                         Entry *liveEntry = liveEntries->get(i);
1430                         if (slot->hasSpace(liveEntry)) {
1431                                 // Enough space to rescue the entry
1432                                 slot->addEntry(liveEntry);
1433                         } else if (currentSequenceNumber == firstIfFull) {
1434                                 //if there's no space but the entry is about to fall off the queue
1435                                 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
1436                         }
1437                 }
1438         }
1439
1440         // Did not resize
1441         return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
1442 }
1443
1444 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1445         /* now go through live entries from least to greatest sequence number until
1446          * either all live slots added, or the slot doesn't have enough room
1447          * for SKIP_THRESHOLD consecutive entries*/
1448         int skipcount = 0;
1449         int64_t newestseqnum = buffer->getNewestSeqNum();
1450         for (; seqn <= newestseqnum; seqn++) {
1451                 Slot *prevslot = buffer->getSlot(seqn);
1452                 //Push slot number forward
1453                 if (!seenliveslot)
1454                         oldestLiveSlotSequenceNumver = seqn;
1455
1456                 if (!prevslot->isLive())
1457                         continue;
1458                 seenliveslot = true;
1459                 Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
1460                 uint lESize = liveentries->size();
1461                 for (uint i = 0; i < lESize; i++) {
1462                         Entry *liveentry = liveentries->get(i);
1463                         if (s->hasSpace(liveentry))
1464                                 s->addEntry(liveentry);
1465                         else {
1466                                 skipcount++;
1467                                 if (skipcount > Table_SKIP_THRESHOLD) {
1468                                         delete liveentries;
1469                                         goto donesearch;
1470                                 }
1471                         }
1472                 }
1473                 delete liveentries;
1474         }
1475 donesearch:
1476         ;
1477 }
1478
1479 /**
1480  * Checks for malicious activity and updates the local copy of the block chain->
1481  */
1482 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1483         // The cloud communication layer has checked slot HMACs already
1484         // before decoding
1485         if (newSlots->length() == 0) {
1486                 return;
1487         }
1488
1489         // Make sure all slots are newer than the last largest slot this
1490         // client has seen
1491         int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
1492         if (firstSeqNum <= sequenceNumber) {
1493                 throw new Error("Server Error: Sent older slots!");
1494         }
1495
1496         // Create an object that can access both new slots and slots in our
1497         // local chain without committing slots to our local chain
1498         SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1499
1500         // Check that the HMAC chain is not broken
1501         checkHMACChain(indexer, newSlots);
1502
1503         // Set to keep track of messages from clients
1504         Hashset<int64_t> *machineSet = new Hashset<int64_t>();
1505         {
1506                 SetIterator<int64_t, Pair<int64_t, Liveness *> *> *lmit = getKeyIterator(lastMessageTable);
1507                 while (lmit->hasNext())
1508                         machineSet->add(lmit->next());
1509                 delete lmit;
1510         }
1511
1512         // Process each slots data
1513         {
1514                 uint numSlots = newSlots->length();
1515                 for (uint i = 0; i < numSlots; i++) {
1516                         Slot *slot = newSlots->get(i);
1517                         processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1518                         updateExpectedSize();
1519                 }
1520         }
1521         delete indexer;
1522         
1523         // If there is a gap, check to see if the server sent us
1524         // everything->
1525         if (firstSeqNum != (sequenceNumber + 1)) {
1526
1527                 // Check the size of the slots that were sent down by the server->
1528                 // Can only check the size if there was a gap
1529                 checkNumSlots(newSlots->length());
1530
1531                 // Since there was a gap every machine must have pushed a slot or
1532                 // must have a last message message-> If not then the server is
1533                 // hiding slots
1534                 if (!machineSet->isEmpty()) {
1535                         delete machineSet;
1536                         throw new Error("Missing record for machines: ");
1537                 }
1538         }
1539         delete machineSet;
1540         // Update the size of our local block chain->
1541         commitNewMaxSize();
1542
1543         // Commit new to slots to the local block chain->
1544         {
1545                 uint numSlots = newSlots->length();
1546                 for (uint i = 0; i < numSlots; i++) {
1547                         Slot *slot = newSlots->get(i);
1548
1549                         // Insert this slot into our local block chain copy->
1550                         buffer->putSlot(slot);
1551
1552                         // Keep track of how many slots are currently live (have live data
1553                         // in them)->
1554                         liveSlotCount++;
1555                 }
1556         }
1557         // Get the sequence number of the latest slot in the system
1558         sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
1559         updateLiveStateFromServer();
1560
1561         // No Need to remember after we pulled from the server
1562         offlineTransactionsCommittedAndAtServer->clear();
1563
1564         // This is invalidated now
1565         hadPartialSendToServer = false;
1566 }
1567
1568 void Table::updateLiveStateFromServer() {
1569         // Process the new transaction parts
1570         processNewTransactionParts();
1571
1572         // Do arbitration on new transactions that were received
1573         arbitrateFromServer();
1574
1575         // Update all the committed keys
1576         bool didCommitOrSpeculate = updateCommittedTable();
1577
1578         // Delete the transactions that are now dead
1579         updateLiveTransactionsAndStatus();
1580
1581         // Do speculations
1582         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1583         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1584 }
1585
1586 void Table::updateLiveStateFromLocal() {
1587         // Update all the committed keys
1588         bool didCommitOrSpeculate = updateCommittedTable();
1589
1590         // Delete the transactions that are now dead
1591         updateLiveTransactionsAndStatus();
1592
1593         // Do speculations
1594         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1595         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1596 }
1597
1598 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1599         int64_t prevslots = firstSequenceNumber;
1600
1601         if (didFindTableStatus) {
1602         } else {
1603                 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1604         }
1605
1606         didFindTableStatus = true;
1607         currMaxSize = numberOfSlots;
1608 }
1609
1610 void Table::updateExpectedSize() {
1611         expectedsize++;
1612
1613         if (expectedsize > currMaxSize) {
1614                 expectedsize = currMaxSize;
1615         }
1616 }
1617
1618
1619 /**
1620  * Check the size of the block chain to make sure there are enough
1621  * slots sent back by the server-> This is only called when we have a
1622  * gap between the slots that we have locally and the slots sent by
1623  * the server therefore in the slots sent by the server there will be
1624  * at least 1 Table status message
1625  */
1626 void Table::checkNumSlots(int numberOfSlots) {
1627         if (numberOfSlots != expectedsize) {
1628                 throw new Error("Server Error: Server did not send all slots->  Expected: ");
1629         }
1630 }
1631
1632 /**
1633  * Update the size of of the local buffer if it is needed->
1634  */
1635 void Table::commitNewMaxSize() {
1636         didFindTableStatus = false;
1637
1638         // Resize the local slot buffer
1639         if (numberOfSlots != currMaxSize) {
1640                 buffer->resize((int32_t)currMaxSize);
1641         }
1642
1643         // Change the number of local slots to the new size
1644         numberOfSlots = (int32_t)currMaxSize;
1645
1646         // Recalculate the resize threshold since the size of the local
1647         // buffer has changed
1648         setResizeThreshold();
1649 }
1650
1651 /**
1652  * Process the new transaction parts from this latest round of slots
1653  * received from the server
1654  */
1655 void Table::processNewTransactionParts() {
1656
1657         if (newTransactionParts->size() == 0) {
1658                 // Nothing new to process
1659                 return;
1660         }
1661
1662         // Iterate through all the machine Ids that we received new parts
1663         // for
1664         SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *tpit = getKeyIterator(newTransactionParts);
1665         while (tpit->hasNext()) {
1666                 int64_t machineId = tpit->next();
1667                 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
1668
1669                 SetIterator<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *ptit = getKeyIterator(parts);
1670                 // Iterate through all the parts for that machine Id
1671                 while (ptit->hasNext()) {
1672                         Pair<int64_t, int32_t> *partId = ptit->next();
1673                         TransactionPart *part = parts->get(partId);
1674
1675                         if (lastArbitratedTransactionNumberByArbitratorTable->contains(part->getArbitratorId())) {
1676                                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1677                                 if (lastTransactionNumber >= part->getSequenceNumber()) {
1678                                         // Set dead the transaction part
1679                                         part->setDead();
1680                                         continue;
1681                                 }
1682                         }
1683
1684                         // Get the transaction object for that sequence number
1685                         Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1686
1687                         if (transaction == NULL) {
1688                                 // This is a new transaction that we dont have so make a new one
1689                                 transaction = new Transaction();
1690                                 
1691                                 // Add that part to the transaction
1692                                 transaction->addPartDecode(part);
1693
1694                                 // Insert this new transaction into the live tables
1695                                 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1696                                 liveTransactionByTransactionIdTable->put(transaction->getId(), transaction);
1697                         }
1698                 }
1699                 delete ptit;
1700         }
1701         delete tpit;
1702         // Clear all the new transaction parts in preparation for the next
1703         // time the server sends slots
1704         {
1705                 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts);
1706                 while (partsit->hasNext()) {
1707                         int64_t machineId = partsit->next();
1708                         Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
1709                         delete parts;
1710                 }
1711                 delete partsit;
1712                 newTransactionParts->clear();
1713         }
1714 }
1715
1716 void Table::arbitrateFromServer() {
1717         if (liveTransactionBySequenceNumberTable->size() == 0) {
1718                 // Nothing to arbitrate on so move on
1719                 return;
1720         }
1721
1722         // Get the transaction sequence numbers and sort from oldest to newest
1723         Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>();
1724         {
1725                 SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
1726                 while (trit->hasNext())
1727                         transactionSequenceNumbers->add(trit->next());
1728                 delete trit;
1729         }
1730         qsort(transactionSequenceNumbers->expose(), transactionSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1731
1732         // Collection of key value pairs that are
1733         Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *speculativeTableTmp = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
1734
1735         // The last transaction arbitrated on
1736         int64_t lastTransactionCommitted = -1;
1737         Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1738         uint tsnSize = transactionSequenceNumbers->size();
1739         for (uint i = 0; i < tsnSize; i++) {
1740                 int64_t transactionSequenceNumber = transactionSequenceNumbers->get(i);
1741                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1742
1743                 // Check if this machine arbitrates for this transaction if not
1744                 // then we cant arbitrate this transaction
1745                 if (transaction->getArbitrator() != localMachineId) {
1746                         continue;
1747                 }
1748
1749                 if (transactionSequenceNumber < lastSeqNumArbOn) {
1750                         continue;
1751                 }
1752
1753                 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1754                         // We have seen this already locally so dont commit again
1755                         continue;
1756                 }
1757
1758                 if (!transaction->isComplete()) {
1759                         // Will arbitrate in incorrect order if we continue so just break
1760                         // Most likely this
1761                         break;
1762                 }
1763
1764                 // update the largest transaction seen by arbitrator from server
1765                 if (!lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1766                         lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1767                 } else {
1768                         int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1769                         if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1770                                 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1771                         }
1772                 }
1773
1774                 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1775                         // Guard evaluated as true
1776                         // Update the local changes so we can make the commit
1777                         SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1778                         while (kvit->hasNext()) {
1779                                 KeyValue *kv = kvit->next();
1780                                 speculativeTableTmp->put(kv->getKey(), kv);
1781                         }
1782                         delete kvit;
1783
1784                         // Update what the last transaction committed was for use in batch commit
1785                         lastTransactionCommitted = transactionSequenceNumber;
1786                 } else {
1787                         // Guard evaluated was false so create abort
1788                         // Create the abort
1789                         Abort *newAbort = new Abort(NULL,
1790                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1791                                                                                                                                         transaction->getSequenceNumber(),
1792                                                                                                                                         transaction->getMachineId(),
1793                                                                                                                                         transaction->getArbitrator(),
1794                                                                                                                                         localArbitrationSequenceNumber);
1795                         localArbitrationSequenceNumber++;
1796                         generatedAborts->add(newAbort);
1797
1798                         // Insert the abort so we can process
1799                         processEntry(newAbort);
1800                 }
1801
1802                 lastSeqNumArbOn = transactionSequenceNumber;
1803         }
1804
1805         Commit *newCommit = NULL;
1806
1807         // If there is something to commit
1808         if (speculativeTableTmp->size() != 0) {
1809                 // Create the commit and increment the commit sequence number
1810                 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1811                 localArbitrationSequenceNumber++;
1812
1813                 // Add all the new keys to the commit
1814                 SetIterator<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *spit = getKeyIterator(speculativeTableTmp);
1815                 while (spit->hasNext()) {
1816                         IoTString *string = spit->next();
1817                         KeyValue *kv = speculativeTableTmp->get(string);
1818                         newCommit->addKV(kv);
1819                 }
1820                 delete spit;
1821                 
1822                 // create the commit parts
1823                 newCommit->createCommitParts();
1824
1825                 // Append all the commit parts to the end of the pending queue
1826                 // waiting for sending to the server
1827                 // Insert the commit so we can process it
1828                 Vector<CommitPart *> *parts = newCommit->getParts();
1829                 uint partsSize = parts->size();
1830                 for (uint i = 0; i < partsSize; i++) {
1831                         CommitPart *commitPart = parts->get(i);
1832                         processEntry(commitPart);
1833                 }
1834         }
1835         delete speculativeTableTmp;
1836
1837         if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1838                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1839                 pendingSendArbitrationRounds->add(arbitrationRound);
1840
1841                 if (compactArbitrationData()) {
1842                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1843                         if (newArbitrationRound->getCommit() != NULL) {
1844                                 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1845                                 uint partsSize = parts->size();
1846                                 for (uint i = 0; i < partsSize; i++) {
1847                                         CommitPart *commitPart = parts->get(i);
1848                                         processEntry(commitPart);
1849                                 }
1850                         }
1851                 }
1852         }
1853 }
1854
1855 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1856
1857         // Check if this machine arbitrates for this transaction if not then
1858         // we cant arbitrate this transaction
1859         if (transaction->getArbitrator() != localMachineId) {
1860                 return Pair<bool, bool>(false, false);
1861         }
1862
1863         if (!transaction->isComplete()) {
1864                 // Will arbitrate in incorrect order if we continue so just break
1865                 // Most likely this
1866                 return Pair<bool, bool>(false, false);
1867         }
1868
1869         if (transaction->getMachineId() != localMachineId) {
1870                 // dont do this check for local transactions
1871                 if (lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1872                         if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1873                                 // We've have already seen this from the server
1874                                 return Pair<bool, bool>(false, false);
1875                         }
1876                 }
1877         }
1878
1879         if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1880                 // Guard evaluated as true Create the commit and increment the
1881                 // commit sequence number
1882                 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1883                 localArbitrationSequenceNumber++;
1884
1885                 // Update the local changes so we can make the commit
1886                 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1887                 while (kvit->hasNext()) {
1888                         KeyValue *kv = kvit->next();
1889                         newCommit->addKV(kv);
1890                 }
1891                 delete kvit;
1892
1893                 // create the commit parts
1894                 newCommit->createCommitParts();
1895
1896                 // Append all the commit parts to the end of the pending queue
1897                 // waiting for sending to the server
1898                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1899                 pendingSendArbitrationRounds->add(arbitrationRound);
1900
1901                 if (compactArbitrationData()) {
1902                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1903                         Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1904                         uint partsSize = parts->size();
1905                         for (uint i = 0; i < partsSize; i++) {
1906                                 CommitPart *commitPart = parts->get(i);
1907                                 processEntry(commitPart);
1908                         }
1909                 } else {
1910                         // Insert the commit so we can process it
1911                         Vector<CommitPart *> *parts = newCommit->getParts();
1912                         uint partsSize = parts->size();
1913                         for (uint i = 0; i < partsSize; i++) {
1914                                 CommitPart *commitPart = parts->get(i);
1915                                 processEntry(commitPart);
1916                         }
1917                 }
1918
1919                 if (transaction->getMachineId() == localMachineId) {
1920                         TransactionStatus *status = transaction->getTransactionStatus();
1921                         if (status != NULL) {
1922                                 status->setStatus(TransactionStatus_StatusCommitted);
1923                         }
1924                 }
1925
1926                 updateLiveStateFromLocal();
1927                 return Pair<bool, bool>(true, true);
1928         } else {
1929                 if (transaction->getMachineId() == localMachineId) {
1930                         // For locally created messages update the status
1931                         // Guard evaluated was false so create abort
1932                         TransactionStatus *status = transaction->getTransactionStatus();
1933                         if (status != NULL) {
1934                                 status->setStatus(TransactionStatus_StatusAborted);
1935                         }
1936                 } else {
1937                         Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
1938
1939                         // Create the abort
1940                         Abort *newAbort = new Abort(NULL,
1941                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1942                                                                                                                                         -1,
1943                                                                                                                                         transaction->getMachineId(),
1944                                                                                                                                         transaction->getArbitrator(),
1945                                                                                                                                         localArbitrationSequenceNumber);
1946                         localArbitrationSequenceNumber++;
1947                         addAbortSet->add(newAbort);
1948
1949                         // Append all the commit parts to the end of the pending queue
1950                         // waiting for sending to the server
1951                         ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1952                         pendingSendArbitrationRounds->add(arbitrationRound);
1953
1954                         if (compactArbitrationData()) {
1955                                 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1956
1957                                 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1958                                 uint partsSize = parts->size();
1959                                 for (uint i = 0; i < partsSize; i++) {
1960                                         CommitPart *commitPart = parts->get(i);
1961                                         processEntry(commitPart);
1962                                 }
1963                         }
1964                 }
1965
1966                 updateLiveStateFromLocal();
1967                 return Pair<bool, bool>(true, false);
1968         }
1969 }
1970
1971 /**
1972  * Compacts the arbitration data by merging commits and aggregating
1973  * aborts so that a single large push of commits can be done instead
1974  * of many small updates
1975  */
1976 bool Table::compactArbitrationData() {
1977         if (pendingSendArbitrationRounds->size() < 2) {
1978                 // Nothing to compact so do nothing
1979                 return false;
1980         }
1981
1982         ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1983         if (lastRound->getDidSendPart()) {
1984                 return false;
1985         }
1986
1987         bool hadCommit = (lastRound->getCommit() == NULL);
1988         bool gotNewCommit = false;
1989
1990         uint numberToDelete = 1;
1991         while (numberToDelete < pendingSendArbitrationRounds->size()) {
1992                 ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1993
1994                 if (round->isFull() || round->getDidSendPart()) {
1995                         // Stop since there is a part that cannot be compacted and we
1996                         // need to compact in order
1997                         break;
1998                 }
1999
2000                 if (round->getCommit() == NULL) {
2001                         // Try compacting aborts only
2002                         int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
2003                         if (newSize > ArbitrationRound_MAX_PARTS) {
2004                                 // Cant compact since it would be too large
2005                                 break;
2006                         }
2007                         lastRound->addAborts(round->getAborts());
2008                 } else {
2009                         // Create a new larger commit
2010                         Commit *newCommit = Commit_merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
2011                         localArbitrationSequenceNumber++;
2012
2013                         // Create the commit parts so that we can count them
2014                         newCommit->createCommitParts();
2015
2016                         // Calculate the new size of the parts
2017                         int newSize = newCommit->getNumberOfParts();
2018                         newSize += lastRound->getAbortsCount();
2019                         newSize += round->getAbortsCount();
2020
2021                         if (newSize > ArbitrationRound_MAX_PARTS) {
2022                                 // Cant compact since it would be too large
2023                                 break;
2024                         }
2025
2026                         // Set the new compacted part
2027                         if (lastRound->getCommit() == newCommit)
2028                                 lastRound->setCommit(NULL);
2029                         if (round->getCommit() == newCommit)
2030                                 round->setCommit(NULL);
2031                         
2032                         if (lastRound->getCommit() != NULL) {
2033                                 Commit * oldcommit = lastRound->getCommit();
2034                                 lastRound->setCommit(NULL);
2035                                 delete oldcommit;
2036                         }
2037                         lastRound->setCommit(newCommit);
2038                         lastRound->addAborts(round->getAborts());
2039                         gotNewCommit = true;
2040                 }
2041
2042                 numberToDelete++;
2043         }
2044
2045         if (numberToDelete != 1) {
2046                 // If there is a compaction
2047                 // Delete the previous pieces that are now in the new compacted piece
2048                 for (uint i = 2; i <= numberToDelete; i++) {
2049                         delete pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size()-i);
2050                 }
2051                 pendingSendArbitrationRounds->setSize(pendingSendArbitrationRounds->size() - numberToDelete);
2052
2053                 pendingSendArbitrationRounds->add(lastRound);
2054
2055                 // Should reinsert into the commit processor
2056                 if (hadCommit && gotNewCommit) {
2057                         return true;
2058                 }
2059         }
2060
2061         return false;
2062 }
2063
2064 /**
2065  * Update all the commits and the committed tables, sets dead the dead
2066  * transactions
2067  */
2068 bool Table::updateCommittedTable() {
2069         if (newCommitParts->size() == 0) {
2070                 // Nothing new to process
2071                 return false;
2072         }
2073
2074         // Iterate through all the machine Ids that we received new parts for
2075         SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts);
2076         while (partsit->hasNext()) {
2077                 int64_t machineId = partsit->next();
2078                 Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId);
2079
2080                 // Iterate through all the parts for that machine Id
2081                 SetIterator<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pairit = getKeyIterator(parts);
2082                 while (pairit->hasNext()) {
2083                         Pair<int64_t, int32_t> *partId = pairit->next();
2084                         CommitPart *part = parts->get(partId);
2085
2086                         // Get the transaction object for that sequence number
2087                         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
2088
2089                         if (commitForClientTable == NULL) {
2090                                 // This is the first commit from this device
2091                                 commitForClientTable = new Hashtable<int64_t, Commit *>();
2092                                 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
2093                         }
2094
2095                         Commit *commit = commitForClientTable->get(part->getSequenceNumber());
2096
2097                         if (commit == NULL) {
2098                                 // This is a new commit that we dont have so make a new one
2099                                 commit = new Commit();
2100
2101                                 // Insert this new commit into the live tables
2102                                 commitForClientTable->put(part->getSequenceNumber(), commit);
2103                         }
2104
2105                         // Add that part to the commit
2106                         commit->addPartDecode(part);
2107                 }
2108                 delete pairit;
2109                 delete parts;
2110         }
2111         delete partsit;
2112
2113         // Clear all the new commits parts in preparation for the next time
2114         // the server sends slots
2115         newCommitParts->clear();
2116
2117         // If we process a new commit keep track of it for future use
2118         bool didProcessANewCommit = false;
2119
2120         // Process the commits one by one
2121         SetIterator<int64_t, Hashtable<int64_t, Commit *> *> *liveit = getKeyIterator(liveCommitsTable);
2122         while (liveit->hasNext()) {
2123                 int64_t arbitratorId = liveit->next();
2124
2125                 // Get all the commits for a specific arbitrator
2126                 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
2127
2128                 // Sort the commits in order
2129                 Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>();
2130                 {
2131                         SetIterator<int64_t, Commit *> *clientit = getKeyIterator(commitForClientTable);
2132                         while (clientit->hasNext())
2133                                 commitSequenceNumbers->add(clientit->next());
2134                         delete clientit;
2135                 }
2136
2137                 qsort(commitSequenceNumbers->expose(), commitSequenceNumbers->size(), sizeof(int64_t), compareInt64);
2138
2139                 // Get the last commit seen from this arbitrator
2140                 int64_t lastCommitSeenSequenceNumber = -1;
2141                 if (lastCommitSeenSequenceNumberByArbitratorTable->contains(arbitratorId)) {
2142                         lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
2143                 }
2144
2145                 // Go through each new commit one by one
2146                 for (uint i = 0; i < commitSequenceNumbers->size(); i++) {
2147                         int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
2148                         Commit *commit = commitForClientTable->get(commitSequenceNumber);
2149
2150                         // Special processing if a commit is not complete
2151                         if (!commit->isComplete()) {
2152                                 if (i == (commitSequenceNumbers->size() - 1)) {
2153                                         // If there is an incomplete commit and this commit is the
2154                                         // latest one seen then this commit cannot be processed and
2155                                         // there are no other commits
2156                                         break;
2157                                 } else {
2158                                         // This is a commit that was already dead but parts of it
2159                                         // are still in the block chain (not flushed out yet)->
2160                                         // Delete it and move on
2161                                         commit->setDead();
2162                                         commitForClientTable->remove(commit->getSequenceNumber());
2163                                         delete commit;
2164                                         continue;
2165                                 }
2166                         }
2167
2168                         // Update the last transaction that was updated if we can
2169                         if (commit->getTransactionSequenceNumber() != -1) {
2170                                 // Update the last transaction sequence number that the arbitrator arbitrated on1
2171                                 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2172                                         lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2173                                 }
2174                         }
2175
2176                         // Update the last arbitration data that we have seen so far
2177                         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(commit->getMachineId())) {
2178                                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
2179                                 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
2180                                         // Is larger
2181                                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2182                                 }
2183                         } else {
2184                                 // Never seen any data from this arbitrator so record the first one
2185                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2186                         }
2187
2188                         // We have already seen this commit before so need to do the
2189                         // full processing on this commit
2190                         if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2191                                 // Update the last transaction that was updated if we can
2192                                 if (commit->getTransactionSequenceNumber() != -1) {
2193                                         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
2194                                         if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) ||
2195                                                         lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2196                                                 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2197                                         }
2198                                 }
2199                                 continue;
2200                         }
2201
2202                         // If we got here then this is a brand new commit and needs full
2203                         // processing
2204                         // Get what commits should be edited, these are the commits that
2205                         // have live values for their keys
2206                         Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
2207                         {
2208                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2209                                 while (kvit->hasNext()) {
2210                                         KeyValue *kv = kvit->next();
2211                                         Commit *commit = liveCommitsByKeyTable->get(kv->getKey());
2212                                         if (commit != NULL)
2213                                                 commitsToEdit->add(commit);
2214                                 }
2215                                 delete kvit;
2216                         }
2217
2218                         // Update each previous commit that needs to be updated
2219                         SetIterator<Commit *, Commit *> *commitit = commitsToEdit->iterator();
2220                         while (commitit->hasNext()) {
2221                                 Commit *previousCommit = commitit->next();
2222
2223                                 // Only bother with live commits (TODO: Maybe remove this check)
2224                                 if (previousCommit->isLive()) {
2225
2226                                         // Update which keys in the old commits are still live
2227                                         {
2228                                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2229                                                 while (kvit->hasNext()) {
2230                                                         KeyValue *kv = kvit->next();
2231                                                         previousCommit->invalidateKey(kv->getKey());
2232                                                 }
2233                                                 delete kvit;
2234                                         }
2235
2236                                         // if the commit is now dead then remove it
2237                                         if (!previousCommit->isLive()) {
2238                                                 commitForClientTable->remove(previousCommit->getSequenceNumber());
2239                                                 delete previousCommit;
2240                                         }
2241                                 }
2242                         }
2243                         delete commitit;
2244                         delete commitsToEdit;
2245
2246                         // Update the last seen sequence number from this arbitrator
2247                         if (lastCommitSeenSequenceNumberByArbitratorTable->contains(commit->getMachineId())) {
2248                                 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2249                                         lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2250                                 }
2251                         } else {
2252                                 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2253                         }
2254
2255                         // We processed a new commit that we havent seen before
2256                         didProcessANewCommit = true;
2257
2258                         // Update the committed table of keys and which commit is using which key
2259                         {
2260                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2261                                 while (kvit->hasNext()) {
2262                                         KeyValue *kv = kvit->next();
2263                                         printf("Commited KeyValue Table update for %p\n", this);
2264                                         kv->getKey()->print();
2265                                         printf("\n");
2266                                         kv->getValue()->print();
2267                                         printf("\n");
2268                                         committedKeyValueTable->put(kv->getKey(), kv);
2269                                         liveCommitsByKeyTable->put(kv->getKey(), commit);
2270                                 }
2271                                 delete kvit;
2272                         }
2273                 }
2274         }
2275         delete liveit;
2276
2277         return didProcessANewCommit;
2278 }
2279
2280 /**
2281  * Create the speculative table from transactions that are still live
2282  * and have come from the cloud
2283  */
2284 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2285         if (liveTransactionBySequenceNumberTable->size() == 0) {
2286                 // There is nothing to speculate on
2287                 return false;
2288         }
2289
2290         // Create a list of the transaction sequence numbers and sort them
2291         // from oldest to newest
2292         Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>();
2293         {
2294                 SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
2295                 while (trit->hasNext())
2296                         transactionSequenceNumbersSorted->add(trit->next());
2297                 delete trit;
2298         }
2299
2300         qsort(transactionSequenceNumbersSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64);
2301
2302         bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2303
2304
2305         if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2306                 // If there is a gap in the transaction sequence numbers then
2307                 // there was a commit or an abort of a transaction OR there was a
2308                 // new commit (Could be from offline commit) so a redo the
2309                 // speculation from scratch
2310
2311                 // Start from scratch
2312                 speculatedKeyValueTable->clear();
2313                 lastTransactionSequenceNumberSpeculatedOn = -1;
2314                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2315         }
2316
2317         // Remember the front of the transaction list
2318         oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2319
2320         // Find where to start arbitration from
2321         uint startIndex = 0;
2322
2323         for (; startIndex < transactionSequenceNumbersSorted->size(); startIndex++)
2324                 if (transactionSequenceNumbersSorted->get(startIndex) == lastTransactionSequenceNumberSpeculatedOn)
2325                         break;
2326         startIndex++;
2327
2328         if (startIndex >= transactionSequenceNumbersSorted->size()) {
2329                 // Make sure we are not out of bounds
2330                 return false;           // did not speculate
2331         }
2332
2333         Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
2334         bool didSkip = true;
2335
2336         for (uint i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2337                 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2338                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2339
2340                 if (!transaction->isComplete()) {
2341                         // If there is an incomplete transaction then there is nothing
2342                         // we can do add this transactions arbitrator to the list of
2343                         // arbitrators we should ignore
2344                         incompleteTransactionArbitrator->add(transaction->getArbitrator());
2345                         didSkip = true;
2346                         continue;
2347                 }
2348
2349                 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2350                         continue;
2351                 }
2352
2353                 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2354
2355                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2356                         // Guard evaluated to true so update the speculative table
2357                         {
2358                                 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2359                                 while (kvit->hasNext()) {
2360                                         KeyValue *kv = kvit->next();
2361                                         speculatedKeyValueTable->put(kv->getKey(), kv);
2362                                 }
2363                                 delete kvit;
2364                         }
2365                 }
2366         }
2367
2368         if (didSkip) {
2369                 // Since there was a skip we need to redo the speculation next time around
2370                 lastTransactionSequenceNumberSpeculatedOn = -1;
2371                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2372         }
2373
2374         // We did some speculation
2375         return true;
2376 }
2377
2378 /**
2379  * Create the pending transaction speculative table from transactions
2380  * that are still in the pending transaction buffer
2381  */
2382 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2383         if (pendingTransactionQueue->size() == 0) {
2384                 // There is nothing to speculate on
2385                 return;
2386         }
2387
2388         if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2389                 // need to reset on the pending speculation
2390                 lastPendingTransactionSpeculatedOn = NULL;
2391                 firstPendingTransaction = pendingTransactionQueue->get(0);
2392                 pendingTransactionSpeculatedKeyValueTable->clear();
2393         }
2394
2395         // Find where to start arbitration from
2396         uint startIndex = 0;
2397
2398         for (; startIndex < pendingTransactionQueue->size(); startIndex++)
2399                 if (pendingTransactionQueue->get(startIndex) == firstPendingTransaction)
2400                         break;
2401
2402         if (startIndex >= pendingTransactionQueue->size()) {
2403                 // Make sure we are not out of bounds
2404                 return;
2405         }
2406
2407         for (uint i = startIndex; i < pendingTransactionQueue->size(); i++) {
2408                 Transaction *transaction = pendingTransactionQueue->get(i);
2409
2410                 lastPendingTransactionSpeculatedOn = transaction;
2411
2412                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2413                         // Guard evaluated to true so update the speculative table
2414                         SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2415                         while (kvit->hasNext()) {
2416                                 KeyValue *kv = kvit->next();
2417                                 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2418                         }
2419                         delete kvit;
2420                 }
2421         }
2422 }
2423
2424 /**
2425  * Set dead and remove from the live transaction tables the
2426  * transactions that are dead
2427  */
2428 void Table::updateLiveTransactionsAndStatus() {
2429         // Go through each of the transactions
2430         {
2431                 SetIterator<int64_t, Transaction *> *iter = getKeyIterator(liveTransactionBySequenceNumberTable);
2432                 while (iter->hasNext()) {
2433                         int64_t key = iter->next();
2434                         Transaction *transaction = liveTransactionBySequenceNumberTable->get(key);
2435
2436                         // Check if the transaction is dead
2437                         if (lastArbitratedTransactionNumberByArbitratorTable->contains(transaction->getArbitrator())
2438                                         && lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator()) >= transaction->getSequenceNumber()) {
2439                                 // Set dead the transaction
2440                                 transaction->setDead();
2441
2442                                 // Remove the transaction from the live table
2443                                 iter->remove();
2444                                 liveTransactionByTransactionIdTable->remove(transaction->getId());
2445                                 delete transaction;
2446                         }
2447                 }
2448                 delete iter;
2449         }
2450
2451         // Go through each of the transactions
2452         {
2453                 SetIterator<int64_t, TransactionStatus *> *iter = getKeyIterator(outstandingTransactionStatus);
2454                 while (iter->hasNext()) {
2455                         int64_t key = iter->next();
2456                         TransactionStatus *status = outstandingTransactionStatus->get(key);
2457
2458                         // Check if the transaction is dead
2459                         if (lastArbitratedTransactionNumberByArbitratorTable->contains(status->getTransactionArbitrator())
2460                                         && (lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()) >= status->getTransactionSequenceNumber())) {
2461                                 // Set committed
2462                                 status->setStatus(TransactionStatus_StatusCommitted);
2463
2464                                 // Remove
2465                                 iter->remove();
2466                         }
2467                 }
2468                 delete iter;
2469         }
2470 }
2471
2472 /**
2473  * Process this slot, entry by entry->  Also update the latest message sent by slot
2474  */
2475 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2476
2477         // Update the last message seen
2478         updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2479
2480         // Process each entry in the slot
2481         Vector<Entry *> *entries = slot->getEntries();
2482         uint eSize = entries->size();
2483         for (uint ei = 0; ei < eSize; ei++) {
2484                 Entry *entry = entries->get(ei);
2485                 switch (entry->getType()) {
2486                 case TypeCommitPart:
2487                         processEntry((CommitPart *)entry);
2488                         break;
2489                 case TypeAbort:
2490                         processEntry((Abort *)entry);
2491                         break;
2492                 case TypeTransactionPart:
2493                         processEntry((TransactionPart *)entry);
2494                         break;
2495                 case TypeNewKey:
2496                         processEntry((NewKey *)entry);
2497                         break;
2498                 case TypeLastMessage:
2499                         processEntry((LastMessage *)entry, machineSet);
2500                         break;
2501                 case TypeRejectedMessage:
2502                         processEntry((RejectedMessage *)entry, indexer);
2503                         break;
2504                 case TypeTableStatus:
2505                         processEntry((TableStatus *)entry, slot->getSequenceNumber());
2506                         break;
2507                 default:
2508                         throw new Error("Unrecognized type: ");
2509                 }
2510         }
2511 }
2512
2513 /**
2514  * Update the last message that was sent for a machine Id
2515  */
2516 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2517         // Update what the last message received by a machine was
2518         updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2519 }
2520
2521 /**
2522  * Add the new key to the arbitrators table and update the set of live
2523  * new keys (in case of a rescued new key message)
2524  */
2525 void Table::processEntry(NewKey *entry) {
2526         // Update the arbitrator table with the new key information
2527         arbitratorTable->put(entry->getKey(), entry->getMachineID());
2528
2529         // Update what the latest live new key is
2530         NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2531         if (oldNewKey != NULL) {
2532                 // Delete the old new key messages
2533                 oldNewKey->setDead();
2534         }
2535 }
2536
2537 /**
2538  * Process new table status entries and set dead the old ones as new
2539  * ones come in-> keeps track of the largest and smallest table status
2540  * seen in this current round of updating the local copy of the block
2541  * chain
2542  */
2543 void Table::processEntry(TableStatus *entry, int64_t seq) {
2544         int newNumSlots = entry->getMaxSlots();
2545         updateCurrMaxSize(newNumSlots);
2546         initExpectedSize(seq, newNumSlots);
2547
2548         if (liveTableStatus != NULL) {
2549                 // We have a larger table status so the old table status is no
2550                 // int64_ter alive
2551                 liveTableStatus->setDead();
2552         }
2553
2554         // Make this new table status the latest alive table status
2555         liveTableStatus = entry;
2556 }
2557
2558 /**
2559  * Check old messages to see if there is a block chain violation->
2560  * Also
2561  */
2562 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
2563         int64_t oldSeqNum = entry->getOldSeqNum();
2564         int64_t newSeqNum = entry->getNewSeqNum();
2565         bool isequal = entry->getEqual();
2566         int64_t machineId = entry->getMachineID();
2567         int64_t seq = entry->getSequenceNumber();
2568
2569         // Check if we have messages that were supposed to be rejected in
2570         // our local block chain
2571         for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2572                 // Get the slot
2573                 Slot *slot = indexer->getSlot(seqNum);
2574
2575                 if (slot != NULL) {
2576                         // If we have this slot make sure that it was not supposed to be
2577                         // a rejected slot
2578                         int64_t slotMachineId = slot->getMachineID();
2579                         if (isequal != (slotMachineId == machineId)) {
2580                                 throw new Error("Server Error: Trying to insert rejected message for slot ");
2581                         }
2582                 }
2583         }
2584
2585         // Create a list of clients to watch until they see this rejected
2586         // message entry->
2587         Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2588         SetIterator<int64_t, Pair<int64_t, Liveness *> *> *iter = getKeyIterator(lastMessageTable);
2589         while (iter->hasNext()) {
2590                 // Machine ID for the last message entry
2591                 int64_t lastMessageEntryMachineId = iter->next();
2592
2593                 // We've seen it, don't need to continue to watch->  Our next
2594                 // message will implicitly acknowledge it->
2595                 if (lastMessageEntryMachineId == localMachineId) {
2596                         continue;
2597                 }
2598
2599                 Pair<int64_t, Liveness *> *lastMessageValue = lastMessageTable->get(lastMessageEntryMachineId);
2600                 int64_t entrySequenceNumber = lastMessageValue->getFirst();
2601
2602                 if (entrySequenceNumber < seq) {
2603                         // Add this rejected message to the set of messages that this
2604                         // machine ID did not see yet
2605                         addWatchVector(lastMessageEntryMachineId, entry);
2606                         // This client did not see this rejected message yet so add it
2607                         // to the watch set to monitor
2608                         deviceWatchSet->add(lastMessageEntryMachineId);
2609                 }
2610         }
2611         delete iter;
2612
2613         if (deviceWatchSet->isEmpty()) {
2614                 // This rejected message has been seen by all the clients so
2615                 entry->setDead();
2616                 delete deviceWatchSet;
2617         } else {
2618                 // We need to watch this rejected message
2619                 entry->setWatchSet(deviceWatchSet);
2620         }
2621 }
2622
2623 /**
2624  * Check if this abort is live, if not then save it so we can kill it
2625  * later-> update the last transaction number that was arbitrated on->
2626  */
2627 void Table::processEntry(Abort *entry) {
2628         if (entry->getTransactionSequenceNumber() != -1) {
2629                 // update the transaction status if it was sent to the server
2630                 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2631                 if (status != NULL) {
2632                         status->setStatus(TransactionStatus_StatusAborted);
2633                 }
2634         }
2635
2636         // Abort has not been seen by the client it is for yet so we need to
2637         // keep track of it
2638
2639         Abort *previouslySeenAbort = liveAbortTable->put(new Pair<int64_t, int64_t>(entry->getAbortId()), entry);
2640         if (previouslySeenAbort != NULL) {
2641                 previouslySeenAbort->setDead();         // Delete old version of the abort since we got a rescued newer version
2642         }
2643
2644         if (entry->getTransactionArbitrator() == localMachineId) {
2645                 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
2646         }
2647
2648         if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) {
2649                 // The machine already saw this so it is dead
2650                 entry->setDead();
2651                 Pair<int64_t, int64_t> abortid = entry->getAbortId();
2652                 liveAbortTable->remove(&abortid);
2653
2654                 if (entry->getTransactionArbitrator() == localMachineId) {
2655                         liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2656                 }
2657                 return;
2658         }
2659
2660         // Update the last arbitration data that we have seen so far
2661         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(entry->getTransactionArbitrator())) {
2662                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2663                 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2664                         // Is larger
2665                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2666                 }
2667         } else {
2668                 // Never seen any data from this arbitrator so record the first one
2669                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2670         }
2671
2672         // Set dead a transaction if we can
2673         Pair<int64_t, int64_t> deadPair = Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber());
2674
2675         Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(&deadPair);
2676         if (transactionToSetDead != NULL) {
2677                 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2678         }
2679
2680         // Update the last transaction sequence number that the arbitrator
2681         // arbitrated on
2682         if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getTransactionArbitrator()) ||
2683                         (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator()) < entry->getTransactionSequenceNumber())) {
2684                 // Is a valid one
2685                 if (entry->getTransactionSequenceNumber() != -1) {
2686                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2687                 }
2688         }
2689 }
2690
2691 /**
2692  * Set dead the transaction part if that transaction is dead and keep
2693  * track of all new parts
2694  */
2695 void Table::processEntry(TransactionPart *entry) {
2696         // Check if we have already seen this transaction and set it dead OR
2697         // if it is not alive
2698         if (lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getArbitratorId()) && (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId()) >= entry->getSequenceNumber())) {
2699                 // This transaction is dead, it was already committed or aborted
2700                 entry->setDead();
2701                 return;
2702         }
2703
2704         // This part is still alive
2705         Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId());
2706
2707         if (transactionPart == NULL) {
2708                 // Dont have a table for this machine Id yet so make one
2709                 transactionPart = new Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2710                 newTransactionParts->put(entry->getMachineId(), transactionPart);
2711         }
2712
2713         // Update the part and set dead ones we have already seen (got a
2714         // rescued version)
2715         TransactionPart *previouslySeenPart = transactionPart->put(new Pair<int64_t, int32_t>(entry->getPartId()), entry);
2716         if (previouslySeenPart != NULL) {
2717                 previouslySeenPart->setDead();
2718         }
2719 }
2720
2721 /**
2722  * Process new commit entries and save them for future use->  Delete duplicates
2723  */
2724 void Table::processEntry(CommitPart *entry) {
2725         // Update the last transaction that was updated if we can
2726         if (entry->getTransactionSequenceNumber() != -1) {
2727                 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId()) ||
2728                                 lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber()) {
2729                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2730                 }
2731         }
2732
2733         Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *commitPart = newCommitParts->get(entry->getMachineId());
2734         if (commitPart == NULL) {
2735                 // Don't have a table for this machine Id yet so make one
2736                 commitPart = new Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2737                 newCommitParts->put(entry->getMachineId(), commitPart);
2738         }
2739         // Update the part and set dead ones we have already seen (got a
2740         // rescued version)
2741         CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry);
2742         if (previouslySeenPart != NULL) {
2743                 previouslySeenPart->setDead();
2744         }
2745 }
2746
2747 /**
2748  * Update the last message seen table-> Update and set dead the
2749  * appropriate RejectedMessages as clients see them-> Updates the live
2750  * aborts, removes those that are dead and sets them dead-> Check that
2751  * the last message seen is correct and that there is no mismatch of
2752  * our own last message or that other clients have not had a rollback
2753  * on the last message->
2754  */
2755 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2756         // We have seen this machine ID
2757         machineSet->remove(machineId);
2758
2759         // Get the set of rejected messages that this machine Id is has not seen yet
2760         Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2761         // If there is a rejected message that this machine Id has not seen yet
2762         if (watchset != NULL) {
2763                 // Go through each rejected message that this machine Id has not
2764                 // seen yet
2765
2766                 SetIterator<RejectedMessage *, RejectedMessage *> *rmit = watchset->iterator();
2767                 while (rmit->hasNext()) {
2768                         RejectedMessage *rm = rmit->next();
2769                         // If this machine Id has seen this rejected message->->->
2770                         if (rm->getSequenceNumber() <= seqNum) {
2771                                 // Remove it from our watchlist
2772                                 rmit->remove();
2773                                 // Decrement machines that need to see this notification
2774                                 rm->removeWatcher(machineId);
2775                         }
2776                 }
2777                 delete rmit;
2778         }
2779
2780         // Set dead the abort
2781         SetIterator<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals> *abortit = getKeyIterator(liveAbortTable);
2782
2783         while (abortit->hasNext()) {
2784                 Pair<int64_t, int64_t> *key = abortit->next();
2785                 Abort *abort = liveAbortTable->get(key);
2786                 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2787                         abort->setDead();
2788                         abortit->remove();
2789                         if (abort->getTransactionArbitrator() == localMachineId) {
2790                                 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2791                         }
2792                 }
2793         }
2794         delete abortit;
2795         if (machineId == localMachineId) {
2796                 // Our own messages are immediately dead->
2797                 char livenessType = liveness->getType();
2798                 if (livenessType == TypeLastMessage) {
2799                         ((LastMessage *)liveness)->setDead();
2800                 } else if (livenessType == TypeSlot) {
2801                         ((Slot *)liveness)->setDead();
2802                 } else {
2803                         throw new Error("Unrecognized type");
2804                 }
2805         }
2806         // Get the old last message for this device
2807         Pair<int64_t, Liveness *> *lastMessageEntry = lastMessageTable->put(machineId, new Pair<int64_t, Liveness *>(seqNum, liveness));
2808         if (lastMessageEntry == NULL) {
2809                 // If no last message then there is nothing else to process
2810                 return;
2811         }
2812
2813         int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
2814         Liveness *lastEntry = lastMessageEntry->getSecond();
2815         delete lastMessageEntry;
2816
2817         // If it is not our machine Id since we already set ours to dead
2818         if (machineId != localMachineId) {
2819                 char lastEntryType = lastEntry->getType();
2820
2821                 if (lastEntryType == TypeLastMessage) {
2822                         ((LastMessage *)lastEntry)->setDead();
2823                 } else if (lastEntryType == TypeSlot) {
2824                         ((Slot *)lastEntry)->setDead();
2825                 } else {
2826                         throw new Error("Unrecognized type");
2827                 }
2828         }
2829         // Make sure the server is not playing any games
2830         if (machineId == localMachineId) {
2831                 if (hadPartialSendToServer) {
2832                         // We were not making any updates and we had a machine mismatch
2833                         if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2834                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: ");
2835                         }
2836                 } else {
2837                         // We were not making any updates and we had a machine mismatch
2838                         if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2839                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed: ");
2840                         }
2841                 }
2842         } else {
2843                 if (lastMessageSeqNum > seqNum) {
2844                         throw new Error("Server Error: Rollback on remote machine sequence number");
2845                 }
2846         }
2847 }
2848
2849 /**
2850  * Add a rejected message entry to the watch set to keep track of
2851  * which clients have seen that rejected message entry and which have
2852  * not.
2853  */
2854 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
2855         Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2856         if (entries == NULL) {
2857                 // There is no set for this machine ID yet so create one
2858                 entries = new Hashset<RejectedMessage *>();
2859                 rejectedMessageWatchVectorTable->put(machineId, entries);
2860         }
2861         entries->add(entry);
2862 }
2863
2864 /**
2865  * Check if the HMAC chain is not violated
2866  */
2867 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
2868         for (uint i = 0; i < newSlots->length(); i++) {
2869                 Slot *currSlot = newSlots->get(i);
2870                 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
2871                 if (prevSlot != NULL &&
2872                                 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2873                         throw new Error("Server Error: Invalid HMAC Chain");
2874         }
2875 }