edits
[iotcloud.git] / version2 / src / C / Table.cc
1 #include "Table.h"
2 #include "CloudComm.h"
3 #include "SlotBuffer.h"
4 #include "NewKey.h"
5 #include "Slot.h"
6 #include "KeyValue.h"
7 #include "Error.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
13 #include "SecureRandom.h"
14 #include "ByteBuffer.h"
15 #include "Abort.h"
16 #include "CommitPart.h"
17 #include "ArbitrationRound.h"
18 #include "TransactionPart.h"
19 #include "Commit.h"
20 #include "RejectedMessage.h"
21 #include "SlotIndexer.h"
22 #include <stdlib.h>
23
/**
 * Three-way comparison of two int64_t values reached through untyped
 * pointers; the signature matches what qsort()/bsearch() expect.
 *
 * @param a pointer to the first int64_t
 * @param b pointer to the second int64_t
 * @return -1 when *a < *b, 1 when *a > *b, 0 when they are equal
 */
int compareInt64(const void *a, const void *b) {
	const int64_t lhs = *static_cast<const int64_t *>(a);
	const int64_t rhs = *static_cast<const int64_t *>(b);
	// (lhs > rhs) and (lhs < rhs) are each 0 or 1, so the difference is -1, 0 or 1.
	return (lhs > rhs) - (lhs < rhs);
}
34
35 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
36         buffer(NULL),
37         cloud(new CloudComm(this, baseurl, password, listeningPort)),
38         random(NULL),
39         liveTableStatus(NULL),
40         pendingTransactionBuilder(NULL),
41         lastPendingTransactionSpeculatedOn(NULL),
42         firstPendingTransaction(NULL),
43         numberOfSlots(0),
44         bufferResizeThreshold(0),
45         liveSlotCount(0),
46         oldestLiveSlotSequenceNumver(1),
47         localMachineId(_localMachineId),
48         sequenceNumber(0),
49         localSequenceNumber(0),
50         localTransactionSequenceNumber(0),
51         lastTransactionSequenceNumberSpeculatedOn(0),
52         oldestTransactionSequenceNumberSpeculatedOn(0),
53         localArbitrationSequenceNumber(0),
54         hadPartialSendToServer(false),
55         attemptedToSendToServer(false),
56         expectedsize(0),
57         didFindTableStatus(false),
58         currMaxSize(0),
59         lastSlotAttemptedToSend(NULL),
60         lastIsNewKey(false),
61         lastNewSize(0),
62         lastTransactionPartsSent(NULL),
63         lastNewKey(NULL),
64         committedKeyValueTable(NULL),
65         speculatedKeyValueTable(NULL),
66         pendingTransactionSpeculatedKeyValueTable(NULL),
67         liveNewKeyTable(NULL),
68         lastMessageTable(NULL),
69         rejectedMessageWatchVectorTable(NULL),
70         arbitratorTable(NULL),
71         liveAbortTable(NULL),
72         newTransactionParts(NULL),
73         newCommitParts(NULL),
74         lastArbitratedTransactionNumberByArbitratorTable(NULL),
75         liveTransactionBySequenceNumberTable(NULL),
76         liveTransactionByTransactionIdTable(NULL),
77         liveCommitsTable(NULL),
78         liveCommitsByKeyTable(NULL),
79         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
80         rejectedSlotVector(NULL),
81         pendingTransactionQueue(NULL),
82         pendingSendArbitrationRounds(NULL),
83         pendingSendArbitrationEntriesToDelete(NULL),
84         transactionPartsSent(NULL),
85         outstandingTransactionStatus(NULL),
86         liveAbortsGeneratedByLocal(NULL),
87         offlineTransactionsCommittedAndAtServer(NULL),
88         localCommunicationTable(NULL),
89         lastTransactionSeenFromMachineFromServer(NULL),
90         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
91         lastInsertedNewKey(false),
92         lastSeqNumArbOn(0)
93 {
94         init();
95 }
96
97 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
98         buffer(NULL),
99         cloud(_cloud),
100         random(NULL),
101         liveTableStatus(NULL),
102         pendingTransactionBuilder(NULL),
103         lastPendingTransactionSpeculatedOn(NULL),
104         firstPendingTransaction(NULL),
105         numberOfSlots(0),
106         bufferResizeThreshold(0),
107         liveSlotCount(0),
108         oldestLiveSlotSequenceNumver(1),
109         localMachineId(_localMachineId),
110         sequenceNumber(0),
111         localSequenceNumber(0),
112         localTransactionSequenceNumber(0),
113         lastTransactionSequenceNumberSpeculatedOn(0),
114         oldestTransactionSequenceNumberSpeculatedOn(0),
115         localArbitrationSequenceNumber(0),
116         hadPartialSendToServer(false),
117         attemptedToSendToServer(false),
118         expectedsize(0),
119         didFindTableStatus(false),
120         currMaxSize(0),
121         lastSlotAttemptedToSend(NULL),
122         lastIsNewKey(false),
123         lastNewSize(0),
124         lastTransactionPartsSent(NULL),
125         lastNewKey(NULL),
126         committedKeyValueTable(NULL),
127         speculatedKeyValueTable(NULL),
128         pendingTransactionSpeculatedKeyValueTable(NULL),
129         liveNewKeyTable(NULL),
130         lastMessageTable(NULL),
131         rejectedMessageWatchVectorTable(NULL),
132         arbitratorTable(NULL),
133         liveAbortTable(NULL),
134         newTransactionParts(NULL),
135         newCommitParts(NULL),
136         lastArbitratedTransactionNumberByArbitratorTable(NULL),
137         liveTransactionBySequenceNumberTable(NULL),
138         liveTransactionByTransactionIdTable(NULL),
139         liveCommitsTable(NULL),
140         liveCommitsByKeyTable(NULL),
141         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
142         rejectedSlotVector(NULL),
143         pendingTransactionQueue(NULL),
144         pendingSendArbitrationRounds(NULL),
145         pendingSendArbitrationEntriesToDelete(NULL),
146         transactionPartsSent(NULL),
147         outstandingTransactionStatus(NULL),
148         liveAbortsGeneratedByLocal(NULL),
149         offlineTransactionsCommittedAndAtServer(NULL),
150         localCommunicationTable(NULL),
151         lastTransactionSeenFromMachineFromServer(NULL),
152         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
153         lastInsertedNewKey(false),
154         lastSeqNumArbOn(0)
155 {
156         init();
157 }
158
159 Table::~Table() {
160         delete cloud;
161         delete random;
162         delete buffer;
163         // init data structs
164         delete committedKeyValueTable;
165         delete speculatedKeyValueTable;
166         delete pendingTransactionSpeculatedKeyValueTable;
167         delete liveNewKeyTable;
168         {
169                 SetIterator<int64_t, Pair<int64_t, Liveness *> *> *lmit = getKeyIterator(lastMessageTable);
170                 while (lmit->hasNext()) {
171                         Pair<int64_t, Liveness *> * pair = lastMessageTable->get(lmit->next());
172                         delete pair;
173                 }
174                 delete lmit;
175                 delete lastMessageTable;
176         }
177         if (pendingTransactionBuilder != NULL)
178                 delete pendingTransactionBuilder;
179         {
180                 SetIterator<int64_t, Hashset<RejectedMessage *> *> *rmit = getKeyIterator(rejectedMessageWatchVectorTable);
181                 while(rmit->hasNext()) {
182                         int64_t machineid = rmit->next();
183                         Hashset<RejectedMessage *> * rmset = rejectedMessageWatchVectorTable->get(machineid);
184                         SetIterator<RejectedMessage *, RejectedMessage *> * mit = rmset->iterator();
185                         while (mit->hasNext()) {
186                                 RejectedMessage * rm = mit->next();
187                                 delete rm;
188                         }
189                         delete mit;
190                         delete rmset;
191                 }
192                 delete rmit;
193                 delete rejectedMessageWatchVectorTable;
194         }
195         delete arbitratorTable;
196         delete liveAbortTable;
197         {
198                 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts);
199                 while (partsit->hasNext()) {
200                         int64_t machineId = partsit->next();
201                         Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = partsit->currVal();
202                         delete parts;
203                 }
204                 delete partsit;
205                 delete newTransactionParts;
206         }
207         {
208                 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts);
209                 while (partsit->hasNext()) {
210                         int64_t machineId = partsit->next();
211                         Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = partsit->currVal();
212                         delete parts;
213                 }
214                 delete partsit;
215                 delete newCommitParts;
216         }
217         delete lastArbitratedTransactionNumberByArbitratorTable;
218         delete liveTransactionBySequenceNumberTable;
219         delete liveTransactionByTransactionIdTable;
220         {
221                 SetIterator<int64_t, Hashtable<int64_t, Commit *> *> *liveit = getKeyIterator(liveCommitsTable);
222                 while (liveit->hasNext()) {
223                         int64_t arbitratorId = liveit->next();
224                         
225                         // Get all the commits for a specific arbitrator
226                         Hashtable<int64_t, Commit *> *commitForClientTable = liveit->currVal();
227                         {
228                                 SetIterator<int64_t, Commit *> *clientit = getKeyIterator(commitForClientTable);
229                                 while (clientit->hasNext()) {
230                                         int64_t id = clientit->next();
231                                         delete commitForClientTable->get(id);
232                                 }
233                                 delete clientit;
234                         }
235                         
236                         delete commitForClientTable;
237                 }
238                 delete liveit;
239                 delete liveCommitsTable;
240         }
241         delete liveCommitsByKeyTable;
242         delete lastCommitSeenSequenceNumberByArbitratorTable;
243         delete rejectedSlotVector;
244         {
245                 uint size = pendingTransactionQueue->size();
246                 for (uint iter = 0; iter < size; iter++) {
247                         delete pendingTransactionQueue->get(iter);
248                 }
249                 delete pendingTransactionQueue;
250         }
251         delete pendingSendArbitrationEntriesToDelete;
252         {
253                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
254                 while (trit->hasNext()) {
255                         Transaction *transaction = trit->next();
256                         delete trit->currVal();
257                 }
258                 delete trit;
259                 delete transactionPartsSent;
260         }
261         delete outstandingTransactionStatus;
262         delete liveAbortsGeneratedByLocal;
263         delete offlineTransactionsCommittedAndAtServer;
264         delete localCommunicationTable;
265         delete lastTransactionSeenFromMachineFromServer;
266         {
267                 for(uint i = 0; i < pendingSendArbitrationRounds->size(); i++) {
268                         delete pendingSendArbitrationRounds->get(i);
269                 }
270                 delete pendingSendArbitrationRounds;
271         }
272         if (lastTransactionPartsSent != NULL)
273                 delete lastTransactionPartsSent;
274         delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
275         if (lastNewKey)
276                 delete lastNewKey;
277 }
278
279 /**
280  * Init all the stuff needed for for table usage
281  */
282 void Table::init() {
283         // Init helper objects
284         random = new SecureRandom();
285         buffer = new SlotBuffer();
286
287         // init data structs
288         committedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
289         speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
290         pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
291         liveNewKeyTable = new Hashtable<IoTString *, NewKey *, uintptr_t, 0, hashString, StringEquals >();
292         lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> * >();
293         rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
294         arbitratorTable = new Hashtable<IoTString *, int64_t, uintptr_t, 0, hashString, StringEquals>();
295         liveAbortTable = new Hashtable<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>();
296         newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
297         newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
298         lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
299         liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
300         liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t> *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>();
301         liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> * >();
302         liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *, uintptr_t, 0, hashString, StringEquals>();
303         lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
304         rejectedSlotVector = new Vector<int64_t>();
305         pendingTransactionQueue = new Vector<Transaction *>();
306         pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
307         transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
308         outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
309         liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
310         offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> *, uintptr_t, 0, pairHashFunction, pairEquals>();
311         localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> *>();
312         lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
313         pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
314         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
315
316         // Other init stuff
317         numberOfSlots = buffer->capacity();
318         setResizeThreshold();
319 }
320
321 /**
322  * Initialize the table by inserting a table status as the first entry
323  * into the table status also initialize the crypto stuff.
324  */
325 void Table::initTable() {
326         cloud->initSecurity();
327
328         // Create the first insertion into the block chain which is the table status
329         Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
330         localSequenceNumber++;
331         TableStatus *status = new TableStatus(s, numberOfSlots);
332         s->addShallowEntry(status);
333         Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
334
335         if (array == NULL) {
336                 array = new Array<Slot *>(1);
337                 array->set(0, s);
338                 // update local block chain
339                 validateAndUpdate(array, true);
340                 delete array;
341         } else if (array->length() == 1) {
342                 // in case we did push the slot BUT we failed to init it
343                 validateAndUpdate(array, true);
344                 delete s;
345                 delete array;
346         } else {
347                 delete s;
348                 delete array;
349                 throw new Error("Error on initialization");
350         }
351 }
352
353 /**
354  * Rebuild the table from scratch by pulling the latest block chain
355  * from the server.
356  */
357 void Table::rebuild() {
358         // Just pull the latest slots from the server
359         Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
360         validateAndUpdate(newslots, true);
361         delete newslots;
362         sendToServer(NULL);
363         updateLiveTransactionsAndStatus();
364 }
365
366 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
367         localCommunicationTable->put(arbitrator, new Pair<IoTString *, int32_t>(hostName, portNumber));
368 }
369
370 int64_t Table::getArbitrator(IoTString *key) {
371         return arbitratorTable->get(key);
372 }
373
374 void Table::close() {
375         cloud->closeCloud();
376 }
377
378 IoTString *Table::getCommitted(IoTString *key)  {
379         KeyValue *kv = committedKeyValueTable->get(key);
380
381         if (kv != NULL) {
382                 return new IoTString(kv->getValue());
383         } else {
384                 return NULL;
385         }
386 }
387
388 IoTString *Table::getSpeculative(IoTString *key) {
389         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
390
391         if (kv == NULL) {
392                 kv = speculatedKeyValueTable->get(key);
393         }
394
395         if (kv == NULL) {
396                 kv = committedKeyValueTable->get(key);
397         }
398
399         if (kv != NULL) {
400                 return new IoTString(kv->getValue());
401         } else {
402                 return NULL;
403         }
404 }
405
406 IoTString *Table::getCommittedAtomic(IoTString *key) {
407         KeyValue *kv = committedKeyValueTable->get(key);
408
409         if (!arbitratorTable->contains(key)) {
410                 throw new Error("Key not Found.");
411         }
412
413         // Make sure new key value pair matches the current arbitrator
414         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
415                 // TODO: Maybe not throw en error
416                 throw new Error("Not all Key Values Match Arbitrator.");
417         }
418
419         if (kv != NULL) {
420                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
421                 return new IoTString(kv->getValue());
422         } else {
423                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
424                 return NULL;
425         }
426 }
427
428 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
429         if (!arbitratorTable->contains(key)) {
430                 throw new Error("Key not Found.");
431         }
432
433         // Make sure new key value pair matches the current arbitrator
434         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
435                 // TODO: Maybe not throw en error
436                 throw new Error("Not all Key Values Match Arbitrator.");
437         }
438
439         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
440
441         if (kv == NULL) {
442                 kv = speculatedKeyValueTable->get(key);
443         }
444
445         if (kv == NULL) {
446                 kv = committedKeyValueTable->get(key);
447         }
448
449         if (kv != NULL) {
450                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
451                 return new IoTString(kv->getValue());
452         } else {
453                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
454                 return NULL;
455         }
456 }
457
458 bool Table::update()  {
459         try {
460                 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
461                 validateAndUpdate(newSlots, false);
462                 delete newSlots;
463                 sendToServer(NULL);
464                 updateLiveTransactionsAndStatus();
465                 return true;
466         } catch (Exception *e) {
467                 SetIterator<int64_t, Pair<IoTString *, int32_t> *> *kit = getKeyIterator(localCommunicationTable);
468                 while (kit->hasNext()) {
469                         int64_t m = kit->next();
470                         updateFromLocal(m);
471                 }
472                 delete kit;
473         }
474
475         return false;
476 }
477
478 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
479         while (true) {
480                 if (arbitratorTable->contains(keyName)) {
481                         // There is already an arbitrator
482                         return false;
483                 }
484                 NewKey *newKey = new NewKey(NULL, keyName, machineId);
485
486                 if (sendToServer(newKey)) {
487                         // If successfully inserted
488                         return true;
489                 }
490         }
491 }
492
493 void Table::startTransaction() {
494         // Create a new transaction, invalidates any old pending transactions.
495         if (pendingTransactionBuilder != NULL)
496                 delete pendingTransactionBuilder;
497         pendingTransactionBuilder = new PendingTransaction(localMachineId);
498 }
499
500 void Table::put(IoTString *key, IoTString *value) {
501         // Make sure it is a valid key
502         if (!arbitratorTable->contains(key)) {
503                 throw new Error("Key not Found.");
504         }
505
506         // Make sure new key value pair matches the current arbitrator
507         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
508                 // TODO: Maybe not throw en error
509                 throw new Error("Not all Key Values Match Arbitrator.");
510         }
511
512         // Add the key value to this transaction
513         KeyValue *kv = new KeyValue(new IoTString(key), new IoTString(value));
514         pendingTransactionBuilder->addKV(kv);
515 }
516
517 TransactionStatus *Table::commitTransaction() {
518         if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
519                 // transaction with no updates will have no effect on the system
520                 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
521         }
522
523         // Set the local transaction sequence number and increment
524         pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
525         localTransactionSequenceNumber++;
526
527         // Create the transaction status
528         TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
529
530         // Create the new transaction
531         Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
532         newTransaction->setTransactionStatus(transactionStatus);
533
534         if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
535                 // Add it to the queue and invalidate the builder for safety
536                 pendingTransactionQueue->add(newTransaction);
537         } else {
538                 arbitrateOnLocalTransaction(newTransaction);
539                 delete newTransaction;
540                 updateLiveStateFromLocal();
541         }
542         if (pendingTransactionBuilder != NULL)
543                 delete pendingTransactionBuilder;
544         
545         pendingTransactionBuilder = new PendingTransaction(localMachineId);
546
547         try {
548                 sendToServer(NULL);
549         } catch (ServerException *e) {
550
551                 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
552                 uint size = pendingTransactionQueue->size();
553                 uint oldindex = 0;
554                 for (uint iter = 0; iter < size; iter++) {
555                         Transaction *transaction = pendingTransactionQueue->get(iter);
556                         pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter));
557
558                         if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
559                                 // Already contacted this client so ignore all attempts to contact this client
560                                 // to preserve ordering for arbitrator
561                                 continue;
562                         }
563
564                         Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
565
566                         if (sendReturn.getFirst()) {
567                                 // Failed to contact over local
568                                 arbitratorTriedAndFailed->add(transaction->getArbitrator());
569                         } else {
570                                 // Successful contact or should not contact
571
572                                 if (sendReturn.getSecond()) {
573                                         // did arbitrate
574                                         delete transaction;
575                                         oldindex--;
576                                 }
577                         }
578                 }
579                 pendingTransactionQueue->setSize(oldindex);
580         }
581
582         updateLiveStateFromLocal();
583
584         return transactionStatus;
585 }
586
587 /**
588  * Recalculate the new resize threshold
589  */
590 void Table::setResizeThreshold() {
591         int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
592         bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
593 }
594
595 int64_t Table::getLocalSequenceNumber() {
596         return localSequenceNumber;
597 }
598
599 void Table::processTransactionList(bool handlePartial) {
600         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
601         while (trit->hasNext()) {
602                 Transaction *transaction = trit->next();
603                 transaction->resetServerFailure();
604                 // Update which transactions parts still need to be sent
605                 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
606                 // Add the transaction status to the outstanding list
607                 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
608                 
609                 // Update the transaction status
610                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
611                 
612                 // Check if all the transaction parts were successfully
613                 // sent and if so then remove it from pending
614                 if (transaction->didSendAllParts()) {
615                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
616                         pendingTransactionQueue->remove(transaction);
617                         delete transaction;
618                 } else if (handlePartial) {
619                         transaction->resetServerFailure();
620                         // Set the transaction sequence number back to nothing
621                         if (!transaction->didSendAPartToServer()) {
622                                 transaction->setSequenceNumber(-1);
623                         }
624                 }
625         }
626         delete trit;
627 }
628
629 NewKey * Table::handlePartialSend(NewKey * newKey) {
630         //Didn't receive acknowledgement for last send
631         //See if the server has received a newer slot
632         
633         Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
634         if (newSlots->length() == 0) {
635                 //Retry sending old slot
636                 bool wasInserted = false;
637                 bool sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey, &wasInserted, &newSlots);
638                 
639                 if (sendSlotsReturn) {
640                         lastSlotAttemptedToSend = NULL;
641                         if (newKey != NULL) {
642                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
643                                         delete newKey;
644                                         newKey = NULL;
645                                 }
646                         }
647                         processTransactionList(false);
648                 } else {
649                         if (checkSend(newSlots, lastSlotAttemptedToSend)) {
650                                 if (newKey != NULL) {
651                                         if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
652                                                 delete newKey;
653                                                 newKey = NULL;
654                                         }
655                                 }
656                                 processTransactionList(true);
657                         }
658                 }
659                 
660                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
661                 while (trit->hasNext()) {
662                         Transaction *transaction = trit->next();
663                         transaction->resetServerFailure();
664                         // Set the transaction sequence number back to nothing
665                         if (!transaction->didSendAPartToServer()) {
666                                 transaction->setSequenceNumber(-1);
667                         }
668                 }
669                 delete trit;
670                 
671                 if (newSlots->length() != 0) {
672                         // insert into the local block chain
673                         validateAndUpdate(newSlots, true);
674                 }
675         } else {
676                 if (checkSend(newSlots, lastSlotAttemptedToSend)) {
677                         if (newKey != NULL) {
678                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
679                                         delete newKey;
680                                         newKey = NULL;
681                                 }
682                         }
683
684                         processTransactionList(true);
685                 } else {
686                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
687                         while (trit->hasNext()) {
688                                 Transaction *transaction = trit->next();
689                                 transaction->resetServerFailure();
690                                 // Set the transaction sequence number back to nothing
691                                 if (!transaction->didSendAPartToServer()) {
692                                         transaction->setSequenceNumber(-1);
693                                 }
694                         }
695                         delete trit;
696                 }
697                 
698                 // insert into the local block chain
699                 validateAndUpdate(newSlots, true);
700         }
701         delete newSlots;
702         return newKey;
703 }
704
705 void Table::clearSentParts() {
706         // Clear the sent data since we are trying again
707         pendingSendArbitrationEntriesToDelete->clear();
708         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
709         while (trit->hasNext()) {
710                 Transaction *transaction = trit->next();
711                 delete trit->currVal();
712         }
713         delete trit;
714         transactionPartsSent->clear();
715 }
716
717 bool Table::sendToServer(NewKey *newKey) {
718         if (hadPartialSendToServer) {
719                 newKey = handlePartialSend(newKey);
720         }
721
722         try {
723                 // While we have stuff that needs inserting into the block chain
724                 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
725                         if (hadPartialSendToServer) {
726                                 throw new Error("Should Be error free");
727                         }
728                         
729                         // If there is a new key with same name then end
730                         if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
731                                 delete newKey;
732                                 return false;
733                         }
734
735                         // Create the slot
736                         Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, new Array<char>(buffer->getSlot(sequenceNumber)->getHMAC()), localSequenceNumber);
737                         localSequenceNumber++;
738
739                         // Try to fill the slot with data
740                         int newSize = 0;
741                         bool insertedNewKey = false;
742                         bool needsResize = fillSlot(slot, false, newKey, newSize, insertedNewKey);
743
744                         if (needsResize) {
745                                 // Reset which transaction to send
746                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
747                                 while (trit->hasNext()) {
748                                         Transaction *transaction = trit->next();
749                                         transaction->resetNextPartToSend();
750
751                                         // Set the transaction sequence number back to nothing
752                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
753                                                 transaction->setSequenceNumber(-1);
754                                         }
755                                 }
756                                 delete trit;
757
758                                 // Clear the sent data since we are trying again
759                                 clearSentParts();
760                                         
761                                 // We needed a resize so try again
762                                 fillSlot(slot, true, newKey, newSize, insertedNewKey);
763                         }
764                         if (lastSlotAttemptedToSend != NULL)
765                                 delete lastSlotAttemptedToSend;
766                         
767                         lastSlotAttemptedToSend = slot;
768                         lastIsNewKey = (newKey != NULL);
769                         lastInsertedNewKey = insertedNewKey;
770                         lastNewSize = newSize;
771                         if (( newKey != lastNewKey) && (lastNewKey != NULL))
772                                 delete lastNewKey;
773                         lastNewKey = newKey;
774                         if (lastTransactionPartsSent != NULL)
775                                 delete lastTransactionPartsSent;
776                         lastTransactionPartsSent = transactionPartsSent->clone();
777
778                         Array<Slot *> * newSlots = NULL;
779                         bool wasInserted = false;
780                         bool sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL, &wasInserted, &newSlots);
781
782                         if (sendSlotsReturn) {
783                                 lastSlotAttemptedToSend = NULL;
784                                 // Did insert into the block chain
785                                 if (insertedNewKey) {
786                                         // This slot was what was inserted not a previous slot
787                                         // New Key was successfully inserted into the block chain so dont want to insert it again
788                                         newKey = NULL;
789                                 }
790
791                                 // Remove the aborts and commit parts that were sent from the pending to send queue
792                                 uint size = pendingSendArbitrationRounds->size();
793                                 uint oldcount = 0;
794                                 for (uint i = 0; i < size; i++) {
795                                         ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
796                                         round->removeParts(pendingSendArbitrationEntriesToDelete);
797
798                                         if (!round->isDoneSending()) {
799                                                 //Add part back in
800                                                 pendingSendArbitrationRounds->set(oldcount++,
801                                                                                                                                                                                         pendingSendArbitrationRounds->get(i));
802                                         } else
803                                                 delete pendingSendArbitrationRounds->get(i);
804                                 }
805                                 pendingSendArbitrationRounds->setSize(oldcount);
806                                 processTransactionList(false);
807                         } else {
808                                 // Reset which transaction to send
809                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
810                                 while (trit->hasNext()) {
811                                         Transaction *transaction = trit->next();
812                                         transaction->resetNextPartToSend();
813
814                                         // Set the transaction sequence number back to nothing
815                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
816                                                 transaction->setSequenceNumber(-1);
817                                         }
818                                 }
819                                 delete trit;
820                         }
821
822                         // Clear the sent data in preparation for next send
823                         clearSentParts();
824
825                         if (newSlots->length() != 0) {
826                                 // insert into the local block chain
827                                 validateAndUpdate(newSlots, true);
828                         }
829                         delete newSlots;
830                 }
831         } catch (ServerException *e) {
832                 if (e->getType() != ServerException_TypeInputTimeout) {
833                         // Nothing was able to be sent to the server so just clear these data structures
834                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
835                         while (trit->hasNext()) {
836                                 Transaction *transaction = trit->next();
837                                 transaction->resetNextPartToSend();
838
839                                 // Set the transaction sequence number back to nothing
840                                 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
841                                         transaction->setSequenceNumber(-1);
842                                 }
843                         }
844                         delete trit;
845                 } else {
846                         // There was a partial send to the server
847                         hadPartialSendToServer = true;
848
849                         // Nothing was able to be sent to the server so just clear these data structures
850                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
851                         while (trit->hasNext()) {
852                                 Transaction *transaction = trit->next();
853                                 transaction->resetNextPartToSend();
854                                 transaction->setServerFailure();
855                         }
856                         delete trit;
857                 }
858
859                 clearSentParts();
860
861                 throw e;
862         }
863
864         return newKey == NULL;
865 }
866
867 bool Table::updateFromLocal(int64_t machineId) {
868         if (!localCommunicationTable->contains(machineId))
869                 return false;
870
871         Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(machineId);
872
873         // Get the size of the send data
874         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
875
876         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
877         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(machineId)) {
878                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
879         }
880
881         Array<char> *sendData = new Array<char>(sendDataSize);
882         ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
883
884         // Encode the data
885         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
886         bbEncode->putInt(0);
887
888         // Send by local
889         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
890         localSequenceNumber++;
891
892         if (returnData == NULL) {
893                 // Could not contact server
894                 return false;
895         }
896
897         // Decode the data
898         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
899         int numberOfEntries = bbDecode->getInt();
900
901         for (int i = 0; i < numberOfEntries; i++) {
902                 char type = bbDecode->get();
903                 if (type == TypeAbort) {
904                         Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
905                         processEntry(abort);
906                 } else if (type == TypeCommitPart) {
907                         CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
908                         processEntry(commitPart);
909                 }
910         }
911
912         updateLiveStateFromLocal();
913
914         return true;
915 }
916
917 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
918
919         // Get the devices local communications
920         if (!localCommunicationTable->contains(transaction->getArbitrator()))
921                 return Pair<bool, bool>(true, false);
922
923         Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
924
925         // Get the size of the send data
926         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
927         {
928                 Vector<TransactionPart *> *tParts = transaction->getParts();
929                 uint tPartsSize = tParts->size();
930                 for (uint i = 0; i < tPartsSize; i++) {
931                         TransactionPart *part = tParts->get(i);
932                         sendDataSize += part->getSize();
933                 }
934         }
935
936         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
937         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(transaction->getArbitrator())) {
938                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
939         }
940
941         // Make the send data size
942         Array<char> *sendData = new Array<char>(sendDataSize);
943         ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
944
945         // Encode the data
946         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
947         bbEncode->putInt(transaction->getParts()->size());
948         {
949                 Vector<TransactionPart *> *tParts = transaction->getParts();
950                 uint tPartsSize = tParts->size();
951                 for (uint i = 0; i < tPartsSize; i++) {
952                         TransactionPart *part = tParts->get(i);
953                         part->encode(bbEncode);
954                 }
955         }
956
957         // Send by local
958         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
959         localSequenceNumber++;
960
961         if (returnData == NULL) {
962                 // Could not contact server
963                 return Pair<bool, bool>(true, false);
964         }
965
966         // Decode the data
967         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
968         bool didCommit = bbDecode->get() == 1;
969         bool couldArbitrate = bbDecode->get() == 1;
970         int numberOfEntries = bbDecode->getInt();
971         bool foundAbort = false;
972
973         for (int i = 0; i < numberOfEntries; i++) {
974                 char type = bbDecode->get();
975                 if (type == TypeAbort) {
976                         Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
977
978                         if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
979                                 foundAbort = true;
980                         }
981
982                         processEntry(abort);
983                 } else if (type == TypeCommitPart) {
984                         CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
985                         processEntry(commitPart);
986                 }
987         }
988
989         updateLiveStateFromLocal();
990
991         if (couldArbitrate) {
992                 TransactionStatus *status =  transaction->getTransactionStatus();
993                 if (didCommit) {
994                         status->setStatus(TransactionStatus_StatusCommitted);
995                 } else {
996                         status->setStatus(TransactionStatus_StatusAborted);
997                 }
998         } else {
999                 TransactionStatus *status =  transaction->getTransactionStatus();
1000                 if (foundAbort) {
1001                         status->setStatus(TransactionStatus_StatusAborted);
1002                 } else {
1003                         status->setStatus(TransactionStatus_StatusCommitted);
1004                 }
1005         }
1006
1007         return Pair<bool, bool>(false, true);
1008 }
1009
1010 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
1011         // Decode the data
1012         ByteBuffer *bbDecode = ByteBuffer_wrap(data);
1013         int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
1014         int numberOfParts = bbDecode->getInt();
1015
1016         // If we did commit a transaction or not
1017         bool didCommit = false;
1018         bool couldArbitrate = false;
1019
1020         if (numberOfParts != 0) {
1021
1022                 // decode the transaction
1023                 Transaction *transaction = new Transaction();
1024                 for (int i = 0; i < numberOfParts; i++) {
1025                         bbDecode->get();
1026                         TransactionPart *newPart = (TransactionPart *)TransactionPart_decode(NULL, bbDecode);
1027                         transaction->addPartDecode(newPart);
1028                 }
1029
1030                 // Arbitrate on transaction and pull relevant return data
1031                 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
1032                 couldArbitrate = localArbitrateReturn.getFirst();
1033                 didCommit = localArbitrateReturn.getSecond();
1034
1035                 updateLiveStateFromLocal();
1036
1037                 // Transaction was sent to the server so keep track of it to prevent double commit
1038                 if (transaction->getSequenceNumber() != -1) {
1039                         offlineTransactionsCommittedAndAtServer->add(new Pair<int64_t, int64_t>(transaction->getId()));
1040                 }
1041         }
1042
1043         // The data to send back
1044         int returnDataSize = 0;
1045         Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
1046
1047         // Get the aborts to send back
1048         Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>();
1049         {
1050                 SetIterator<int64_t, Abort *> *abortit = getKeyIterator(liveAbortsGeneratedByLocal);
1051                 while (abortit->hasNext())
1052                         abortLocalSequenceNumbers->add(abortit->next());
1053                 delete abortit;
1054         }
1055
1056         qsort(abortLocalSequenceNumbers->expose(), abortLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1057
1058         uint asize = abortLocalSequenceNumbers->size();
1059         for (uint i = 0; i < asize; i++) {
1060                 int64_t localSequenceNumber = abortLocalSequenceNumbers->get(i);
1061                 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1062                         continue;
1063                 }
1064
1065                 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
1066                 unseenArbitrations->add(abort);
1067                 returnDataSize += abort->getSize();
1068         }
1069
1070         // Get the commits to send back
1071         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
1072         if (commitForClientTable != NULL) {
1073                 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>();
1074                 {
1075                         SetIterator<int64_t, Commit *> *commitit = getKeyIterator(commitForClientTable);
1076                         while (commitit->hasNext())
1077                                 commitLocalSequenceNumbers->add(commitit->next());
1078                         delete commitit;
1079                 }
1080                 qsort(commitLocalSequenceNumbers->expose(), commitLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1081
1082                 uint clsSize = commitLocalSequenceNumbers->size();
1083                 for (uint clsi = 0; clsi < clsSize; clsi++) {
1084                         int64_t localSequenceNumber = commitLocalSequenceNumbers->get(clsi);
1085                         Commit *commit = commitForClientTable->get(localSequenceNumber);
1086
1087                         if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1088                                 continue;
1089                         }
1090
1091                         {
1092                                 Vector<CommitPart *> *parts = commit->getParts();
1093                                 uint nParts = parts->size();
1094                                 for (uint i = 0; i < nParts; i++) {
1095                                         CommitPart *commitPart = parts->get(i);
1096                                         unseenArbitrations->add(commitPart);
1097                                         returnDataSize += commitPart->getSize();
1098                                 }
1099                         }
1100                 }
1101         }
1102
1103         // Number of arbitration entries to decode
1104         returnDataSize += 2 * sizeof(int32_t);
1105
1106         // bool of did commit or not
1107         if (numberOfParts != 0) {
1108                 returnDataSize += sizeof(char);
1109         }
1110
1111         // Data to send Back
1112         Array<char> *returnData = new Array<char>(returnDataSize);
1113         ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
1114
1115         if (numberOfParts != 0) {
1116                 if (didCommit) {
1117                         bbEncode->put((char)1);
1118                 } else {
1119                         bbEncode->put((char)0);
1120                 }
1121                 if (couldArbitrate) {
1122                         bbEncode->put((char)1);
1123                 } else {
1124                         bbEncode->put((char)0);
1125                 }
1126         }
1127
1128         bbEncode->putInt(unseenArbitrations->size());
1129         uint size = unseenArbitrations->size();
1130         for (uint i = 0; i < size; i++) {
1131                 Entry *entry = unseenArbitrations->get(i);
1132                 entry->encode(bbEncode);
1133         }
1134
1135         localSequenceNumber++;
1136         return returnData;
1137 }
1138
1139 /** Checks whether a given slot was sent using new slots in
1140                 array. Returns true if sent and false otherwise.  */
1141
1142 bool Table::checkSend(Array<Slot *> * array, Slot *checkSlot) {
1143         uint size = array->length();
1144         for (uint i = 0; i < size; i++) {
1145                 Slot *s = array->get(i);
1146                 if ((s->getSequenceNumber() == checkSlot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1147                         return true;
1148                 }
1149         }
1150         
1151         //Also need to see if other machines acknowledged our message
1152         for (uint i = 0; i < size; i++) {
1153                 Slot *s = array->get(i);
1154                 
1155                 // Process each entry in the slot
1156                 Vector<Entry *> *entries = s->getEntries();
1157                 uint eSize = entries->size();
1158                 for (uint ei = 0; ei < eSize; ei++) {
1159                         Entry *entry = entries->get(ei);
1160                         
1161                         if (entry->getType() == TypeLastMessage) {
1162                                 LastMessage *lastMessage = (LastMessage *)entry;
1163                                 
1164                                 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == checkSlot->getSequenceNumber())) {
1165                                         return true;
1166                                 }
1167                         }
1168                 }
1169         }
1170         //Not found
1171         return false;
1172 }
1173
1174 /** Method tries to send slot to server.  Returns status in tuple.
1175                 isInserted returns whether last un-acked send (if any) was
1176                 successful.  Returns whether send was confirmed.x
1177  */
1178
1179 bool Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey, bool *isInserted, Array<Slot *> **array) {
1180         attemptedToSendToServer = true;
1181
1182         *array = cloud->putSlot(slot, newSize);
1183         if (*array == NULL) {
1184                 *array = new Array<Slot *>(1);
1185                 (*array)->set(0, slot);
1186                 rejectedSlotVector->clear();
1187                 *isInserted = false;
1188                 return true;
1189         } else {
1190                 if ((*array)->length() == 0) {
1191                         throw new Error("Server Error: Did not send any slots");
1192                 }
1193
1194                 if (hadPartialSendToServer) {
1195                         *isInserted = checkSend(*array, slot);
1196
1197                         if (!(*isInserted)) {
1198                                 rejectedSlotVector->add(slot->getSequenceNumber());
1199                         }
1200                         
1201                         return false;
1202                 } else {
1203                         rejectedSlotVector->add(slot->getSequenceNumber());
1204                         *isInserted = false;
1205                         return false;
1206                 }
1207         }
1208 }
1209
1210 /**
1211  * Returns true if a resize was needed but not done.
1212  */
1213 bool Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry, int & newSize, bool & insertedKey) {
1214         newSize = 0;//special value to indicate no resize
1215         if (liveSlotCount > bufferResizeThreshold) {
1216                 resize = true;//Resize is forced
1217         }
1218
1219         if (resize) {
1220                 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1221                 TableStatus *status = new TableStatus(slot, newSize);
1222                 slot->addShallowEntry(status);
1223         }
1224
1225         // Fill with rejected slots first before doing anything else
1226         doRejectedMessages(slot);
1227
1228         // Do mandatory rescue of entries
1229         ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryRescue(slot, resize);
1230
1231         // Extract working variables
1232         bool needsResize = mandatoryRescueReturn.getFirst();
1233         bool seenLiveSlot = mandatoryRescueReturn.getSecond();
1234         int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1235
1236         if (needsResize && !resize) {
1237                 // We need to resize but we are not resizing so return true to force on retry
1238                 return true;
1239         }
1240
1241         insertedKey = false;
1242         if (newKeyEntry != NULL) {
1243                 newKeyEntry->setSlot(slot);
1244                 if (slot->hasSpace(newKeyEntry)) {
1245                         slot->addEntry(newKeyEntry);
1246                         insertedKey = true;
1247                 }
1248         }
1249
1250         // Clear the transactions, aborts and commits that were sent previously
1251         clearSentParts();
1252         uint size = pendingSendArbitrationRounds->size();
1253         for (uint i = 0; i < size; i++) {
1254                 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
1255                 bool isFull = false;
1256                 round->generateParts();
1257                 Vector<Entry *> *parts = round->getParts();
1258
1259                 // Insert pending arbitration data
1260                 uint vsize = parts->size();
1261                 for (uint vi = 0; vi < vsize; vi++) {
1262                         Entry *arbitrationData = parts->get(vi);
1263
1264                         // If it is an abort then we need to set some information
1265                         if (arbitrationData->getType() == TypeAbort) {
1266                                 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1267                         }
1268
1269                         if (!slot->hasSpace(arbitrationData)) {
1270                                 // No space so cant do anything else with these data entries
1271                                 isFull = true;
1272                                 break;
1273                         }
1274
1275                         // Add to this current slot and add it to entries to delete
1276                         slot->addEntry(arbitrationData);
1277                         pendingSendArbitrationEntriesToDelete->add(arbitrationData);
1278                 }
1279
1280                 if (isFull) {
1281                         break;
1282                 }
1283         }
1284
1285         if (pendingTransactionQueue->size() > 0) {
1286                 Transaction *transaction = pendingTransactionQueue->get(0);
1287                 // Set the transaction sequence number if it has yet to be inserted into the block chain
1288                 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1289                         transaction->setSequenceNumber(slot->getSequenceNumber());
1290                 }
1291
1292                 while (true) {
1293                         TransactionPart *part = transaction->getNextPartToSend();
1294                         if (part == NULL) {
1295                                 // Ran out of parts to send for this transaction so move on
1296                                 break;
1297                         }
1298
1299                         if (slot->hasSpace(part)) {
1300                                 slot->addEntry(part);
1301                                 Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1302                                 if (partsSent == NULL) {
1303                                         partsSent = new Vector<int32_t>();
1304                                         transactionPartsSent->put(transaction, partsSent);
1305                                 }
1306                                 partsSent->add(part->getPartNumber());
1307                                 transactionPartsSent->put(transaction, partsSent);
1308                         } else {
1309                                 break;
1310                         }
1311                 }
1312         }
1313
1314         // Fill the remainder of the slot with rescue data
1315         doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1316
1317         return false;
1318 }
1319
1320 void Table::doRejectedMessages(Slot *s) {
1321         if (!rejectedSlotVector->isEmpty()) {
1322                 /* TODO: We should avoid generating a rejected message entry if
1323                  * there is already a sufficient entry in the queue (e->g->,
1324                  * equalsto value of true and same sequence number)->  */
1325
1326                 int64_t old_seqn = rejectedSlotVector->get(0);
1327                 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1328                         int64_t new_seqn = rejectedSlotVector->lastElement();
1329                         RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1330                         s->addShallowEntry(rm);
1331                 } else {
1332                         int64_t prev_seqn = -1;
1333                         uint i = 0;
1334                         /* Go through list of missing messages */
1335                         for (; i < rejectedSlotVector->size(); i++) {
1336                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1337                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1338                                 if (s_msg != NULL)
1339                                         break;
1340                                 prev_seqn = curr_seqn;
1341                         }
1342                         /* Generate rejected message entry for missing messages */
1343                         if (prev_seqn != -1) {
1344                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1345                                 s->addShallowEntry(rm);
1346                         }
1347                         /* Generate rejected message entries for present messages */
1348                         for (; i < rejectedSlotVector->size(); i++) {
1349                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1350                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1351                                 int64_t machineid = s_msg->getMachineID();
1352                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1353                                 s->addShallowEntry(rm);
1354                         }
1355                 }
1356         }
1357 }
1358
1359 ThreeTuple<bool, bool, int64_t> Table::doMandatoryRescue(Slot *slot, bool resize) {
1360         int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1361         int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
1362         if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1363                 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1364         }
1365
1366         int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1367         bool seenLiveSlot = false;
1368         int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots;         // smallest seq number in the buffer if it is full
1369         int64_t threshold = firstIfFull + Table_FREE_SLOTS;             // we want the buffer to be clear of live entries up to this point
1370
1371
1372         // Mandatory Rescue
1373         for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1374                 Slot *previousSlot = buffer->getSlot(currentSequenceNumber);
1375                 // Push slot number forward
1376                 if (!seenLiveSlot) {
1377                         oldestLiveSlotSequenceNumver = currentSequenceNumber;
1378                 }
1379
1380                 if (!previousSlot->isLive()) {
1381                         continue;
1382                 }
1383
1384                 // We have seen a live slot
1385                 seenLiveSlot = true;
1386
1387                 // Get all the live entries for a slot
1388                 Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1389
1390                 // Iterate over all the live entries and try to rescue them
1391                 uint lESize = liveEntries->size();
1392                 for (uint i = 0; i < lESize; i++) {
1393                         Entry *liveEntry = liveEntries->get(i);
1394                         if (slot->hasSpace(liveEntry)) {
1395                                 // Enough space to rescue the entry
1396                                 slot->addEntry(liveEntry);
1397                         } else if (currentSequenceNumber == firstIfFull) {
1398                                 //if there's no space but the entry is about to fall off the queue
1399                                 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
1400                         }
1401                 }
1402         }
1403
1404         // Did not resize
1405         return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
1406 }
1407
1408 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1409         /* now go through live entries from least to greatest sequence number until
1410          * either all live slots added, or the slot doesn't have enough room
1411          * for SKIP_THRESHOLD consecutive entries*/
1412         int skipcount = 0;
1413         int64_t newestseqnum = buffer->getNewestSeqNum();
1414         for (; seqn <= newestseqnum; seqn++) {
1415                 Slot *prevslot = buffer->getSlot(seqn);
1416                 //Push slot number forward
1417                 if (!seenliveslot)
1418                         oldestLiveSlotSequenceNumver = seqn;
1419
1420                 if (!prevslot->isLive())
1421                         continue;
1422                 seenliveslot = true;
1423                 Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
1424                 uint lESize = liveentries->size();
1425                 for (uint i = 0; i < lESize; i++) {
1426                         Entry *liveentry = liveentries->get(i);
1427                         if (s->hasSpace(liveentry))
1428                                 s->addEntry(liveentry);
1429                         else {
1430                                 skipcount++;
1431                                 if (skipcount > Table_SKIP_THRESHOLD) {
1432                                         delete liveentries;
1433                                         goto donesearch;
1434                                 }
1435                         }
1436                 }
1437                 delete liveentries;
1438         }
1439 donesearch:
1440         ;
1441 }
1442
1443 /**
1444  * Checks for malicious activity and updates the local copy of the block chain->
1445  */
1446 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1447         // The cloud communication layer has checked slot HMACs already
1448         // before decoding
1449         if (newSlots->length() == 0) {
1450                 return;
1451         }
1452
1453         // Make sure all slots are newer than the last largest slot this
1454         // client has seen
1455         int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
1456         if (firstSeqNum <= sequenceNumber) {
1457                 throw new Error("Server Error: Sent older slots!");
1458         }
1459
1460         // Create an object that can access both new slots and slots in our
1461         // local chain without committing slots to our local chain
1462         SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1463
1464         // Check that the HMAC chain is not broken
1465         checkHMACChain(indexer, newSlots);
1466
1467         // Set to keep track of messages from clients
1468         Hashset<int64_t> *machineSet = new Hashset<int64_t>();
1469         {
1470                 SetIterator<int64_t, Pair<int64_t, Liveness *> *> *lmit = getKeyIterator(lastMessageTable);
1471                 while (lmit->hasNext())
1472                         machineSet->add(lmit->next());
1473                 delete lmit;
1474         }
1475
1476         // Process each slots data
1477         {
1478                 uint numSlots = newSlots->length();
1479                 for (uint i = 0; i < numSlots; i++) {
1480                         Slot *slot = newSlots->get(i);
1481                         processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1482                         updateExpectedSize();
1483                 }
1484         }
1485         delete indexer;
1486         
1487         // If there is a gap, check to see if the server sent us
1488         // everything->
1489         if (firstSeqNum != (sequenceNumber + 1)) {
1490
1491                 // Check the size of the slots that were sent down by the server->
1492                 // Can only check the size if there was a gap
1493                 checkNumSlots(newSlots->length());
1494
1495                 // Since there was a gap every machine must have pushed a slot or
1496                 // must have a last message message-> If not then the server is
1497                 // hiding slots
1498                 if (!machineSet->isEmpty()) {
1499                         delete machineSet;
1500                         throw new Error("Missing record for machines: ");
1501                 }
1502         }
1503         delete machineSet;
1504         // Update the size of our local block chain->
1505         commitNewMaxSize();
1506
1507         // Commit new to slots to the local block chain->
1508         {
1509                 uint numSlots = newSlots->length();
1510                 for (uint i = 0; i < numSlots; i++) {
1511                         Slot *slot = newSlots->get(i);
1512
1513                         // Insert this slot into our local block chain copy->
1514                         buffer->putSlot(slot);
1515
1516                         // Keep track of how many slots are currently live (have live data
1517                         // in them)->
1518                         liveSlotCount++;
1519                 }
1520         }
1521         // Get the sequence number of the latest slot in the system
1522         sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
1523         updateLiveStateFromServer();
1524
1525         // No Need to remember after we pulled from the server
1526         offlineTransactionsCommittedAndAtServer->clear();
1527
1528         // This is invalidated now
1529         hadPartialSendToServer = false;
1530 }
1531
1532 void Table::updateLiveStateFromServer() {
1533         // Process the new transaction parts
1534         processNewTransactionParts();
1535
1536         // Do arbitration on new transactions that were received
1537         arbitrateFromServer();
1538
1539         // Update all the committed keys
1540         bool didCommitOrSpeculate = updateCommittedTable();
1541
1542         // Delete the transactions that are now dead
1543         updateLiveTransactionsAndStatus();
1544
1545         // Do speculations
1546         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1547         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1548 }
1549
1550 void Table::updateLiveStateFromLocal() {
1551         // Update all the committed keys
1552         bool didCommitOrSpeculate = updateCommittedTable();
1553
1554         // Delete the transactions that are now dead
1555         updateLiveTransactionsAndStatus();
1556
1557         // Do speculations
1558         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1559         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1560 }
1561
1562 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1563         int64_t prevslots = firstSequenceNumber;
1564
1565         if (didFindTableStatus) {
1566         } else {
1567                 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1568         }
1569
1570         didFindTableStatus = true;
1571         currMaxSize = numberOfSlots;
1572 }
1573
1574 void Table::updateExpectedSize() {
1575         expectedsize++;
1576
1577         if (expectedsize > currMaxSize) {
1578                 expectedsize = currMaxSize;
1579         }
1580 }
1581
1582
1583 /**
1584  * Check the size of the block chain to make sure there are enough
1585  * slots sent back by the server-> This is only called when we have a
1586  * gap between the slots that we have locally and the slots sent by
1587  * the server therefore in the slots sent by the server there will be
1588  * at least 1 Table status message
1589  */
1590 void Table::checkNumSlots(int numberOfSlots) {
1591         if (numberOfSlots != expectedsize) {
1592                 throw new Error("Server Error: Server did not send all slots->  Expected: ");
1593         }
1594 }
1595
1596 /**
1597  * Update the size of of the local buffer if it is needed->
1598  */
1599 void Table::commitNewMaxSize() {
1600         didFindTableStatus = false;
1601
1602         // Resize the local slot buffer
1603         if (numberOfSlots != currMaxSize) {
1604                 buffer->resize((int32_t)currMaxSize);
1605         }
1606
1607         // Change the number of local slots to the new size
1608         numberOfSlots = (int32_t)currMaxSize;
1609
1610         // Recalculate the resize threshold since the size of the local
1611         // buffer has changed
1612         setResizeThreshold();
1613 }
1614
1615 /**
1616  * Process the new transaction parts from this latest round of slots
1617  * received from the server
1618  */
1619 void Table::processNewTransactionParts() {
1620
1621         if (newTransactionParts->size() == 0) {
1622                 // Nothing new to process
1623                 return;
1624         }
1625
1626         // Iterate through all the machine Ids that we received new parts
1627         // for
1628         SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *tpit = getKeyIterator(newTransactionParts);
1629         while (tpit->hasNext()) {
1630                 int64_t machineId = tpit->next();
1631                 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
1632
1633                 SetIterator<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *ptit = getKeyIterator(parts);
1634                 // Iterate through all the parts for that machine Id
1635                 while (ptit->hasNext()) {
1636                         Pair<int64_t, int32_t> *partId = ptit->next();
1637                         TransactionPart *part = parts->get(partId);
1638
1639                         if (lastArbitratedTransactionNumberByArbitratorTable->contains(part->getArbitratorId())) {
1640                                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1641                                 if (lastTransactionNumber >= part->getSequenceNumber()) {
1642                                         // Set dead the transaction part
1643                                         part->setDead();
1644                                         continue;
1645                                 }
1646                         }
1647
1648                         // Get the transaction object for that sequence number
1649                         Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1650
1651                         if (transaction == NULL) {
1652                                 // This is a new transaction that we dont have so make a new one
1653                                 transaction = new Transaction();
1654                                 
1655                                 // Add that part to the transaction
1656                                 transaction->addPartDecode(part);
1657
1658                                 // Insert this new transaction into the live tables
1659                                 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1660                                 liveTransactionByTransactionIdTable->put(transaction->getId(), transaction);
1661                         }
1662                 }
1663                 delete ptit;
1664         }
1665         delete tpit;
1666         // Clear all the new transaction parts in preparation for the next
1667         // time the server sends slots
1668         {
1669                 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts);
1670                 while (partsit->hasNext()) {
1671                         int64_t machineId = partsit->next();
1672                         Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
1673                         delete parts;
1674                 }
1675                 delete partsit;
1676                 newTransactionParts->clear();
1677         }
1678 }
1679
1680 void Table::arbitrateFromServer() {
1681         if (liveTransactionBySequenceNumberTable->size() == 0) {
1682                 // Nothing to arbitrate on so move on
1683                 return;
1684         }
1685
1686         // Get the transaction sequence numbers and sort from oldest to newest
1687         Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>();
1688         {
1689                 SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
1690                 while (trit->hasNext())
1691                         transactionSequenceNumbers->add(trit->next());
1692                 delete trit;
1693         }
1694         qsort(transactionSequenceNumbers->expose(), transactionSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1695
1696         // Collection of key value pairs that are
1697         Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *speculativeTableTmp = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
1698
1699         // The last transaction arbitrated on
1700         int64_t lastTransactionCommitted = -1;
1701         Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1702         uint tsnSize = transactionSequenceNumbers->size();
1703         for (uint i = 0; i < tsnSize; i++) {
1704                 int64_t transactionSequenceNumber = transactionSequenceNumbers->get(i);
1705                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1706
1707                 // Check if this machine arbitrates for this transaction if not
1708                 // then we cant arbitrate this transaction
1709                 if (transaction->getArbitrator() != localMachineId) {
1710                         continue;
1711                 }
1712
1713                 if (transactionSequenceNumber < lastSeqNumArbOn) {
1714                         continue;
1715                 }
1716
1717                 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1718                         // We have seen this already locally so dont commit again
1719                         continue;
1720                 }
1721
1722                 if (!transaction->isComplete()) {
1723                         // Will arbitrate in incorrect order if we continue so just break
1724                         // Most likely this
1725                         break;
1726                 }
1727
1728                 // update the largest transaction seen by arbitrator from server
1729                 if (!lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1730                         lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1731                 } else {
1732                         int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1733                         if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1734                                 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1735                         }
1736                 }
1737
1738                 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1739                         // Guard evaluated as true
1740                         // Update the local changes so we can make the commit
1741                         SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1742                         while (kvit->hasNext()) {
1743                                 KeyValue *kv = kvit->next();
1744                                 speculativeTableTmp->put(kv->getKey(), kv);
1745                         }
1746                         delete kvit;
1747
1748                         // Update what the last transaction committed was for use in batch commit
1749                         lastTransactionCommitted = transactionSequenceNumber;
1750                 } else {
1751                         // Guard evaluated was false so create abort
1752                         // Create the abort
1753                         Abort *newAbort = new Abort(NULL,
1754                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1755                                                                                                                                         transaction->getSequenceNumber(),
1756                                                                                                                                         transaction->getMachineId(),
1757                                                                                                                                         transaction->getArbitrator(),
1758                                                                                                                                         localArbitrationSequenceNumber);
1759                         localArbitrationSequenceNumber++;
1760                         generatedAborts->add(newAbort);
1761
1762                         // Insert the abort so we can process
1763                         processEntry(newAbort);
1764                 }
1765
1766                 lastSeqNumArbOn = transactionSequenceNumber;
1767         }
1768
1769         delete transactionSequenceNumbers;
1770
1771         Commit *newCommit = NULL;
1772
1773         // If there is something to commit
1774         if (speculativeTableTmp->size() != 0) {
1775                 // Create the commit and increment the commit sequence number
1776                 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1777                 localArbitrationSequenceNumber++;
1778
1779                 // Add all the new keys to the commit
1780                 SetIterator<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *spit = getKeyIterator(speculativeTableTmp);
1781                 while (spit->hasNext()) {
1782                         IoTString *string = spit->next();
1783                         KeyValue *kv = speculativeTableTmp->get(string);
1784                         newCommit->addKV(kv);
1785                 }
1786                 delete spit;
1787                 
1788                 // create the commit parts
1789                 newCommit->createCommitParts();
1790
1791                 // Append all the commit parts to the end of the pending queue
1792                 // waiting for sending to the server
1793                 // Insert the commit so we can process it
1794                 Vector<CommitPart *> *parts = newCommit->getParts();
1795                 uint partsSize = parts->size();
1796                 for (uint i = 0; i < partsSize; i++) {
1797                         CommitPart *commitPart = parts->get(i);
1798                         processEntry(commitPart);
1799                 }
1800         }
1801         delete speculativeTableTmp;
1802
1803         if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1804                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1805                 pendingSendArbitrationRounds->add(arbitrationRound);
1806
1807                 if (compactArbitrationData()) {
1808                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1809                         if (newArbitrationRound->getCommit() != NULL) {
1810                                 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1811                                 uint partsSize = parts->size();
1812                                 for (uint i = 0; i < partsSize; i++) {
1813                                         CommitPart *commitPart = parts->get(i);
1814                                         processEntry(commitPart);
1815                                 }
1816                         }
1817                 }
1818         } else {
1819                 delete generatedAborts;
1820         }
1821 }
1822
1823 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1824
1825         // Check if this machine arbitrates for this transaction if not then
1826         // we cant arbitrate this transaction
1827         if (transaction->getArbitrator() != localMachineId) {
1828                 return Pair<bool, bool>(false, false);
1829         }
1830
1831         if (!transaction->isComplete()) {
1832                 // Will arbitrate in incorrect order if we continue so just break
1833                 // Most likely this
1834                 return Pair<bool, bool>(false, false);
1835         }
1836
1837         if (transaction->getMachineId() != localMachineId) {
1838                 // dont do this check for local transactions
1839                 if (lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1840                         if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1841                                 // We've have already seen this from the server
1842                                 return Pair<bool, bool>(false, false);
1843                         }
1844                 }
1845         }
1846
1847         if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1848                 // Guard evaluated as true Create the commit and increment the
1849                 // commit sequence number
1850                 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1851                 localArbitrationSequenceNumber++;
1852
1853                 // Update the local changes so we can make the commit
1854                 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1855                 while (kvit->hasNext()) {
1856                         KeyValue *kv = kvit->next();
1857                         newCommit->addKV(kv->getCopy());
1858                 }
1859                 delete kvit;
1860
1861                 // create the commit parts
1862                 newCommit->createCommitParts();
1863
1864                 // Append all the commit parts to the end of the pending queue
1865                 // waiting for sending to the server
1866                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1867                 pendingSendArbitrationRounds->add(arbitrationRound);
1868
1869                 if (compactArbitrationData()) {
1870                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1871                         Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1872                         uint partsSize = parts->size();
1873                         for (uint i = 0; i < partsSize; i++) {
1874                                 CommitPart *commitPart = parts->get(i);
1875                                 processEntry(commitPart);
1876                         }
1877                 } else {
1878                         // Insert the commit so we can process it
1879                         Vector<CommitPart *> *parts = newCommit->getParts();
1880                         uint partsSize = parts->size();
1881                         for (uint i = 0; i < partsSize; i++) {
1882                                 CommitPart *commitPart = parts->get(i);
1883                                 processEntry(commitPart);
1884                         }
1885                 }
1886
1887                 if (transaction->getMachineId() == localMachineId) {
1888                         TransactionStatus *status = transaction->getTransactionStatus();
1889                         if (status != NULL) {
1890                                 status->setStatus(TransactionStatus_StatusCommitted);
1891                         }
1892                 }
1893
1894                 updateLiveStateFromLocal();
1895                 return Pair<bool, bool>(true, true);
1896         } else {
1897                 if (transaction->getMachineId() == localMachineId) {
1898                         // For locally created messages update the status
1899                         // Guard evaluated was false so create abort
1900                         TransactionStatus *status = transaction->getTransactionStatus();
1901                         if (status != NULL) {
1902                                 status->setStatus(TransactionStatus_StatusAborted);
1903                         }
1904                 } else {
1905                         Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
1906
1907                         // Create the abort
1908                         Abort *newAbort = new Abort(NULL,
1909                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1910                                                                                                                                         -1,
1911                                                                                                                                         transaction->getMachineId(),
1912                                                                                                                                         transaction->getArbitrator(),
1913                                                                                                                                         localArbitrationSequenceNumber);
1914                         localArbitrationSequenceNumber++;
1915                         addAbortSet->add(newAbort);
1916
1917                         // Append all the commit parts to the end of the pending queue
1918                         // waiting for sending to the server
1919                         ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1920                         pendingSendArbitrationRounds->add(arbitrationRound);
1921
1922                         if (compactArbitrationData()) {
1923                                 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1924
1925                                 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1926                                 uint partsSize = parts->size();
1927                                 for (uint i = 0; i < partsSize; i++) {
1928                                         CommitPart *commitPart = parts->get(i);
1929                                         processEntry(commitPart);
1930                                 }
1931                         }
1932                 }
1933
1934                 updateLiveStateFromLocal();
1935                 return Pair<bool, bool>(true, false);
1936         }
1937 }
1938
1939 /**
1940  * Compacts the arbitration data by merging commits and aggregating
1941  * aborts so that a single large push of commits can be done instead
1942  * of many small updates
1943  */
1944 bool Table::compactArbitrationData() {
1945         if (pendingSendArbitrationRounds->size() < 2) {
1946                 // Nothing to compact so do nothing
1947                 return false;
1948         }
1949
1950         ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1951         if (lastRound->getDidSendPart()) {
1952                 return false;
1953         }
1954
1955         bool hadCommit = (lastRound->getCommit() == NULL);
1956         bool gotNewCommit = false;
1957
1958         uint numberToDelete = 1;
1959         
1960         while (numberToDelete < pendingSendArbitrationRounds->size()) {
1961                 ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1962
1963                 if (round->isFull() || round->getDidSendPart()) {
1964                         // Stop since there is a part that cannot be compacted and we
1965                         // need to compact in order
1966                         break;
1967                 }
1968
1969                 if (round->getCommit() == NULL) {
1970                         // Try compacting aborts only
1971                         int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
1972                         if (newSize > ArbitrationRound_MAX_PARTS) {
1973                                 // Cant compact since it would be too large
1974                                 break;
1975                         }
1976                         lastRound->addAborts(round->getAborts());
1977                 } else {
1978                         // Create a new larger commit
1979                         Commit *newCommit = Commit_merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
1980                         localArbitrationSequenceNumber++;
1981
1982                         // Create the commit parts so that we can count them
1983                         newCommit->createCommitParts();
1984
1985                         // Calculate the new size of the parts
1986                         int newSize = newCommit->getNumberOfParts();
1987                         newSize += lastRound->getAbortsCount();
1988                         newSize += round->getAbortsCount();
1989
1990                         if (newSize > ArbitrationRound_MAX_PARTS) {
1991                                 // Can't compact since it would be too large
1992                                 if (lastRound->getCommit() != newCommit &&
1993                                                 round->getCommit() != newCommit)
1994                                         delete newCommit;
1995                                 break;
1996                         }
1997                         // Set the new compacted part
1998                         if (lastRound->getCommit() == newCommit)
1999                                 lastRound->setCommit(NULL);
2000                         if (round->getCommit() == newCommit)
2001                                 round->setCommit(NULL);
2002                         
2003                         if (lastRound->getCommit() != NULL) {
2004                                 Commit * oldcommit = lastRound->getCommit();
2005                                 lastRound->setCommit(NULL);
2006                                 delete oldcommit;
2007                         }
2008                         lastRound->setCommit(newCommit);
2009                         lastRound->addAborts(round->getAborts());
2010                         gotNewCommit = true;
2011                 }
2012
2013                 numberToDelete++;
2014         }
2015
2016         if (numberToDelete != 1) {
2017                 // If there is a compaction
2018                 // Delete the previous pieces that are now in the new compacted piece
2019                 for (uint i = 2; i <= numberToDelete; i++) {
2020                         delete pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size()-i);
2021                 }
2022                 pendingSendArbitrationRounds->setSize(pendingSendArbitrationRounds->size() - numberToDelete);
2023
2024                 pendingSendArbitrationRounds->add(lastRound);
2025
2026                 // Should reinsert into the commit processor
2027                 if (hadCommit && gotNewCommit) {
2028                         return true;
2029                 }
2030         }
2031
2032         return false;
2033 }
2034
2035 /**
2036  * Update all the commits and the committed tables, sets dead the dead
2037  * transactions
2038  */
2039 bool Table::updateCommittedTable() {
2040         if (newCommitParts->size() == 0) {
2041                 // Nothing new to process
2042                 return false;
2043         }
2044
2045         // Iterate through all the machine Ids that we received new parts for
2046         SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts);
2047         while (partsit->hasNext()) {
2048                 int64_t machineId = partsit->next();
2049                 Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId);
2050
2051                 // Iterate through all the parts for that machine Id
2052                 SetIterator<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pairit = getKeyIterator(parts);
2053                 while (pairit->hasNext()) {
2054                         Pair<int64_t, int32_t> *partId = pairit->next();
2055                         CommitPart *part = parts->get(partId);
2056
2057                         // Get the transaction object for that sequence number
2058                         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
2059
2060                         if (commitForClientTable == NULL) {
2061                                 // This is the first commit from this device
2062                                 commitForClientTable = new Hashtable<int64_t, Commit *>();
2063                                 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
2064                         }
2065
2066                         Commit *commit = commitForClientTable->get(part->getSequenceNumber());
2067
2068                         if (commit == NULL) {
2069                                 // This is a new commit that we dont have so make a new one
2070                                 commit = new Commit();
2071
2072                                 // Insert this new commit into the live tables
2073                                 commitForClientTable->put(part->getSequenceNumber(), commit);
2074                         }
2075
2076                         // Add that part to the commit
2077                         commit->addPartDecode(part);
2078                 }
2079                 delete pairit;
2080                 delete parts;
2081         }
2082         delete partsit;
2083
2084         // Clear all the new commits parts in preparation for the next time
2085         // the server sends slots
2086         newCommitParts->clear();
2087
2088         // If we process a new commit keep track of it for future use
2089         bool didProcessANewCommit = false;
2090
2091         // Process the commits one by one
2092         SetIterator<int64_t, Hashtable<int64_t, Commit *> *> *liveit = getKeyIterator(liveCommitsTable);
2093         while (liveit->hasNext()) {
2094                 int64_t arbitratorId = liveit->next();
2095                 // Get all the commits for a specific arbitrator
2096                 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
2097
2098                 // Sort the commits in order
2099                 Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>();
2100                 {
2101                         SetIterator<int64_t, Commit *> *clientit = getKeyIterator(commitForClientTable);
2102                         while (clientit->hasNext())
2103                                 commitSequenceNumbers->add(clientit->next());
2104                         delete clientit;
2105                 }
2106
2107                 qsort(commitSequenceNumbers->expose(), commitSequenceNumbers->size(), sizeof(int64_t), compareInt64);
2108
2109                 // Get the last commit seen from this arbitrator
2110                 int64_t lastCommitSeenSequenceNumber = -1;
2111                 if (lastCommitSeenSequenceNumberByArbitratorTable->contains(arbitratorId)) {
2112                         lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
2113                 }
2114
2115                 // Go through each new commit one by one
2116                 for (uint i = 0; i < commitSequenceNumbers->size(); i++) {
2117                         int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
2118                         Commit *commit = commitForClientTable->get(commitSequenceNumber);
2119                         // Special processing if a commit is not complete
2120                         if (!commit->isComplete()) {
2121                                 if (i == (commitSequenceNumbers->size() - 1)) {
2122                                         // If there is an incomplete commit and this commit is the
2123                                         // latest one seen then this commit cannot be processed and
2124                                         // there are no other commits
2125                                         break;
2126                                 } else {
2127                                         // This is a commit that was already dead but parts of it
2128                                         // are still in the block chain (not flushed out yet)->
2129                                         // Delete it and move on
2130                                         commit->setDead();
2131                                         commitForClientTable->remove(commit->getSequenceNumber());
2132                                         delete commit;
2133                                         continue;
2134                                 }
2135                         }
2136
2137                         // Update the last transaction that was updated if we can
2138                         if (commit->getTransactionSequenceNumber() != -1) {
2139                                 // Update the last transaction sequence number that the arbitrator arbitrated on1
2140                                 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2141                                         lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2142                                 }
2143                         }
2144
2145                         // Update the last arbitration data that we have seen so far
2146                         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(commit->getMachineId())) {
2147                                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
2148                                 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
2149                                         // Is larger
2150                                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2151                                 }
2152                         } else {
2153                                 // Never seen any data from this arbitrator so record the first one
2154                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2155                         }
2156
2157                         // We have already seen this commit before so need to do the
2158                         // full processing on this commit
2159                         if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2160                                 // Update the last transaction that was updated if we can
2161                                 if (commit->getTransactionSequenceNumber() != -1) {
2162                                         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
2163                                         if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) ||
2164                                                         lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2165                                                 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2166                                         }
2167                                 }
2168                                 continue;
2169                         }
2170
2171                         // If we got here then this is a brand new commit and needs full
2172                         // processing
2173                         // Get what commits should be edited, these are the commits that
2174                         // have live values for their keys
2175                         Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
2176                         {
2177                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2178                                 while (kvit->hasNext()) {
2179                                         KeyValue *kv = kvit->next();
2180                                         Commit *commit = liveCommitsByKeyTable->get(kv->getKey());
2181                                         if (commit != NULL)
2182                                                 commitsToEdit->add(commit);
2183                                 }
2184                                 delete kvit;
2185                         }
2186
2187                         // Update each previous commit that needs to be updated
2188                         SetIterator<Commit *, Commit *> *commitit = commitsToEdit->iterator();
2189                         while (commitit->hasNext()) {
2190                                 Commit *previousCommit = commitit->next();
2191
2192                                 // Only bother with live commits (TODO: Maybe remove this check)
2193                                 if (previousCommit->isLive()) {
2194
2195                                         // Update which keys in the old commits are still live
2196                                         {
2197                                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2198                                                 while (kvit->hasNext()) {
2199                                                         KeyValue *kv = kvit->next();
2200                                                         previousCommit->invalidateKey(kv->getKey());
2201                                                 }
2202                                                 delete kvit;
2203                                         }
2204
2205                                         // if the commit is now dead then remove it
2206                                         if (!previousCommit->isLive()) {
2207                                                 commitForClientTable->remove(previousCommit->getSequenceNumber());
2208                                                 delete previousCommit;
2209                                         }
2210                                 }
2211                         }
2212                         delete commitit;
2213                         delete commitsToEdit;
2214
2215                         // Update the last seen sequence number from this arbitrator
2216                         if (lastCommitSeenSequenceNumberByArbitratorTable->contains(commit->getMachineId())) {
2217                                 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2218                                         lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2219                                 }
2220                         } else {
2221                                 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2222                         }
2223
2224                         // We processed a new commit that we havent seen before
2225                         didProcessANewCommit = true;
2226
2227                         // Update the committed table of keys and which commit is using which key
2228                         {
2229                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2230                                 while (kvit->hasNext()) {
2231                                         KeyValue *kv = kvit->next();
2232                                         committedKeyValueTable->put(kv->getKey(), kv);
2233                                         liveCommitsByKeyTable->put(kv->getKey(), commit);
2234                                 }
2235                                 delete kvit;
2236                         }
2237                 }
2238                 delete commitSequenceNumbers;
2239         }
2240         delete liveit;
2241
2242         return didProcessANewCommit;
2243 }
2244
2245 /**
2246  * Create the speculative table from transactions that are still live
2247  * and have come from the cloud
2248  */
2249 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2250         if (liveTransactionBySequenceNumberTable->size() == 0) {
2251                 // There is nothing to speculate on
2252                 return false;
2253         }
2254
2255         // Create a list of the transaction sequence numbers and sort them
2256         // from oldest to newest
2257         Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>();
2258         {
2259                 SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
2260                 while (trit->hasNext())
2261                         transactionSequenceNumbersSorted->add(trit->next());
2262                 delete trit;
2263         }
2264
2265         qsort(transactionSequenceNumbersSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64);
2266
2267         bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2268
2269
2270         if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2271                 // If there is a gap in the transaction sequence numbers then
2272                 // there was a commit or an abort of a transaction OR there was a
2273                 // new commit (Could be from offline commit) so a redo the
2274                 // speculation from scratch
2275
2276                 // Start from scratch
2277                 speculatedKeyValueTable->clear();
2278                 lastTransactionSequenceNumberSpeculatedOn = -1;
2279                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2280         }
2281
2282         // Remember the front of the transaction list
2283         oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2284
2285         // Find where to start arbitration from
2286         uint startIndex = 0;
2287
2288         for (; startIndex < transactionSequenceNumbersSorted->size(); startIndex++)
2289                 if (transactionSequenceNumbersSorted->get(startIndex) == lastTransactionSequenceNumberSpeculatedOn)
2290                         break;
2291         startIndex++;
2292
2293         if (startIndex >= transactionSequenceNumbersSorted->size()) {
2294                 // Make sure we are not out of bounds
2295                 delete transactionSequenceNumbersSorted;
2296                 return false;           // did not speculate
2297         }
2298
2299         Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
2300         bool didSkip = true;
2301
2302         for (uint i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2303                 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2304                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2305
2306                 if (!transaction->isComplete()) {
2307                         // If there is an incomplete transaction then there is nothing
2308                         // we can do add this transactions arbitrator to the list of
2309                         // arbitrators we should ignore
2310                         incompleteTransactionArbitrator->add(transaction->getArbitrator());
2311                         didSkip = true;
2312                         continue;
2313                 }
2314
2315                 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2316                         continue;
2317                 }
2318
2319                 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2320
2321                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2322                         // Guard evaluated to true so update the speculative table
2323                         {
2324                                 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2325                                 while (kvit->hasNext()) {
2326                                         KeyValue *kv = kvit->next();
2327                                         speculatedKeyValueTable->put(kv->getKey(), kv);
2328                                 }
2329                                 delete kvit;
2330                         }
2331                 }
2332         }
2333
2334         delete transactionSequenceNumbersSorted;
2335         
2336         if (didSkip) {
2337                 // Since there was a skip we need to redo the speculation next time around
2338                 lastTransactionSequenceNumberSpeculatedOn = -1;
2339                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2340         }
2341
2342         // We did some speculation
2343         return true;
2344 }
2345
2346 /**
2347  * Create the pending transaction speculative table from transactions
2348  * that are still in the pending transaction buffer
2349  */
2350 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2351         if (pendingTransactionQueue->size() == 0) {
2352                 // There is nothing to speculate on
2353                 return;
2354         }
2355
2356         if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2357                 // need to reset on the pending speculation
2358                 lastPendingTransactionSpeculatedOn = NULL;
2359                 firstPendingTransaction = pendingTransactionQueue->get(0);
2360                 pendingTransactionSpeculatedKeyValueTable->clear();
2361         }
2362
2363         // Find where to start arbitration from
2364         uint startIndex = 0;
2365
2366         for (; startIndex < pendingTransactionQueue->size(); startIndex++)
2367                 if (pendingTransactionQueue->get(startIndex) == firstPendingTransaction)
2368                         break;
2369
2370         if (startIndex >= pendingTransactionQueue->size()) {
2371                 // Make sure we are not out of bounds
2372                 return;
2373         }
2374
2375         for (uint i = startIndex; i < pendingTransactionQueue->size(); i++) {
2376                 Transaction *transaction = pendingTransactionQueue->get(i);
2377
2378                 lastPendingTransactionSpeculatedOn = transaction;
2379
2380                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2381                         // Guard evaluated to true so update the speculative table
2382                         SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2383                         while (kvit->hasNext()) {
2384                                 KeyValue *kv = kvit->next();
2385                                 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2386                         }
2387                         delete kvit;
2388                 }
2389         }
2390 }
2391
2392 /**
2393  * Set dead and remove from the live transaction tables the
2394  * transactions that are dead
2395  */
2396 void Table::updateLiveTransactionsAndStatus() {
2397         // Go through each of the transactions
2398         {
2399                 SetIterator<int64_t, Transaction *> *iter = getKeyIterator(liveTransactionBySequenceNumberTable);
2400                 while (iter->hasNext()) {
2401                         int64_t key = iter->next();
2402                         Transaction *transaction = liveTransactionBySequenceNumberTable->get(key);
2403
2404                         // Check if the transaction is dead
2405                         if (lastArbitratedTransactionNumberByArbitratorTable->contains(transaction->getArbitrator())
2406                                         && lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator()) >= transaction->getSequenceNumber()) {
2407                                 // Set dead the transaction
2408                                 transaction->setDead();
2409
2410                                 // Remove the transaction from the live table
2411                                 iter->remove();
2412                                 liveTransactionByTransactionIdTable->remove(transaction->getId());
2413                                 delete transaction;
2414                         }
2415                 }
2416                 delete iter;
2417         }
2418
2419         // Go through each of the transactions
2420         {
2421                 SetIterator<int64_t, TransactionStatus *> *iter = getKeyIterator(outstandingTransactionStatus);
2422                 while (iter->hasNext()) {
2423                         int64_t key = iter->next();
2424                         TransactionStatus *status = outstandingTransactionStatus->get(key);
2425
2426                         // Check if the transaction is dead
2427                         if (lastArbitratedTransactionNumberByArbitratorTable->contains(status->getTransactionArbitrator())
2428                                         && (lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()) >= status->getTransactionSequenceNumber())) {
2429                                 // Set committed
2430                                 status->setStatus(TransactionStatus_StatusCommitted);
2431
2432                                 // Remove
2433                                 iter->remove();
2434                         }
2435                 }
2436                 delete iter;
2437         }
2438 }
2439
2440 /**
2441  * Process this slot, entry by entry->  Also update the latest message sent by slot
2442  */
2443 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2444
2445         // Update the last message seen
2446         updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2447
2448         // Process each entry in the slot
2449         Vector<Entry *> *entries = slot->getEntries();
2450         uint eSize = entries->size();
2451         for (uint ei = 0; ei < eSize; ei++) {
2452                 Entry *entry = entries->get(ei);
2453                 switch (entry->getType()) {
2454                 case TypeCommitPart:
2455                         processEntry((CommitPart *)entry);
2456                         break;
2457                 case TypeAbort:
2458                         processEntry((Abort *)entry);
2459                         break;
2460                 case TypeTransactionPart:
2461                         processEntry((TransactionPart *)entry);
2462                         break;
2463                 case TypeNewKey:
2464                         processEntry((NewKey *)entry);
2465                         break;
2466                 case TypeLastMessage:
2467                         processEntry((LastMessage *)entry, machineSet);
2468                         break;
2469                 case TypeRejectedMessage:
2470                         processEntry((RejectedMessage *)entry, indexer);
2471                         break;
2472                 case TypeTableStatus:
2473                         processEntry((TableStatus *)entry, slot->getSequenceNumber());
2474                         break;
2475                 default:
2476                         throw new Error("Unrecognized type: ");
2477                 }
2478         }
2479 }
2480
2481 /**
2482  * Update the last message that was sent for a machine Id
2483  */
2484 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2485         // Update what the last message received by a machine was
2486         updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2487 }
2488
2489 /**
2490  * Add the new key to the arbitrators table and update the set of live
2491  * new keys (in case of a rescued new key message)
2492  */
2493 void Table::processEntry(NewKey *entry) {
2494         // Update the arbitrator table with the new key information
2495         arbitratorTable->put(entry->getKey(), entry->getMachineID());
2496
2497         // Update what the latest live new key is
2498         NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2499         if (oldNewKey != NULL) {
2500                 // Delete the old new key messages
2501                 oldNewKey->setDead();
2502         }
2503 }
2504
2505 /**
2506  * Process new table status entries and set dead the old ones as new
2507  * ones come in-> keeps track of the largest and smallest table status
2508  * seen in this current round of updating the local copy of the block
2509  * chain
2510  */
2511 void Table::processEntry(TableStatus *entry, int64_t seq) {
2512         int newNumSlots = entry->getMaxSlots();
2513         updateCurrMaxSize(newNumSlots);
2514         initExpectedSize(seq, newNumSlots);
2515
2516         if (liveTableStatus != NULL) {
2517                 // We have a larger table status so the old table status is no
2518                 // int64_ter alive
2519                 liveTableStatus->setDead();
2520         }
2521
2522         // Make this new table status the latest alive table status
2523         liveTableStatus = entry;
2524 }
2525
2526 /**
2527  * Check old messages to see if there is a block chain violation->
2528  * Also
2529  */
2530 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
2531         int64_t oldSeqNum = entry->getOldSeqNum();
2532         int64_t newSeqNum = entry->getNewSeqNum();
2533         bool isequal = entry->getEqual();
2534         int64_t machineId = entry->getMachineID();
2535         int64_t seq = entry->getSequenceNumber();
2536
2537         // Check if we have messages that were supposed to be rejected in
2538         // our local block chain
2539         for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2540                 // Get the slot
2541                 Slot *slot = indexer->getSlot(seqNum);
2542
2543                 if (slot != NULL) {
2544                         // If we have this slot make sure that it was not supposed to be
2545                         // a rejected slot
2546                         int64_t slotMachineId = slot->getMachineID();
2547                         if (isequal != (slotMachineId == machineId)) {
2548                                 throw new Error("Server Error: Trying to insert rejected message for slot ");
2549                         }
2550                 }
2551         }
2552
2553         // Create a list of clients to watch until they see this rejected
2554         // message entry->
2555         Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2556         SetIterator<int64_t, Pair<int64_t, Liveness *> *> *iter = getKeyIterator(lastMessageTable);
2557         while (iter->hasNext()) {
2558                 // Machine ID for the last message entry
2559                 int64_t lastMessageEntryMachineId = iter->next();
2560
2561                 // We've seen it, don't need to continue to watch->  Our next
2562                 // message will implicitly acknowledge it->
2563                 if (lastMessageEntryMachineId == localMachineId) {
2564                         continue;
2565                 }
2566
2567                 Pair<int64_t, Liveness *> *lastMessageValue = lastMessageTable->get(lastMessageEntryMachineId);
2568                 int64_t entrySequenceNumber = lastMessageValue->getFirst();
2569
2570                 if (entrySequenceNumber < seq) {
2571                         // Add this rejected message to the set of messages that this
2572                         // machine ID did not see yet
2573                         addWatchVector(lastMessageEntryMachineId, entry);
2574                         // This client did not see this rejected message yet so add it
2575                         // to the watch set to monitor
2576                         deviceWatchSet->add(lastMessageEntryMachineId);
2577                 }
2578         }
2579         delete iter;
2580
2581         if (deviceWatchSet->isEmpty()) {
2582                 // This rejected message has been seen by all the clients so
2583                 entry->setDead();
2584                 delete deviceWatchSet;
2585         } else {
2586                 // We need to watch this rejected message
2587                 entry->setWatchSet(deviceWatchSet);
2588         }
2589 }
2590
2591 /**
2592  * Check if this abort is live, if not then save it so we can kill it
2593  * later-> update the last transaction number that was arbitrated on->
2594  */
2595 void Table::processEntry(Abort *entry) {
2596         if (entry->getTransactionSequenceNumber() != -1) {
2597                 // update the transaction status if it was sent to the server
2598                 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2599                 if (status != NULL) {
2600                         status->setStatus(TransactionStatus_StatusAborted);
2601                 }
2602         }
2603
2604         // Abort has not been seen by the client it is for yet so we need to
2605         // keep track of it
2606
2607         Abort *previouslySeenAbort = liveAbortTable->put(new Pair<int64_t, int64_t>(entry->getAbortId()), entry);
2608         if (previouslySeenAbort != NULL) {
2609                 previouslySeenAbort->setDead();         // Delete old version of the abort since we got a rescued newer version
2610         }
2611
2612         if (entry->getTransactionArbitrator() == localMachineId) {
2613                 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
2614         }
2615
2616         if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) {
2617                 // The machine already saw this so it is dead
2618                 entry->setDead();
2619                 Pair<int64_t, int64_t> abortid = entry->getAbortId();
2620                 liveAbortTable->remove(&abortid);
2621
2622                 if (entry->getTransactionArbitrator() == localMachineId) {
2623                         liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2624                 }
2625                 return;
2626         }
2627
2628         // Update the last arbitration data that we have seen so far
2629         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(entry->getTransactionArbitrator())) {
2630                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2631                 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2632                         // Is larger
2633                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2634                 }
2635         } else {
2636                 // Never seen any data from this arbitrator so record the first one
2637                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2638         }
2639
2640         // Set dead a transaction if we can
2641         Pair<int64_t, int64_t> deadPair = Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber());
2642
2643         Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(&deadPair);
2644         if (transactionToSetDead != NULL) {
2645                 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2646         }
2647
2648         // Update the last transaction sequence number that the arbitrator
2649         // arbitrated on
2650         if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getTransactionArbitrator()) ||
2651                         (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator()) < entry->getTransactionSequenceNumber())) {
2652                 // Is a valid one
2653                 if (entry->getTransactionSequenceNumber() != -1) {
2654                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2655                 }
2656         }
2657 }
2658
2659 /**
2660  * Set dead the transaction part if that transaction is dead and keep
2661  * track of all new parts
2662  */
2663 void Table::processEntry(TransactionPart *entry) {
2664         // Check if we have already seen this transaction and set it dead OR
2665         // if it is not alive
2666         if (lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getArbitratorId()) && (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId()) >= entry->getSequenceNumber())) {
2667                 // This transaction is dead, it was already committed or aborted
2668                 entry->setDead();
2669                 return;
2670         }
2671
2672         // This part is still alive
2673         Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId());
2674
2675         if (transactionPart == NULL) {
2676                 // Dont have a table for this machine Id yet so make one
2677                 transactionPart = new Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2678                 newTransactionParts->put(entry->getMachineId(), transactionPart);
2679         }
2680
2681         // Update the part and set dead ones we have already seen (got a
2682         // rescued version)
2683         TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry);
2684         if (previouslySeenPart != NULL) {
2685                 previouslySeenPart->setDead();
2686         }
2687 }
2688
2689 /**
2690  * Process new commit entries and save them for future use->  Delete duplicates
2691  */
2692 void Table::processEntry(CommitPart *entry) {
2693         // Update the last transaction that was updated if we can
2694         if (entry->getTransactionSequenceNumber() != -1) {
2695                 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId()) ||
2696                                 lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber()) {
2697                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2698                 }
2699         }
2700
2701         Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *commitPart = newCommitParts->get(entry->getMachineId());
2702         if (commitPart == NULL) {
2703                 // Don't have a table for this machine Id yet so make one
2704                 commitPart = new Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2705                 newCommitParts->put(entry->getMachineId(), commitPart);
2706         }
2707         // Update the part and set dead ones we have already seen (got a
2708         // rescued version)
2709         CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry);
2710         if (previouslySeenPart != NULL) {
2711                 previouslySeenPart->setDead();
2712         }
2713 }
2714
2715 /**
2716  * Update the last message seen table-> Update and set dead the
2717  * appropriate RejectedMessages as clients see them-> Updates the live
2718  * aborts, removes those that are dead and sets them dead-> Check that
2719  * the last message seen is correct and that there is no mismatch of
2720  * our own last message or that other clients have not had a rollback
2721  * on the last message->
2722  */
2723 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2724         // We have seen this machine ID
2725         machineSet->remove(machineId);
2726
2727         // Get the set of rejected messages that this machine Id is has not seen yet
2728         Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2729         // If there is a rejected message that this machine Id has not seen yet
2730         if (watchset != NULL) {
2731                 // Go through each rejected message that this machine Id has not
2732                 // seen yet
2733
2734                 SetIterator<RejectedMessage *, RejectedMessage *> *rmit = watchset->iterator();
2735                 while (rmit->hasNext()) {
2736                         RejectedMessage *rm = rmit->next();
2737                         // If this machine Id has seen this rejected message->->->
2738                         if (rm->getSequenceNumber() <= seqNum) {
2739                                 // Remove it from our watchlist
2740                                 rmit->remove();
2741                                 // Decrement machines that need to see this notification
2742                                 rm->removeWatcher(machineId);
2743                         }
2744                 }
2745                 delete rmit;
2746         }
2747
2748         // Set dead the abort
2749         SetIterator<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals> *abortit = getKeyIterator(liveAbortTable);
2750
2751         while (abortit->hasNext()) {
2752                 Pair<int64_t, int64_t> *key = abortit->next();
2753                 Abort *abort = liveAbortTable->get(key);
2754                 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2755                         abort->setDead();
2756                         abortit->remove();
2757                         if (abort->getTransactionArbitrator() == localMachineId) {
2758                                 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2759                         }
2760                 }
2761         }
2762         delete abortit;
2763         if (machineId == localMachineId) {
2764                 // Our own messages are immediately dead->
2765                 char livenessType = liveness->getType();
2766                 if (livenessType == TypeLastMessage) {
2767                         ((LastMessage *)liveness)->setDead();
2768                 } else if (livenessType == TypeSlot) {
2769                         ((Slot *)liveness)->setDead();
2770                 } else {
2771                         throw new Error("Unrecognized type");
2772                 }
2773         }
2774         // Get the old last message for this device
2775         Pair<int64_t, Liveness *> *lastMessageEntry = lastMessageTable->put(machineId, new Pair<int64_t, Liveness *>(seqNum, liveness));
2776         if (lastMessageEntry == NULL) {
2777                 // If no last message then there is nothing else to process
2778                 return;
2779         }
2780
2781         int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
2782         Liveness *lastEntry = lastMessageEntry->getSecond();
2783         delete lastMessageEntry;
2784
2785         // If it is not our machine Id since we already set ours to dead
2786         if (machineId != localMachineId) {
2787                 char lastEntryType = lastEntry->getType();
2788
2789                 if (lastEntryType == TypeLastMessage) {
2790                         ((LastMessage *)lastEntry)->setDead();
2791                 } else if (lastEntryType == TypeSlot) {
2792                         ((Slot *)lastEntry)->setDead();
2793                 } else {
2794                         throw new Error("Unrecognized type");
2795                 }
2796         }
2797         // Make sure the server is not playing any games
2798         if (machineId == localMachineId) {
2799                 if (hadPartialSendToServer) {
2800                         // We were not making any updates and we had a machine mismatch
2801                         if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2802                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: ");
2803                         }
2804                 } else {
2805                         // We were not making any updates and we had a machine mismatch
2806                         if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2807                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed: ");
2808                         }
2809                 }
2810         } else {
2811                 if (lastMessageSeqNum > seqNum) {
2812                         throw new Error("Server Error: Rollback on remote machine sequence number");
2813                 }
2814         }
2815 }
2816
2817 /**
2818  * Add a rejected message entry to the watch set to keep track of
2819  * which clients have seen that rejected message entry and which have
2820  * not.
2821  */
2822 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
2823         Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2824         if (entries == NULL) {
2825                 // There is no set for this machine ID yet so create one
2826                 entries = new Hashset<RejectedMessage *>();
2827                 rejectedMessageWatchVectorTable->put(machineId, entries);
2828         }
2829         entries->add(entry);
2830 }
2831
2832 /**
2833  * Check if the HMAC chain is not violated
2834  */
2835 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
2836         for (uint i = 0; i < newSlots->length(); i++) {
2837                 Slot *currSlot = newSlots->get(i);
2838                 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
2839                 if (prevSlot != NULL &&
2840                                 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2841                         throw new Error("Server Error: Invalid HMAC Chain");
2842         }
2843 }