edits
[iotcloud.git] / version2 / src / C / Table.cc
1 #include "Table.h"
2 #include "CloudComm.h"
3 #include "SlotBuffer.h"
4 #include "NewKey.h"
5 #include "Slot.h"
6 #include "KeyValue.h"
7 #include "Error.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
13 #include "SecureRandom.h"
14 #include "ByteBuffer.h"
15 #include "Abort.h"
16 #include "CommitPart.h"
17 #include "ArbitrationRound.h"
18 #include "TransactionPart.h"
19 #include "Commit.h"
20 #include "RejectedMessage.h"
21 #include "SlotIndexer.h"
22 #include <stdlib.h>
23
/**
 * qsort()-style comparator for two int64_t values.
 *
 * @param a pointer to the first int64_t
 * @param b pointer to the second int64_t
 * @return -1 if *a < *b, 1 if *a > *b, 0 if they are equal
 */
int compareInt64(const void *a, const void *b) {
        const int64_t lhs = *(const int64_t *) a;
        const int64_t rhs = *(const int64_t *) b;
        // Each comparison yields 0 or 1, so the difference is -1, 0 or 1
        // without any risk of overflow from subtracting the values themselves.
        return (lhs > rhs) - (lhs < rhs);
}
34
35 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
36         buffer(NULL),
37         cloud(new CloudComm(this, baseurl, password, listeningPort)),
38         random(NULL),
39         liveTableStatus(NULL),
40         pendingTransactionBuilder(NULL),
41         lastPendingTransactionSpeculatedOn(NULL),
42         firstPendingTransaction(NULL),
43         numberOfSlots(0),
44         bufferResizeThreshold(0),
45         liveSlotCount(0),
46         oldestLiveSlotSequenceNumver(1),
47         localMachineId(_localMachineId),
48         sequenceNumber(0),
49         localSequenceNumber(0),
50         localTransactionSequenceNumber(0),
51         lastTransactionSequenceNumberSpeculatedOn(0),
52         oldestTransactionSequenceNumberSpeculatedOn(0),
53         localArbitrationSequenceNumber(0),
54         hadPartialSendToServer(false),
55         attemptedToSendToServer(false),
56         expectedsize(0),
57         didFindTableStatus(false),
58         currMaxSize(0),
59         lastSlotAttemptedToSend(NULL),
60         lastIsNewKey(false),
61         lastNewSize(0),
62         lastTransactionPartsSent(NULL),
63         lastPendingSendArbitrationEntriesToDelete(NULL),
64         lastNewKey(NULL),
65         committedKeyValueTable(NULL),
66         speculatedKeyValueTable(NULL),
67         pendingTransactionSpeculatedKeyValueTable(NULL),
68         liveNewKeyTable(NULL),
69         lastMessageTable(NULL),
70         rejectedMessageWatchVectorTable(NULL),
71         arbitratorTable(NULL),
72         liveAbortTable(NULL),
73         newTransactionParts(NULL),
74         newCommitParts(NULL),
75         lastArbitratedTransactionNumberByArbitratorTable(NULL),
76         liveTransactionBySequenceNumberTable(NULL),
77         liveTransactionByTransactionIdTable(NULL),
78         liveCommitsTable(NULL),
79         liveCommitsByKeyTable(NULL),
80         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
81         rejectedSlotVector(NULL),
82         pendingTransactionQueue(NULL),
83         pendingSendArbitrationRounds(NULL),
84         pendingSendArbitrationEntriesToDelete(NULL),
85         transactionPartsSent(NULL),
86         outstandingTransactionStatus(NULL),
87         liveAbortsGeneratedByLocal(NULL),
88         offlineTransactionsCommittedAndAtServer(NULL),
89         localCommunicationTable(NULL),
90         lastTransactionSeenFromMachineFromServer(NULL),
91         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
92         lastInsertedNewKey(false),
93         lastSeqNumArbOn(0)
94 {
95         init();
96 }
97
98 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
99         buffer(NULL),
100         cloud(_cloud),
101         random(NULL),
102         liveTableStatus(NULL),
103         pendingTransactionBuilder(NULL),
104         lastPendingTransactionSpeculatedOn(NULL),
105         firstPendingTransaction(NULL),
106         numberOfSlots(0),
107         bufferResizeThreshold(0),
108         liveSlotCount(0),
109         oldestLiveSlotSequenceNumver(1),
110         localMachineId(_localMachineId),
111         sequenceNumber(0),
112         localSequenceNumber(0),
113         localTransactionSequenceNumber(0),
114         lastTransactionSequenceNumberSpeculatedOn(0),
115         oldestTransactionSequenceNumberSpeculatedOn(0),
116         localArbitrationSequenceNumber(0),
117         hadPartialSendToServer(false),
118         attemptedToSendToServer(false),
119         expectedsize(0),
120         didFindTableStatus(false),
121         currMaxSize(0),
122         lastSlotAttemptedToSend(NULL),
123         lastIsNewKey(false),
124         lastNewSize(0),
125         lastTransactionPartsSent(NULL),
126         lastPendingSendArbitrationEntriesToDelete(NULL),
127         lastNewKey(NULL),
128         committedKeyValueTable(NULL),
129         speculatedKeyValueTable(NULL),
130         pendingTransactionSpeculatedKeyValueTable(NULL),
131         liveNewKeyTable(NULL),
132         lastMessageTable(NULL),
133         rejectedMessageWatchVectorTable(NULL),
134         arbitratorTable(NULL),
135         liveAbortTable(NULL),
136         newTransactionParts(NULL),
137         newCommitParts(NULL),
138         lastArbitratedTransactionNumberByArbitratorTable(NULL),
139         liveTransactionBySequenceNumberTable(NULL),
140         liveTransactionByTransactionIdTable(NULL),
141         liveCommitsTable(NULL),
142         liveCommitsByKeyTable(NULL),
143         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
144         rejectedSlotVector(NULL),
145         pendingTransactionQueue(NULL),
146         pendingSendArbitrationRounds(NULL),
147         pendingSendArbitrationEntriesToDelete(NULL),
148         transactionPartsSent(NULL),
149         outstandingTransactionStatus(NULL),
150         liveAbortsGeneratedByLocal(NULL),
151         offlineTransactionsCommittedAndAtServer(NULL),
152         localCommunicationTable(NULL),
153         lastTransactionSeenFromMachineFromServer(NULL),
154         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
155         lastInsertedNewKey(false),
156         lastSeqNumArbOn(0)
157 {
158         init();
159 }
160
161 Table::~Table() {
162         delete cloud;
163         delete random;
164         delete buffer;
165         // init data structs
166         delete committedKeyValueTable;
167         delete speculatedKeyValueTable;
168         delete pendingTransactionSpeculatedKeyValueTable;
169         delete liveNewKeyTable;
170         delete lastMessageTable;
171         {
172                 SetIterator<int64_t, Hashset<RejectedMessage *> *> *rmit = getKeyIterator(rejectedMessageWatchVectorTable);
173                 while(rmit->hasNext()) {
174                         int64_t machineid = rmit->next();
175                         Hashset<RejectedMessage *> * rmset = rejectedMessageWatchVectorTable->get(machineid);
176                         SetIterator<RejectedMessage *, RejectedMessage *> * mit = rmset->iterator();
177                         while (mit->hasNext()) {
178                                 RejectedMessage * rm = mit->next();
179                                 delete rm;
180                         }
181                         delete mit;
182                         delete rmset;
183                 }
184                 delete rmit;
185                 delete rejectedMessageWatchVectorTable;
186         }
187         delete arbitratorTable;
188         delete liveAbortTable;
189         {
190                 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts);
191                 while (partsit->hasNext()) {
192                         int64_t machineId = partsit->next();
193                         Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
194                         delete parts;
195                 }
196                 delete partsit;
197                 delete newTransactionParts;
198         }
199         {
200                 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts);
201                 while (partsit->hasNext()) {
202                         int64_t machineId = partsit->next();
203                         Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId);
204                         delete parts;
205                 }
206                 delete partsit;
207                 delete newCommitParts;
208         }
209         delete lastArbitratedTransactionNumberByArbitratorTable;
210         delete liveTransactionBySequenceNumberTable;
211         delete liveTransactionByTransactionIdTable;
212         {
213                 SetIterator<int64_t, Hashtable<int64_t, Commit *> *> *liveit = getKeyIterator(liveCommitsTable);
214                 while (liveit->hasNext()) {
215                         int64_t arbitratorId = liveit->next();
216                         
217                         // Get all the commits for a specific arbitrator
218                         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
219                         {
220                                 SetIterator<int64_t, Commit *> *clientit = getKeyIterator(commitForClientTable);
221                                 while (clientit->hasNext()) {
222                                         int64_t id = clientit->next();
223                                         delete commitForClientTable->get(id);
224                                 }
225                                 delete clientit;
226                         }
227                         
228                         delete commitForClientTable;
229                 }
230                 delete liveit;
231                 delete liveCommitsTable;
232         }
233         delete liveCommitsByKeyTable;
234         delete lastCommitSeenSequenceNumberByArbitratorTable;
235         delete rejectedSlotVector;
236         delete pendingTransactionQueue;
237         delete pendingSendArbitrationEntriesToDelete;
238         delete transactionPartsSent;
239         delete outstandingTransactionStatus;
240         delete liveAbortsGeneratedByLocal;
241         delete offlineTransactionsCommittedAndAtServer;
242         delete localCommunicationTable;
243         delete lastTransactionSeenFromMachineFromServer;
244         delete pendingSendArbitrationRounds;
245         if (lastTransactionPartsSent != NULL)
246                 delete lastTransactionPartsSent;
247         delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
248 }
249
250 /**
251  * Init all the stuff needed for for table usage
252  */
253 void Table::init() {
254         // Init helper objects
255         random = new SecureRandom();
256         buffer = new SlotBuffer();
257
258         // init data structs
259         committedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
260         speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
261         pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
262         liveNewKeyTable = new Hashtable<IoTString *, NewKey *, uintptr_t, 0, hashString, StringEquals >();
263         lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> * >();
264         rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
265         arbitratorTable = new Hashtable<IoTString *, int64_t, uintptr_t, 0, hashString, StringEquals>();
266         liveAbortTable = new Hashtable<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>();
267         newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
268         newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
269         lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
270         liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
271         liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t> *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>();
272         liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> * >();
273         liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *, uintptr_t, 0, hashString, StringEquals>();
274         lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
275         rejectedSlotVector = new Vector<int64_t>();
276         pendingTransactionQueue = new Vector<Transaction *>();
277         pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
278         transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
279         outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
280         liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
281         offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> *, uintptr_t, 0, pairHashFunction, pairEquals>();
282         localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> *>();
283         lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
284         pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
285         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
286
287         // Other init stuff
288         numberOfSlots = buffer->capacity();
289         setResizeThreshold();
290 }
291
292 /**
293  * Initialize the table by inserting a table status as the first entry
294  * into the table status also initialize the crypto stuff.
295  */
296 void Table::initTable() {
297         cloud->initSecurity();
298
299         // Create the first insertion into the block chain which is the table status
300         Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
301         localSequenceNumber++;
302         TableStatus *status = new TableStatus(s, numberOfSlots);
303         s->addEntry(status);
304         Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
305
306         if (array == NULL) {
307                 array = new Array<Slot *>(1);
308                 array->set(0, s);
309                 // update local block chain
310                 validateAndUpdate(array, true);
311                 delete array;
312         } else if (array->length() == 1) {
313                 // in case we did push the slot BUT we failed to init it
314                 validateAndUpdate(array, true);
315                 delete array;
316         } else {
317                 delete array;
318                 throw new Error("Error on initialization");
319         }
320 }
321
322 /**
323  * Rebuild the table from scratch by pulling the latest block chain
324  * from the server.
325  */
326 void Table::rebuild() {
327         // Just pull the latest slots from the server
328         Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
329         validateAndUpdate(newslots, true);
330         delete newslots;
331         sendToServer(NULL);
332         updateLiveTransactionsAndStatus();
333 }
334
335 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
336         localCommunicationTable->put(arbitrator, new Pair<IoTString *, int32_t>(hostName, portNumber));
337 }
338
339 int64_t Table::getArbitrator(IoTString *key) {
340         return arbitratorTable->get(key);
341 }
342
343 void Table::close() {
344         cloud->closeCloud();
345 }
346
347 IoTString *Table::getCommitted(IoTString *key)  {
348         KeyValue *kv = committedKeyValueTable->get(key);
349
350         if (kv != NULL) {
351                 return new IoTString(kv->getValue());
352         } else {
353                 return NULL;
354         }
355 }
356
357 IoTString *Table::getSpeculative(IoTString *key) {
358         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
359
360         if (kv == NULL) {
361                 kv = speculatedKeyValueTable->get(key);
362         }
363
364         if (kv == NULL) {
365                 kv = committedKeyValueTable->get(key);
366         }
367
368         if (kv != NULL) {
369                 return new IoTString(kv->getValue());
370         } else {
371                 return NULL;
372         }
373 }
374
375 IoTString *Table::getCommittedAtomic(IoTString *key) {
376         KeyValue *kv = committedKeyValueTable->get(key);
377
378         if (!arbitratorTable->contains(key)) {
379                 throw new Error("Key not Found.");
380         }
381
382         // Make sure new key value pair matches the current arbitrator
383         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
384                 // TODO: Maybe not throw en error
385                 throw new Error("Not all Key Values Match Arbitrator.");
386         }
387
388         if (kv != NULL) {
389                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
390                 return new IoTString(kv->getValue());
391         } else {
392                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
393                 return NULL;
394         }
395 }
396
397 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
398         if (!arbitratorTable->contains(key)) {
399                 throw new Error("Key not Found.");
400         }
401
402         // Make sure new key value pair matches the current arbitrator
403         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
404                 // TODO: Maybe not throw en error
405                 throw new Error("Not all Key Values Match Arbitrator.");
406         }
407
408         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
409
410         if (kv == NULL) {
411                 kv = speculatedKeyValueTable->get(key);
412         }
413
414         if (kv == NULL) {
415                 kv = committedKeyValueTable->get(key);
416         }
417
418         if (kv != NULL) {
419                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
420                 return new IoTString(kv->getValue());
421         } else {
422                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
423                 return NULL;
424         }
425 }
426
427 bool Table::update()  {
428         try {
429                 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
430                 validateAndUpdate(newSlots, false);
431                 delete newSlots;
432                 sendToServer(NULL);
433                 updateLiveTransactionsAndStatus();
434                 return true;
435         } catch (Exception *e) {
436                 SetIterator<int64_t, Pair<IoTString *, int32_t> *> *kit = getKeyIterator(localCommunicationTable);
437                 while (kit->hasNext()) {
438                         int64_t m = kit->next();
439                         updateFromLocal(m);
440                 }
441                 delete kit;
442         }
443
444         return false;
445 }
446
447 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
448         while (true) {
449                 if (arbitratorTable->contains(keyName)) {
450                         // There is already an arbitrator
451                         return false;
452                 }
453                 NewKey *newKey = new NewKey(NULL, keyName, machineId);
454
455                 if (sendToServer(newKey)) {
456                         // If successfully inserted
457                         return true;
458                 }
459         }
460 }
461
462 void Table::startTransaction() {
463         // Create a new transaction, invalidates any old pending transactions.
464         pendingTransactionBuilder = new PendingTransaction(localMachineId);
465 }
466
467 void Table::put(IoTString *key, IoTString *value) {
468         // Make sure it is a valid key
469         if (!arbitratorTable->contains(key)) {
470                 throw new Error("Key not Found.");
471         }
472
473         // Make sure new key value pair matches the current arbitrator
474         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
475                 // TODO: Maybe not throw en error
476                 throw new Error("Not all Key Values Match Arbitrator.");
477         }
478
479         // Add the key value to this transaction
480         KeyValue *kv = new KeyValue(new IoTString(key), new IoTString(value));
481         pendingTransactionBuilder->addKV(kv);
482 }
483
484 TransactionStatus *Table::commitTransaction() {
485         if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
486                 // transaction with no updates will have no effect on the system
487                 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
488         }
489
490         // Set the local transaction sequence number and increment
491         pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
492         localTransactionSequenceNumber++;
493
494         // Create the transaction status
495         TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
496
497         // Create the new transaction
498         Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
499         newTransaction->setTransactionStatus(transactionStatus);
500
501         if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
502                 // Add it to the queue and invalidate the builder for safety
503                 pendingTransactionQueue->add(newTransaction);
504         } else {
505                 arbitrateOnLocalTransaction(newTransaction);
506                 updateLiveStateFromLocal();
507         }
508
509         pendingTransactionBuilder = new PendingTransaction(localMachineId);
510
511         try {
512                 sendToServer(NULL);
513         } catch (ServerException *e) {
514
515                 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
516                 uint size = pendingTransactionQueue->size();
517                 uint oldindex = 0;
518                 for (uint iter = 0; iter < size; iter++) {
519                         Transaction *transaction = pendingTransactionQueue->get(iter);
520                         pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter));
521
522                         if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
523                                 // Already contacted this client so ignore all attempts to contact this client
524                                 // to preserve ordering for arbitrator
525                                 continue;
526                         }
527
528                         Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
529
530                         if (sendReturn.getFirst()) {
531                                 // Failed to contact over local
532                                 arbitratorTriedAndFailed->add(transaction->getArbitrator());
533                         } else {
534                                 // Successful contact or should not contact
535
536                                 if (sendReturn.getSecond()) {
537                                         // did arbitrate
538                                         oldindex--;
539                                 }
540                         }
541                 }
542                 pendingTransactionQueue->setSize(oldindex);
543         }
544
545         updateLiveStateFromLocal();
546
547         return transactionStatus;
548 }
549
550 /**
551  * Recalculate the new resize threshold
552  */
553 void Table::setResizeThreshold() {
554         int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
555         bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
556 }
557
558 int64_t Table::getLocalSequenceNumber() {
559         return localSequenceNumber;
560 }
561
562 NewKey * Table::handlePartialSend(NewKey * newKey) {
563         //Didn't receive acknowledgement for last send
564         //See if the server has received a newer slot
565         
566         Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
567         if (newSlots->length() == 0) {
568                 //Retry sending old slot
569                 bool wasInserted = false;
570                 bool sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey, &wasInserted, &newSlots);
571                 
572                 if (sendSlotsReturn) {
573                         if (newKey != NULL) {
574                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
575                                         newKey = NULL;
576                                 }
577                         }
578                         
579                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
580                         while (trit->hasNext()) {
581                                 Transaction *transaction = trit->next();
582                                 transaction->resetServerFailure();
583                                 // Update which transactions parts still need to be sent
584                                 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
585                                 // Add the transaction status to the outstanding list
586                                 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
587                                 
588                                 // Update the transaction status
589                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
590                                 
591                                 // Check if all the transaction parts were successfully
592                                 // sent and if so then remove it from pending
593                                 if (transaction->didSendAllParts()) {
594                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
595                                         pendingTransactionQueue->remove(transaction);
596                                 }
597                         }
598                         delete trit;
599                 } else {
600                         if (checkSend(newSlots, lastSlotAttemptedToSend)) {
601                                 if (newKey != NULL) {
602                                         if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
603                                                 newKey = NULL;
604                                         }
605                                 }
606                                 
607                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
608                                 while (trit->hasNext()) {
609                                         Transaction *transaction = trit->next();
610                                         transaction->resetServerFailure();
611                                         
612                                         // Update which transactions parts still need to be sent
613                                         transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
614                                         
615                                         // Add the transaction status to the outstanding list
616                                         outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
617                                         
618                                         // Update the transaction status
619                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
620                                         
621                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
622                                         if (transaction->didSendAllParts()) {
623                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
624                                                 pendingTransactionQueue->remove(transaction);
625                                         } else {
626                                                 transaction->resetServerFailure();
627                                                 // Set the transaction sequence number back to nothing
628                                                 if (!transaction->didSendAPartToServer()) {
629                                                         transaction->setSequenceNumber(-1);
630                                                 }
631                                         }
632                                 }
633                                 delete trit;
634                         }
635                 }
636                 
637                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
638                 while (trit->hasNext()) {
639                         Transaction *transaction = trit->next();
640                         transaction->resetServerFailure();
641                         // Set the transaction sequence number back to nothing
642                         if (!transaction->didSendAPartToServer()) {
643                                 transaction->setSequenceNumber(-1);
644                         }
645                 }
646                 delete trit;
647                 
648                 if (newSlots->length() != 0) {
649                         // insert into the local block chain
650                         validateAndUpdate(newSlots, true);
651                 }
652         } else {
653                 if (checkSend(newSlots, lastSlotAttemptedToSend)) {
654                         if (newKey != NULL) {
655                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
656                                         newKey = NULL;
657                                 }
658                         }
659                         
660                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
661                         while (trit->hasNext()) {
662                                 Transaction *transaction = trit->next();
663                                 transaction->resetServerFailure();
664                                 
665                                 // Update which transactions parts still need to be sent
666                                 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
667                                 
668                                 // Add the transaction status to the outstanding list
669                                 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
670                                 
671                                 // Update the transaction status
672                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
673                                 
674                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
675                                 if (transaction->didSendAllParts()) {
676                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
677                                         pendingTransactionQueue->remove(transaction);
678                                 } else {
679                                         transaction->resetServerFailure();
680                                         // Set the transaction sequence number back to nothing
681                                         if (!transaction->didSendAPartToServer()) {
682                                                 transaction->setSequenceNumber(-1);
683                                         }
684                                 }
685                         }
686                         delete trit;
687                 } else {
688                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
689                         while (trit->hasNext()) {
690                                 Transaction *transaction = trit->next();
691                                 transaction->resetServerFailure();
692                                 // Set the transaction sequence number back to nothing
693                                 if (!transaction->didSendAPartToServer()) {
694                                         transaction->setSequenceNumber(-1);
695                                 }
696                         }
697                         delete trit;
698                 }
699                 
700                 // insert into the local block chain
701                 validateAndUpdate(newSlots, true);
702         }
703         delete newSlots;
704         return newKey;
705 }
706
707 bool Table::sendToServer(NewKey *newKey) {
708         if (hadPartialSendToServer) {
709                 newKey = handlePartialSend(newKey);
710         }
711
712         try {
713                 // While we have stuff that needs inserting into the block chain
714                 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
715                         if (hadPartialSendToServer) {
716                                 throw new Error("Should Be error free");
717                         }
718                         
719                         // If there is a new key with same name then end
720                         if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
721                                 return false;
722                         }
723
724                         // Create the slot
725                         Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber);
726                         localSequenceNumber++;
727
728                         // Try to fill the slot with data
729                         int newSize = 0;
730                         bool insertedNewKey = false;
731                         bool needsResize = fillSlot(slot, false, newKey, newSize, insertedNewKey);
732
733                         if (needsResize) {
734                                 // Reset which transaction to send
735                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
736                                 while (trit->hasNext()) {
737                                         Transaction *transaction = trit->next();
738                                         transaction->resetNextPartToSend();
739
740                                         // Set the transaction sequence number back to nothing
741                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
742                                                 transaction->setSequenceNumber(-1);
743                                         }
744                                 }
745                                 delete trit;
746
747                                 // Clear the sent data since we are trying again
748                                 pendingSendArbitrationEntriesToDelete->clear();
749                                 transactionPartsSent->clear();
750
751                                 // We needed a resize so try again
752                                 fillSlot(slot, true, newKey, newSize, insertedNewKey);
753                         }
754
755                         lastSlotAttemptedToSend = slot;
756                         lastIsNewKey = (newKey != NULL);
757                         lastInsertedNewKey = insertedNewKey;
758                         lastNewSize = newSize;
759                         lastNewKey = newKey;
760                         if (lastTransactionPartsSent != NULL)
761                                 delete lastTransactionPartsSent;
762                         lastTransactionPartsSent = transactionPartsSent->clone();
763                         lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
764
765                         Array<Slot *> * newSlots = NULL;
766                         bool wasInserted = false;
767                         bool sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL, &wasInserted, &newSlots);
768
769                         if (sendSlotsReturn) {
770                                 // Did insert into the block chain
771                                 if (insertedNewKey) {
772                                         // This slot was what was inserted not a previous slot
773                                         // New Key was successfully inserted into the block chain so dont want to insert it again
774                                         newKey = NULL;
775                                 }
776
777                                 // Remove the aborts and commit parts that were sent from the pending to send queue
778                                 uint size = pendingSendArbitrationRounds->size();
779                                 uint oldcount = 0;
780                                 for (uint i = 0; i < size; i++) {
781                                         ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
782                                         round->removeParts(pendingSendArbitrationEntriesToDelete);
783
784                                         if (!round->isDoneSending()) {
785                                                 //Add part back in
786                                                 pendingSendArbitrationRounds->set(oldcount++,
787                                                                                                                                                                                         pendingSendArbitrationRounds->get(i));
788                                         }
789                                 }
790                                 pendingSendArbitrationRounds->setSize(oldcount);
791
792                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
793                                 while (trit->hasNext()) {
794                                         Transaction *transaction = trit->next();
795                                         transaction->resetServerFailure();
796
797                                         // Update which transactions parts still need to be sent
798                                         transaction->removeSentParts(transactionPartsSent->get(transaction));
799
800                                         // Add the transaction status to the outstanding list
801                                         outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
802
803                                         // Update the transaction status
804                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
805
806                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
807                                         if (transaction->didSendAllParts()) {
808                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
809                                                 pendingTransactionQueue->remove(transaction);
810                                         }
811                                 }
812                                 delete trit;
813                         } else {
814                                 // Reset which transaction to send
815                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
816                                 while (trit->hasNext()) {
817                                         Transaction *transaction = trit->next();
818                                         transaction->resetNextPartToSend();
819
820                                         // Set the transaction sequence number back to nothing
821                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
822                                                 transaction->setSequenceNumber(-1);
823                                         }
824                                 }
825                                 delete trit;
826                         }
827
828                         // Clear the sent data in preparation for next send
829                         pendingSendArbitrationEntriesToDelete->clear();
830                         transactionPartsSent->clear();
831
832                         if (newSlots->length() != 0) {
833                                 // insert into the local block chain
834                                 validateAndUpdate(newSlots, true);
835                         }
836                         delete newSlots;
837                 }
838         } catch (ServerException *e) {
839                 if (e->getType() != ServerException_TypeInputTimeout) {
840                         // Nothing was able to be sent to the server so just clear these data structures
841                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
842                         while (trit->hasNext()) {
843                                 Transaction *transaction = trit->next();
844                                 transaction->resetNextPartToSend();
845
846                                 // Set the transaction sequence number back to nothing
847                                 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
848                                         transaction->setSequenceNumber(-1);
849                                 }
850                         }
851                         delete trit;
852                 } else {
853                         // There was a partial send to the server
854                         hadPartialSendToServer = true;
855
856                         // Nothing was able to be sent to the server so just clear these data structures
857                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
858                         while (trit->hasNext()) {
859                                 Transaction *transaction = trit->next();
860                                 transaction->resetNextPartToSend();
861                                 transaction->setServerFailure();
862                         }
863                         delete trit;
864                 }
865
866                 pendingSendArbitrationEntriesToDelete->clear();
867                 transactionPartsSent->clear();
868
869                 throw e;
870         }
871
872         return newKey == NULL;
873 }
874
875 bool Table::updateFromLocal(int64_t machineId) {
876         if (!localCommunicationTable->contains(machineId))
877                 return false;
878
879         Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(machineId);
880
881         // Get the size of the send data
882         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
883
884         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
885         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(machineId)) {
886                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
887         }
888
889         Array<char> *sendData = new Array<char>(sendDataSize);
890         ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
891
892         // Encode the data
893         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
894         bbEncode->putInt(0);
895
896         // Send by local
897         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
898         localSequenceNumber++;
899
900         if (returnData == NULL) {
901                 // Could not contact server
902                 return false;
903         }
904
905         // Decode the data
906         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
907         int numberOfEntries = bbDecode->getInt();
908
909         for (int i = 0; i < numberOfEntries; i++) {
910                 char type = bbDecode->get();
911                 if (type == TypeAbort) {
912                         Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
913                         processEntry(abort);
914                 } else if (type == TypeCommitPart) {
915                         CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
916                         processEntry(commitPart);
917                 }
918         }
919
920         updateLiveStateFromLocal();
921
922         return true;
923 }
924
925 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
926
927         // Get the devices local communications
928         if (!localCommunicationTable->contains(transaction->getArbitrator()))
929                 return Pair<bool, bool>(true, false);
930
931         Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
932
933         // Get the size of the send data
934         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
935         {
936                 Vector<TransactionPart *> *tParts = transaction->getParts();
937                 uint tPartsSize = tParts->size();
938                 for (uint i = 0; i < tPartsSize; i++) {
939                         TransactionPart *part = tParts->get(i);
940                         sendDataSize += part->getSize();
941                 }
942         }
943
944         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
945         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(transaction->getArbitrator())) {
946                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
947         }
948
949         // Make the send data size
950         Array<char> *sendData = new Array<char>(sendDataSize);
951         ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
952
953         // Encode the data
954         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
955         bbEncode->putInt(transaction->getParts()->size());
956         {
957                 Vector<TransactionPart *> *tParts = transaction->getParts();
958                 uint tPartsSize = tParts->size();
959                 for (uint i = 0; i < tPartsSize; i++) {
960                         TransactionPart *part = tParts->get(i);
961                         part->encode(bbEncode);
962                 }
963         }
964
965         // Send by local
966         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
967         localSequenceNumber++;
968
969         if (returnData == NULL) {
970                 // Could not contact server
971                 return Pair<bool, bool>(true, false);
972         }
973
974         // Decode the data
975         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
976         bool didCommit = bbDecode->get() == 1;
977         bool couldArbitrate = bbDecode->get() == 1;
978         int numberOfEntries = bbDecode->getInt();
979         bool foundAbort = false;
980
981         for (int i = 0; i < numberOfEntries; i++) {
982                 char type = bbDecode->get();
983                 if (type == TypeAbort) {
984                         Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
985
986                         if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
987                                 foundAbort = true;
988                         }
989
990                         processEntry(abort);
991                 } else if (type == TypeCommitPart) {
992                         CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
993                         processEntry(commitPart);
994                 }
995         }
996
997         updateLiveStateFromLocal();
998
999         if (couldArbitrate) {
1000                 TransactionStatus *status =  transaction->getTransactionStatus();
1001                 if (didCommit) {
1002                         status->setStatus(TransactionStatus_StatusCommitted);
1003                 } else {
1004                         status->setStatus(TransactionStatus_StatusAborted);
1005                 }
1006         } else {
1007                 TransactionStatus *status =  transaction->getTransactionStatus();
1008                 if (foundAbort) {
1009                         status->setStatus(TransactionStatus_StatusAborted);
1010                 } else {
1011                         status->setStatus(TransactionStatus_StatusCommitted);
1012                 }
1013         }
1014
1015         return Pair<bool, bool>(false, true);
1016 }
1017
1018 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
1019         // Decode the data
1020         ByteBuffer *bbDecode = ByteBuffer_wrap(data);
1021         int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
1022         int numberOfParts = bbDecode->getInt();
1023
1024         // If we did commit a transaction or not
1025         bool didCommit = false;
1026         bool couldArbitrate = false;
1027
1028         if (numberOfParts != 0) {
1029
1030                 // decode the transaction
1031                 Transaction *transaction = new Transaction();
1032                 for (int i = 0; i < numberOfParts; i++) {
1033                         bbDecode->get();
1034                         TransactionPart *newPart = (TransactionPart *)TransactionPart_decode(NULL, bbDecode);
1035                         transaction->addPartDecode(newPart);
1036                 }
1037
1038                 // Arbitrate on transaction and pull relevant return data
1039                 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
1040                 couldArbitrate = localArbitrateReturn.getFirst();
1041                 didCommit = localArbitrateReturn.getSecond();
1042
1043                 updateLiveStateFromLocal();
1044
1045                 // Transaction was sent to the server so keep track of it to prevent double commit
1046                 if (transaction->getSequenceNumber() != -1) {
1047                         offlineTransactionsCommittedAndAtServer->add(new Pair<int64_t, int64_t>(transaction->getId()));
1048                 }
1049         }
1050
1051         // The data to send back
1052         int returnDataSize = 0;
1053         Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
1054
1055         // Get the aborts to send back
1056         Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>();
1057         {
1058                 SetIterator<int64_t, Abort *> *abortit = getKeyIterator(liveAbortsGeneratedByLocal);
1059                 while (abortit->hasNext())
1060                         abortLocalSequenceNumbers->add(abortit->next());
1061                 delete abortit;
1062         }
1063
1064         qsort(abortLocalSequenceNumbers->expose(), abortLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1065
1066         uint asize = abortLocalSequenceNumbers->size();
1067         for (uint i = 0; i < asize; i++) {
1068                 int64_t localSequenceNumber = abortLocalSequenceNumbers->get(i);
1069                 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1070                         continue;
1071                 }
1072
1073                 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
1074                 unseenArbitrations->add(abort);
1075                 returnDataSize += abort->getSize();
1076         }
1077
1078         // Get the commits to send back
1079         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
1080         if (commitForClientTable != NULL) {
1081                 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>();
1082                 {
1083                         SetIterator<int64_t, Commit *> *commitit = getKeyIterator(commitForClientTable);
1084                         while (commitit->hasNext())
1085                                 commitLocalSequenceNumbers->add(commitit->next());
1086                         delete commitit;
1087                 }
1088                 qsort(commitLocalSequenceNumbers->expose(), commitLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1089
1090                 uint clsSize = commitLocalSequenceNumbers->size();
1091                 for (uint clsi = 0; clsi < clsSize; clsi++) {
1092                         int64_t localSequenceNumber = commitLocalSequenceNumbers->get(clsi);
1093                         Commit *commit = commitForClientTable->get(localSequenceNumber);
1094
1095                         if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1096                                 continue;
1097                         }
1098
1099                         {
1100                                 Vector<CommitPart *> *parts = commit->getParts();
1101                                 uint nParts = parts->size();
1102                                 for (uint i = 0; i < nParts; i++) {
1103                                         CommitPart *commitPart = parts->get(i);
1104                                         unseenArbitrations->add(commitPart);
1105                                         returnDataSize += commitPart->getSize();
1106                                 }
1107                         }
1108                 }
1109         }
1110
1111         // Number of arbitration entries to decode
1112         returnDataSize += 2 * sizeof(int32_t);
1113
1114         // bool of did commit or not
1115         if (numberOfParts != 0) {
1116                 returnDataSize += sizeof(char);
1117         }
1118
1119         // Data to send Back
1120         Array<char> *returnData = new Array<char>(returnDataSize);
1121         ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
1122
1123         if (numberOfParts != 0) {
1124                 if (didCommit) {
1125                         bbEncode->put((char)1);
1126                 } else {
1127                         bbEncode->put((char)0);
1128                 }
1129                 if (couldArbitrate) {
1130                         bbEncode->put((char)1);
1131                 } else {
1132                         bbEncode->put((char)0);
1133                 }
1134         }
1135
1136         bbEncode->putInt(unseenArbitrations->size());
1137         uint size = unseenArbitrations->size();
1138         for (uint i = 0; i < size; i++) {
1139                 Entry *entry = unseenArbitrations->get(i);
1140                 entry->encode(bbEncode);
1141         }
1142
1143         localSequenceNumber++;
1144         return returnData;
1145 }
1146
1147 /** Checks whether a given slot was sent using new slots in
1148                 array. Returns true if sent and false otherwise.  */
1149
1150 bool Table::checkSend(Array<Slot *> * array, Slot *checkSlot) {
1151         uint size = array->length();
1152         for (uint i = 0; i < size; i++) {
1153                 Slot *s = array->get(i);
1154                 if ((s->getSequenceNumber() == checkSlot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1155                         return true;
1156                 }
1157         }
1158         
1159         //Also need to see if other machines acknowledged our message
1160         for (uint i = 0; i < size; i++) {
1161                 Slot *s = array->get(i);
1162                 
1163                 // Process each entry in the slot
1164                 Vector<Entry *> *entries = s->getEntries();
1165                 uint eSize = entries->size();
1166                 for (uint ei = 0; ei < eSize; ei++) {
1167                         Entry *entry = entries->get(ei);
1168                         
1169                         if (entry->getType() == TypeLastMessage) {
1170                                 LastMessage *lastMessage = (LastMessage *)entry;
1171                                 
1172                                 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == checkSlot->getSequenceNumber())) {
1173                                         return true;
1174                                 }
1175                         }
1176                 }
1177         }
1178         //Not found
1179         return false;
1180 }
1181
1182 /** Method tries to send slot to server.  Returns status in tuple.
1183                 isInserted returns whether last un-acked send (if any) was
1184                 successful.  Returns whether send was confirmed.x
1185  */
1186
1187 bool Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey, bool *isInserted, Array<Slot *> **array) {
1188         attemptedToSendToServer = true;
1189
1190         *array = cloud->putSlot(slot, newSize);
1191         if (*array == NULL) {
1192                 *array = new Array<Slot *>(1);
1193                 (*array)->set(0, slot);
1194                 rejectedSlotVector->clear();
1195                 *isInserted = false;
1196                 return true;
1197         } else {
1198                 if ((*array)->length() == 0) {
1199                         throw new Error("Server Error: Did not send any slots");
1200                 }
1201
1202                 if (hadPartialSendToServer) {
1203                         *isInserted = checkSend(*array, slot);
1204
1205                         if (!(*isInserted)) {
1206                                 rejectedSlotVector->add(slot->getSequenceNumber());
1207                         }
1208                         
1209                         return false;
1210                 } else {
1211                         rejectedSlotVector->add(slot->getSequenceNumber());
1212                         *isInserted = false;
1213                         return false;
1214                 }
1215         }
1216 }
1217
1218 /**
1219  * Returns true if a resize was needed but not done.
1220  */
1221 bool Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry, int & newSize, bool & insertedKey) {
1222         newSize = 0;//special value to indicate no resize
1223         if (liveSlotCount > bufferResizeThreshold) {
1224                 resize = true;//Resize is forced
1225         }
1226
1227         if (resize) {
1228                 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1229                 TableStatus *status = new TableStatus(slot, newSize);
1230                 slot->addEntry(status);
1231         }
1232
1233         // Fill with rejected slots first before doing anything else
1234         doRejectedMessages(slot);
1235
1236         // Do mandatory rescue of entries
1237         ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1238
1239         // Extract working variables
1240         bool needsResize = mandatoryRescueReturn.getFirst();
1241         bool seenLiveSlot = mandatoryRescueReturn.getSecond();
1242         int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1243
1244         if (needsResize && !resize) {
1245                 // We need to resize but we are not resizing so return true to force on retry
1246                 return true;
1247         }
1248
1249         insertedKey = false;
1250         if (newKeyEntry != NULL) {
1251                 newKeyEntry->setSlot(slot);
1252                 if (slot->hasSpace(newKeyEntry)) {
1253                         slot->addEntry(newKeyEntry);
1254                         insertedKey = true;
1255                 }
1256         }
1257
1258         // Clear the transactions, aborts and commits that were sent previously
1259         transactionPartsSent->clear();
1260         pendingSendArbitrationEntriesToDelete->clear();
1261         uint size = pendingSendArbitrationRounds->size();
1262         for (uint i = 0; i < size; i++) {
1263                 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
1264                 bool isFull = false;
1265                 round->generateParts();
1266                 Vector<Entry *> *parts = round->getParts();
1267
1268                 // Insert pending arbitration data
1269                 uint vsize = parts->size();
1270                 for (uint vi = 0; vi < vsize; vi++) {
1271                         Entry *arbitrationData = parts->get(vi);
1272
1273                         // If it is an abort then we need to set some information
1274                         if (arbitrationData->getType() == TypeAbort) {
1275                                 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1276                         }
1277
1278                         if (!slot->hasSpace(arbitrationData)) {
1279                                 // No space so cant do anything else with these data entries
1280                                 isFull = true;
1281                                 break;
1282                         }
1283
1284                         // Add to this current slot and add it to entries to delete
1285                         slot->addEntry(arbitrationData);
1286                         pendingSendArbitrationEntriesToDelete->add(arbitrationData);
1287                 }
1288
1289                 if (isFull) {
1290                         break;
1291                 }
1292         }
1293
1294         if (pendingTransactionQueue->size() > 0) {
1295                 Transaction *transaction = pendingTransactionQueue->get(0);
1296                 // Set the transaction sequence number if it has yet to be inserted into the block chain
1297                 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1298                         transaction->setSequenceNumber(slot->getSequenceNumber());
1299                 }
1300
1301                 while (true) {
1302                         TransactionPart *part = transaction->getNextPartToSend();
1303                         if (part == NULL) {
1304                                 // Ran out of parts to send for this transaction so move on
1305                                 break;
1306                         }
1307
1308                         if (slot->hasSpace(part)) {
1309                                 slot->addEntry(part);
1310                                 Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1311                                 if (partsSent == NULL) {
1312                                         partsSent = new Vector<int32_t>();
1313                                         transactionPartsSent->put(transaction, partsSent);
1314                                 }
1315                                 partsSent->add(part->getPartNumber());
1316                                 transactionPartsSent->put(transaction, partsSent);
1317                         } else {
1318                                 break;
1319                         }
1320                 }
1321         }
1322
1323         // Fill the remainder of the slot with rescue data
1324         doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1325
1326         return false;
1327 }
1328
1329 void Table::doRejectedMessages(Slot *s) {
1330         if (!rejectedSlotVector->isEmpty()) {
1331                 /* TODO: We should avoid generating a rejected message entry if
1332                  * there is already a sufficient entry in the queue (e->g->,
1333                  * equalsto value of true and same sequence number)->  */
1334
1335                 int64_t old_seqn = rejectedSlotVector->get(0);
1336                 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1337                         int64_t new_seqn = rejectedSlotVector->lastElement();
1338                         RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1339                         s->addEntry(rm);
1340                 } else {
1341                         int64_t prev_seqn = -1;
1342                         uint i = 0;
1343                         /* Go through list of missing messages */
1344                         for (; i < rejectedSlotVector->size(); i++) {
1345                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1346                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1347                                 if (s_msg != NULL)
1348                                         break;
1349                                 prev_seqn = curr_seqn;
1350                         }
1351                         /* Generate rejected message entry for missing messages */
1352                         if (prev_seqn != -1) {
1353                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1354                                 s->addEntry(rm);
1355                         }
1356                         /* Generate rejected message entries for present messages */
1357                         for (; i < rejectedSlotVector->size(); i++) {
1358                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1359                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1360                                 int64_t machineid = s_msg->getMachineID();
1361                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1362                                 s->addEntry(rm);
1363                         }
1364                 }
1365         }
1366 }
1367
1368 ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize) {
1369         int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1370         int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
1371         if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1372                 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1373         }
1374
1375         int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1376         bool seenLiveSlot = false;
1377         int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots;         // smallest seq number in the buffer if it is full
1378         int64_t threshold = firstIfFull + Table_FREE_SLOTS;             // we want the buffer to be clear of live entries up to this point
1379
1380
1381         // Mandatory Rescue
1382         for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1383                 Slot *previousSlot = buffer->getSlot(currentSequenceNumber);
1384                 // Push slot number forward
1385                 if (!seenLiveSlot) {
1386                         oldestLiveSlotSequenceNumver = currentSequenceNumber;
1387                 }
1388
1389                 if (!previousSlot->isLive()) {
1390                         continue;
1391                 }
1392
1393                 // We have seen a live slot
1394                 seenLiveSlot = true;
1395
1396                 // Get all the live entries for a slot
1397                 Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1398
1399                 // Iterate over all the live entries and try to rescue them
1400                 uint lESize = liveEntries->size();
1401                 for (uint i = 0; i < lESize; i++) {
1402                         Entry *liveEntry = liveEntries->get(i);
1403                         if (slot->hasSpace(liveEntry)) {
1404                                 // Enough space to rescue the entry
1405                                 slot->addEntry(liveEntry);
1406                         } else if (currentSequenceNumber == firstIfFull) {
1407                                 //if there's no space but the entry is about to fall off the queue
1408                                 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
1409                         }
1410                 }
1411         }
1412
1413         // Did not resize
1414         return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
1415 }
1416
1417 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1418         /* now go through live entries from least to greatest sequence number until
1419          * either all live slots added, or the slot doesn't have enough room
1420          * for SKIP_THRESHOLD consecutive entries*/
1421         int skipcount = 0;
1422         int64_t newestseqnum = buffer->getNewestSeqNum();
1423         for (; seqn <= newestseqnum; seqn++) {
1424                 Slot *prevslot = buffer->getSlot(seqn);
1425                 //Push slot number forward
1426                 if (!seenliveslot)
1427                         oldestLiveSlotSequenceNumver = seqn;
1428
1429                 if (!prevslot->isLive())
1430                         continue;
1431                 seenliveslot = true;
1432                 Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
1433                 uint lESize = liveentries->size();
1434                 for (uint i = 0; i < lESize; i++) {
1435                         Entry *liveentry = liveentries->get(i);
1436                         if (s->hasSpace(liveentry))
1437                                 s->addEntry(liveentry);
1438                         else {
1439                                 skipcount++;
1440                                 if (skipcount > Table_SKIP_THRESHOLD)
1441                                         goto donesearch;
1442                         }
1443                 }
1444         }
1445 donesearch:
1446         ;
1447 }
1448
1449 /**
1450  * Checks for malicious activity and updates the local copy of the block chain->
1451  */
1452 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1453         // The cloud communication layer has checked slot HMACs already
1454         // before decoding
1455         if (newSlots->length() == 0) {
1456                 return;
1457         }
1458
1459         // Make sure all slots are newer than the last largest slot this
1460         // client has seen
1461         int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
1462         if (firstSeqNum <= sequenceNumber) {
1463                 throw new Error("Server Error: Sent older slots!");
1464         }
1465
1466         // Create an object that can access both new slots and slots in our
1467         // local chain without committing slots to our local chain
1468         SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1469
1470         // Check that the HMAC chain is not broken
1471         checkHMACChain(indexer, newSlots);
1472
1473         // Set to keep track of messages from clients
1474         Hashset<int64_t> *machineSet = new Hashset<int64_t>();
1475         {
1476                 SetIterator<int64_t, Pair<int64_t, Liveness *> *> *lmit = getKeyIterator(lastMessageTable);
1477                 while (lmit->hasNext())
1478                         machineSet->add(lmit->next());
1479                 delete lmit;
1480         }
1481
1482         // Process each slots data
1483         {
1484                 uint numSlots = newSlots->length();
1485                 for (uint i = 0; i < numSlots; i++) {
1486                         Slot *slot = newSlots->get(i);
1487                         processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1488                         updateExpectedSize();
1489                 }
1490         }
1491         delete indexer;
1492         
1493         // If there is a gap, check to see if the server sent us
1494         // everything->
1495         if (firstSeqNum != (sequenceNumber + 1)) {
1496
1497                 // Check the size of the slots that were sent down by the server->
1498                 // Can only check the size if there was a gap
1499                 checkNumSlots(newSlots->length());
1500
1501                 // Since there was a gap every machine must have pushed a slot or
1502                 // must have a last message message-> If not then the server is
1503                 // hiding slots
1504                 if (!machineSet->isEmpty()) {
1505                         throw new Error("Missing record for machines: ");
1506                 }
1507         }
1508
1509         // Update the size of our local block chain->
1510         commitNewMaxSize();
1511
1512         // Commit new to slots to the local block chain->
1513         {
1514                 uint numSlots = newSlots->length();
1515                 for (uint i = 0; i < numSlots; i++) {
1516                         Slot *slot = newSlots->get(i);
1517
1518                         // Insert this slot into our local block chain copy->
1519                         buffer->putSlot(slot);
1520
1521                         // Keep track of how many slots are currently live (have live data
1522                         // in them)->
1523                         liveSlotCount++;
1524                 }
1525         }
1526         // Get the sequence number of the latest slot in the system
1527         sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
1528         updateLiveStateFromServer();
1529
1530         // No Need to remember after we pulled from the server
1531         offlineTransactionsCommittedAndAtServer->clear();
1532
1533         // This is invalidated now
1534         hadPartialSendToServer = false;
1535 }
1536
1537 void Table::updateLiveStateFromServer() {
1538         // Process the new transaction parts
1539         processNewTransactionParts();
1540
1541         // Do arbitration on new transactions that were received
1542         arbitrateFromServer();
1543
1544         // Update all the committed keys
1545         bool didCommitOrSpeculate = updateCommittedTable();
1546
1547         // Delete the transactions that are now dead
1548         updateLiveTransactionsAndStatus();
1549
1550         // Do speculations
1551         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1552         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1553 }
1554
1555 void Table::updateLiveStateFromLocal() {
1556         // Update all the committed keys
1557         bool didCommitOrSpeculate = updateCommittedTable();
1558
1559         // Delete the transactions that are now dead
1560         updateLiveTransactionsAndStatus();
1561
1562         // Do speculations
1563         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1564         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1565 }
1566
1567 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1568         int64_t prevslots = firstSequenceNumber;
1569
1570         if (didFindTableStatus) {
1571         } else {
1572                 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1573         }
1574
1575         didFindTableStatus = true;
1576         currMaxSize = numberOfSlots;
1577 }
1578
1579 void Table::updateExpectedSize() {
1580         expectedsize++;
1581
1582         if (expectedsize > currMaxSize) {
1583                 expectedsize = currMaxSize;
1584         }
1585 }
1586
1587
1588 /**
1589  * Check the size of the block chain to make sure there are enough
1590  * slots sent back by the server-> This is only called when we have a
1591  * gap between the slots that we have locally and the slots sent by
1592  * the server therefore in the slots sent by the server there will be
1593  * at least 1 Table status message
1594  */
1595 void Table::checkNumSlots(int numberOfSlots) {
1596         if (numberOfSlots != expectedsize) {
1597                 throw new Error("Server Error: Server did not send all slots->  Expected: ");
1598         }
1599 }
1600
1601 /**
1602  * Update the size of of the local buffer if it is needed->
1603  */
1604 void Table::commitNewMaxSize() {
1605         didFindTableStatus = false;
1606
1607         // Resize the local slot buffer
1608         if (numberOfSlots != currMaxSize) {
1609                 buffer->resize((int32_t)currMaxSize);
1610         }
1611
1612         // Change the number of local slots to the new size
1613         numberOfSlots = (int32_t)currMaxSize;
1614
1615         // Recalculate the resize threshold since the size of the local
1616         // buffer has changed
1617         setResizeThreshold();
1618 }
1619
1620 /**
1621  * Process the new transaction parts from this latest round of slots
1622  * received from the server
1623  */
1624 void Table::processNewTransactionParts() {
1625
1626         if (newTransactionParts->size() == 0) {
1627                 // Nothing new to process
1628                 return;
1629         }
1630
1631         // Iterate through all the machine Ids that we received new parts
1632         // for
1633         SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *tpit = getKeyIterator(newTransactionParts);
1634         while (tpit->hasNext()) {
1635                 int64_t machineId = tpit->next();
1636                 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
1637
1638                 SetIterator<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *ptit = getKeyIterator(parts);
1639                 // Iterate through all the parts for that machine Id
1640                 while (ptit->hasNext()) {
1641                         Pair<int64_t, int32_t> *partId = ptit->next();
1642                         TransactionPart *part = parts->get(partId);
1643
1644                         if (lastArbitratedTransactionNumberByArbitratorTable->contains(part->getArbitratorId())) {
1645                                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1646                                 if (lastTransactionNumber >= part->getSequenceNumber()) {
1647                                         // Set dead the transaction part
1648                                         part->setDead();
1649                                         continue;
1650                                 }
1651                         }
1652
1653                         // Get the transaction object for that sequence number
1654                         Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1655
1656                         if (transaction == NULL) {
1657                                 // This is a new transaction that we dont have so make a new one
1658                                 transaction = new Transaction();
1659
1660                                 // Insert this new transaction into the live tables
1661                                 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1662                                 liveTransactionByTransactionIdTable->put(new Pair<int64_t, int64_t>(part->getTransactionId()), transaction);
1663                         }
1664
1665                         // Add that part to the transaction
1666                         transaction->addPartDecode(part);
1667                 }
1668                 delete ptit;
1669         }
1670         delete tpit;
1671         // Clear all the new transaction parts in preparation for the next
1672         // time the server sends slots
1673         {
1674                 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts);
1675                 while (partsit->hasNext()) {
1676                         int64_t machineId = partsit->next();
1677                         Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
1678                         delete parts;
1679                 }
1680                 delete partsit;
1681                 newTransactionParts->clear();
1682         }
1683 }
1684
1685 void Table::arbitrateFromServer() {
1686         if (liveTransactionBySequenceNumberTable->size() == 0) {
1687                 // Nothing to arbitrate on so move on
1688                 return;
1689         }
1690
1691         // Get the transaction sequence numbers and sort from oldest to newest
1692         Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>();
1693         {
1694                 SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
1695                 while (trit->hasNext())
1696                         transactionSequenceNumbers->add(trit->next());
1697                 delete trit;
1698         }
1699         qsort(transactionSequenceNumbers->expose(), transactionSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1700
1701         // Collection of key value pairs that are
1702         Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *speculativeTableTmp = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
1703
1704         // The last transaction arbitrated on
1705         int64_t lastTransactionCommitted = -1;
1706         Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1707         uint tsnSize = transactionSequenceNumbers->size();
1708         for (uint i = 0; i < tsnSize; i++) {
1709                 int64_t transactionSequenceNumber = transactionSequenceNumbers->get(i);
1710                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1711
1712                 // Check if this machine arbitrates for this transaction if not
1713                 // then we cant arbitrate this transaction
1714                 if (transaction->getArbitrator() != localMachineId) {
1715                         continue;
1716                 }
1717
1718                 if (transactionSequenceNumber < lastSeqNumArbOn) {
1719                         continue;
1720                 }
1721
1722                 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1723                         // We have seen this already locally so dont commit again
1724                         continue;
1725                 }
1726
1727
1728                 if (!transaction->isComplete()) {
1729                         // Will arbitrate in incorrect order if we continue so just break
1730                         // Most likely this
1731                         break;
1732                 }
1733
1734
1735                 // update the largest transaction seen by arbitrator from server
1736                 if (!lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1737                         lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1738                 } else {
1739                         int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1740                         if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1741                                 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1742                         }
1743                 }
1744
1745                 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1746                         // Guard evaluated as true
1747
1748                         // Update the local changes so we can make the commit
1749                         SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1750                         while (kvit->hasNext()) {
1751                                 KeyValue *kv = kvit->next();
1752                                 speculativeTableTmp->put(kv->getKey(), kv);
1753                         }
1754                         delete kvit;
1755
1756                         // Update what the last transaction committed was for use in batch commit
1757                         lastTransactionCommitted = transactionSequenceNumber;
1758                 } else {
1759                         // Guard evaluated was false so create abort
1760                         // Create the abort
1761                         Abort *newAbort = new Abort(NULL,
1762                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1763                                                                                                                                         transaction->getSequenceNumber(),
1764                                                                                                                                         transaction->getMachineId(),
1765                                                                                                                                         transaction->getArbitrator(),
1766                                                                                                                                         localArbitrationSequenceNumber);
1767                         localArbitrationSequenceNumber++;
1768                         generatedAborts->add(newAbort);
1769
1770                         // Insert the abort so we can process
1771                         processEntry(newAbort);
1772                 }
1773
1774                 lastSeqNumArbOn = transactionSequenceNumber;
1775         }
1776
1777         Commit *newCommit = NULL;
1778
1779         // If there is something to commit
1780         if (speculativeTableTmp->size() != 0) {
1781                 // Create the commit and increment the commit sequence number
1782                 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1783                 localArbitrationSequenceNumber++;
1784
1785                 // Add all the new keys to the commit
1786                 SetIterator<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *spit = getKeyIterator(speculativeTableTmp);
1787                 while (spit->hasNext()) {
1788                         IoTString *string = spit->next();
1789                         KeyValue *kv = speculativeTableTmp->get(string);
1790                         newCommit->addKV(kv);
1791                 }
1792                 delete spit;
1793                 
1794                 // create the commit parts
1795                 newCommit->createCommitParts();
1796
1797                 // Append all the commit parts to the end of the pending queue
1798                 // waiting for sending to the server
1799                 // Insert the commit so we can process it
1800                 Vector<CommitPart *> *parts = newCommit->getParts();
1801                 uint partsSize = parts->size();
1802                 for (uint i = 0; i < partsSize; i++) {
1803                         CommitPart *commitPart = parts->get(i);
1804                         processEntry(commitPart);
1805                 }
1806         }
1807         delete speculativeTableTmp;
1808
1809         if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1810                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1811                 pendingSendArbitrationRounds->add(arbitrationRound);
1812
1813                 if (compactArbitrationData()) {
1814                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1815                         if (newArbitrationRound->getCommit() != NULL) {
1816                                 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1817                                 uint partsSize = parts->size();
1818                                 for (uint i = 0; i < partsSize; i++) {
1819                                         CommitPart *commitPart = parts->get(i);
1820                                         processEntry(commitPart);
1821                                 }
1822                         }
1823                 }
1824         }
1825 }
1826
1827 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1828
1829         // Check if this machine arbitrates for this transaction if not then
1830         // we cant arbitrate this transaction
1831         if (transaction->getArbitrator() != localMachineId) {
1832                 return Pair<bool, bool>(false, false);
1833         }
1834
1835         if (!transaction->isComplete()) {
1836                 // Will arbitrate in incorrect order if we continue so just break
1837                 // Most likely this
1838                 return Pair<bool, bool>(false, false);
1839         }
1840
1841         if (transaction->getMachineId() != localMachineId) {
1842                 // dont do this check for local transactions
1843                 if (lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1844                         if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1845                                 // We've have already seen this from the server
1846                                 return Pair<bool, bool>(false, false);
1847                         }
1848                 }
1849         }
1850
1851         if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1852                 // Guard evaluated as true Create the commit and increment the
1853                 // commit sequence number
1854                 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1855                 localArbitrationSequenceNumber++;
1856
1857                 // Update the local changes so we can make the commit
1858                 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1859                 while (kvit->hasNext()) {
1860                         KeyValue *kv = kvit->next();
1861                         newCommit->addKV(kv);
1862                 }
1863                 delete kvit;
1864
1865                 // create the commit parts
1866                 newCommit->createCommitParts();
1867
1868                 // Append all the commit parts to the end of the pending queue
1869                 // waiting for sending to the server
1870                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1871                 pendingSendArbitrationRounds->add(arbitrationRound);
1872
1873                 if (compactArbitrationData()) {
1874                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1875                         Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1876                         uint partsSize = parts->size();
1877                         for (uint i = 0; i < partsSize; i++) {
1878                                 CommitPart *commitPart = parts->get(i);
1879                                 processEntry(commitPart);
1880                         }
1881                 } else {
1882                         // Insert the commit so we can process it
1883                         Vector<CommitPart *> *parts = newCommit->getParts();
1884                         uint partsSize = parts->size();
1885                         for (uint i = 0; i < partsSize; i++) {
1886                                 CommitPart *commitPart = parts->get(i);
1887                                 processEntry(commitPart);
1888                         }
1889                 }
1890
1891                 if (transaction->getMachineId() == localMachineId) {
1892                         TransactionStatus *status = transaction->getTransactionStatus();
1893                         if (status != NULL) {
1894                                 status->setStatus(TransactionStatus_StatusCommitted);
1895                         }
1896                 }
1897
1898                 updateLiveStateFromLocal();
1899                 return Pair<bool, bool>(true, true);
1900         } else {
1901                 if (transaction->getMachineId() == localMachineId) {
1902                         // For locally created messages update the status
1903                         // Guard evaluated was false so create abort
1904                         TransactionStatus *status = transaction->getTransactionStatus();
1905                         if (status != NULL) {
1906                                 status->setStatus(TransactionStatus_StatusAborted);
1907                         }
1908                 } else {
1909                         Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
1910
1911                         // Create the abort
1912                         Abort *newAbort = new Abort(NULL,
1913                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1914                                                                                                                                         -1,
1915                                                                                                                                         transaction->getMachineId(),
1916                                                                                                                                         transaction->getArbitrator(),
1917                                                                                                                                         localArbitrationSequenceNumber);
1918                         localArbitrationSequenceNumber++;
1919                         addAbortSet->add(newAbort);
1920
1921                         // Append all the commit parts to the end of the pending queue
1922                         // waiting for sending to the server
1923                         ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1924                         pendingSendArbitrationRounds->add(arbitrationRound);
1925
1926                         if (compactArbitrationData()) {
1927                                 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1928
1929                                 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1930                                 uint partsSize = parts->size();
1931                                 for (uint i = 0; i < partsSize; i++) {
1932                                         CommitPart *commitPart = parts->get(i);
1933                                         processEntry(commitPart);
1934                                 }
1935                         }
1936                 }
1937
1938                 updateLiveStateFromLocal();
1939                 return Pair<bool, bool>(true, false);
1940         }
1941 }
1942
1943 /**
1944  * Compacts the arbitration data my merging commits and aggregating
1945  * aborts so that a single large push of commits can be done instead
1946  * of many small updates
1947  */
1948 bool Table::compactArbitrationData() {
1949         if (pendingSendArbitrationRounds->size() < 2) {
1950                 // Nothing to compact so do nothing
1951                 return false;
1952         }
1953
1954         ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1955         if (lastRound->getDidSendPart()) {
1956                 return false;
1957         }
1958
1959         bool hadCommit = (lastRound->getCommit() == NULL);
1960         bool gotNewCommit = false;
1961
1962         uint numberToDelete = 1;
1963         while (numberToDelete < pendingSendArbitrationRounds->size()) {
1964                 ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1965
1966                 if (round->isFull() || round->getDidSendPart()) {
1967                         // Stop since there is a part that cannot be compacted and we
1968                         // need to compact in order
1969                         break;
1970                 }
1971
1972                 if (round->getCommit() == NULL) {
1973                         // Try compacting aborts only
1974                         int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
1975                         if (newSize > ArbitrationRound_MAX_PARTS) {
1976                                 // Cant compact since it would be too large
1977                                 break;
1978                         }
1979                         lastRound->addAborts(round->getAborts());
1980                 } else {
1981                         // Create a new larger commit
1982                         Commit *newCommit = Commit_merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
1983                         localArbitrationSequenceNumber++;
1984
1985                         // Create the commit parts so that we can count them
1986                         newCommit->createCommitParts();
1987
1988                         // Calculate the new size of the parts
1989                         int newSize = newCommit->getNumberOfParts();
1990                         newSize += lastRound->getAbortsCount();
1991                         newSize += round->getAbortsCount();
1992
1993                         if (newSize > ArbitrationRound_MAX_PARTS) {
1994                                 // Cant compact since it would be too large
1995                                 break;
1996                         }
1997
1998                         // Set the new compacted part
1999                         lastRound->setCommit(newCommit);
2000                         lastRound->addAborts(round->getAborts());
2001                         gotNewCommit = true;
2002                 }
2003
2004                 numberToDelete++;
2005         }
2006
2007         if (numberToDelete != 1) {
2008                 // If there is a compaction
2009                 // Delete the previous pieces that are now in the new compacted piece
2010                 if (numberToDelete == pendingSendArbitrationRounds->size()) {
2011                         pendingSendArbitrationRounds->clear();
2012                 } else {
2013                         for (uint i = 0; i < numberToDelete; i++) {
2014                                 pendingSendArbitrationRounds->removeIndex(pendingSendArbitrationRounds->size() - 1);
2015                         }
2016                 }
2017
2018                 // Add the new compacted into the pending to send list
2019                 pendingSendArbitrationRounds->add(lastRound);
2020
2021                 // Should reinsert into the commit processor
2022                 if (hadCommit && gotNewCommit) {
2023                         return true;
2024                 }
2025         }
2026
2027         return false;
2028 }
2029
2030 /**
2031  * Update all the commits and the committed tables, sets dead the dead
2032  * transactions
2033  */
2034 bool Table::updateCommittedTable() {
2035         if (newCommitParts->size() == 0) {
2036                 // Nothing new to process
2037                 return false;
2038         }
2039
2040         // Iterate through all the machine Ids that we received new parts for
2041         SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts);
2042         while (partsit->hasNext()) {
2043                 int64_t machineId = partsit->next();
2044                 Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId);
2045
2046                 // Iterate through all the parts for that machine Id
2047                 SetIterator<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pairit = getKeyIterator(parts);
2048                 while (pairit->hasNext()) {
2049                         Pair<int64_t, int32_t> *partId = pairit->next();
2050                         CommitPart *part = parts->get(partId);
2051
2052                         // Get the transaction object for that sequence number
2053                         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
2054
2055                         if (commitForClientTable == NULL) {
2056                                 // This is the first commit from this device
2057                                 commitForClientTable = new Hashtable<int64_t, Commit *>();
2058                                 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
2059                         }
2060
2061                         Commit *commit = commitForClientTable->get(part->getSequenceNumber());
2062
2063                         if (commit == NULL) {
2064                                 // This is a new commit that we dont have so make a new one
2065                                 commit = new Commit();
2066
2067                                 // Insert this new commit into the live tables
2068                                 commitForClientTable->put(part->getSequenceNumber(), commit);
2069                         }
2070
2071                         // Add that part to the commit
2072                         commit->addPartDecode(part);
2073                 }
2074                 delete pairit;
2075                 delete parts;
2076         }
2077         delete partsit;
2078
2079         // Clear all the new commits parts in preparation for the next time
2080         // the server sends slots
2081         newCommitParts->clear();
2082
2083         // If we process a new commit keep track of it for future use
2084         bool didProcessANewCommit = false;
2085
2086         // Process the commits one by one
2087         SetIterator<int64_t, Hashtable<int64_t, Commit *> *> *liveit = getKeyIterator(liveCommitsTable);
2088         while (liveit->hasNext()) {
2089                 int64_t arbitratorId = liveit->next();
2090
2091                 // Get all the commits for a specific arbitrator
2092                 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
2093
2094                 // Sort the commits in order
2095                 Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>();
2096                 {
2097                         SetIterator<int64_t, Commit *> *clientit = getKeyIterator(commitForClientTable);
2098                         while (clientit->hasNext())
2099                                 commitSequenceNumbers->add(clientit->next());
2100                         delete clientit;
2101                 }
2102
2103                 qsort(commitSequenceNumbers->expose(), commitSequenceNumbers->size(), sizeof(int64_t), compareInt64);
2104
2105                 // Get the last commit seen from this arbitrator
2106                 int64_t lastCommitSeenSequenceNumber = -1;
2107                 if (lastCommitSeenSequenceNumberByArbitratorTable->contains(arbitratorId)) {
2108                         lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
2109                 }
2110
2111                 // Go through each new commit one by one
2112                 for (uint i = 0; i < commitSequenceNumbers->size(); i++) {
2113                         int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
2114                         Commit *commit = commitForClientTable->get(commitSequenceNumber);
2115
2116                         // Special processing if a commit is not complete
2117                         if (!commit->isComplete()) {
2118                                 if (i == (commitSequenceNumbers->size() - 1)) {
2119                                         // If there is an incomplete commit and this commit is the
2120                                         // latest one seen then this commit cannot be processed and
2121                                         // there are no other commits
2122                                         break;
2123                                 } else {
2124                                         // This is a commit that was already dead but parts of it
2125                                         // are still in the block chain (not flushed out yet)->
2126                                         // Delete it and move on
2127                                         commit->setDead();
2128                                         commitForClientTable->remove(commit->getSequenceNumber());
2129                                         delete commit;
2130                                         continue;
2131                                 }
2132                         }
2133
2134                         // Update the last transaction that was updated if we can
2135                         if (commit->getTransactionSequenceNumber() != -1) {
2136                                 // Update the last transaction sequence number that the arbitrator arbitrated on1
2137                                 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2138                                         lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2139                                 }
2140                         }
2141
2142                         // Update the last arbitration data that we have seen so far
2143                         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(commit->getMachineId())) {
2144                                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
2145                                 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
2146                                         // Is larger
2147                                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2148                                 }
2149                         } else {
2150                                 // Never seen any data from this arbitrator so record the first one
2151                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2152                         }
2153
2154                         // We have already seen this commit before so need to do the
2155                         // full processing on this commit
2156                         if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2157
2158                                 // Update the last transaction that was updated if we can
2159                                 if (commit->getTransactionSequenceNumber() != -1) {
2160                                         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
2161                                         if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) ||
2162                                                         lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2163                                                 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2164                                         }
2165                                 }
2166
2167                                 continue;
2168                         }
2169
2170                         // If we got here then this is a brand new commit and needs full
2171                         // processing
2172                         // Get what commits should be edited, these are the commits that
2173                         // have live values for their keys
2174                         Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
2175                         {
2176                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2177                                 while (kvit->hasNext()) {
2178                                         KeyValue *kv = kvit->next();
2179                                         Commit *commit = liveCommitsByKeyTable->get(kv->getKey());
2180                                         if (commit != NULL)
2181                                                 commitsToEdit->add(commit);
2182                                 }
2183                                 delete kvit;
2184                         }
2185
2186                         // Update each previous commit that needs to be updated
2187                         SetIterator<Commit *, Commit *> *commitit = commitsToEdit->iterator();
2188                         while (commitit->hasNext()) {
2189                                 Commit *previousCommit = commitit->next();
2190
2191                                 // Only bother with live commits (TODO: Maybe remove this check)
2192                                 if (previousCommit->isLive()) {
2193
2194                                         // Update which keys in the old commits are still live
2195                                         {
2196                                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2197                                                 while (kvit->hasNext()) {
2198                                                         KeyValue *kv = kvit->next();
2199                                                         previousCommit->invalidateKey(kv->getKey());
2200                                                 }
2201                                                 delete kvit;
2202                                         }
2203
2204                                         // if the commit is now dead then remove it
2205                                         if (!previousCommit->isLive()) {
2206                                                 commitForClientTable->remove(previousCommit->getSequenceNumber());
2207                                                 delete previousCommit;
2208                                         }
2209                                 }
2210                         }
2211                         delete commitit;
2212
2213                         // Update the last seen sequence number from this arbitrator
2214                         if (lastCommitSeenSequenceNumberByArbitratorTable->contains(commit->getMachineId())) {
2215                                 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2216                                         lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2217                                 }
2218                         } else {
2219                                 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2220                         }
2221
2222                         // We processed a new commit that we havent seen before
2223                         didProcessANewCommit = true;
2224
2225                         // Update the committed table of keys and which commit is using which key
2226                         {
2227                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2228                                 while (kvit->hasNext()) {
2229                                         KeyValue *kv = kvit->next();
2230                                         committedKeyValueTable->put(kv->getKey(), kv);
2231                                         liveCommitsByKeyTable->put(kv->getKey(), commit);
2232                                 }
2233                                 delete kvit;
2234                         }
2235                 }
2236         }
2237         delete liveit;
2238
2239         return didProcessANewCommit;
2240 }
2241
2242 /**
2243  * Create the speculative table from transactions that are still live
2244  * and have come from the cloud
2245  */
2246 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2247         if (liveTransactionBySequenceNumberTable->size() == 0) {
2248                 // There is nothing to speculate on
2249                 return false;
2250         }
2251
2252         // Create a list of the transaction sequence numbers and sort them
2253         // from oldest to newest
2254         Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>();
2255         {
2256                 SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
2257                 while (trit->hasNext())
2258                         transactionSequenceNumbersSorted->add(trit->next());
2259                 delete trit;
2260         }
2261
2262         qsort(transactionSequenceNumbersSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64);
2263
2264         bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2265
2266
2267         if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2268                 // If there is a gap in the transaction sequence numbers then
2269                 // there was a commit or an abort of a transaction OR there was a
2270                 // new commit (Could be from offline commit) so a redo the
2271                 // speculation from scratch
2272
2273                 // Start from scratch
2274                 speculatedKeyValueTable->clear();
2275                 lastTransactionSequenceNumberSpeculatedOn = -1;
2276                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2277         }
2278
2279         // Remember the front of the transaction list
2280         oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2281
2282         // Find where to start arbitration from
2283         uint startIndex = 0;
2284
2285         for (; startIndex < transactionSequenceNumbersSorted->size(); startIndex++)
2286                 if (transactionSequenceNumbersSorted->get(startIndex) == lastTransactionSequenceNumberSpeculatedOn)
2287                         break;
2288         startIndex++;
2289
2290         if (startIndex >= transactionSequenceNumbersSorted->size()) {
2291                 // Make sure we are not out of bounds
2292                 return false;           // did not speculate
2293         }
2294
2295         Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
2296         bool didSkip = true;
2297
2298         for (uint i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2299                 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2300                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2301
2302                 if (!transaction->isComplete()) {
2303                         // If there is an incomplete transaction then there is nothing
2304                         // we can do add this transactions arbitrator to the list of
2305                         // arbitrators we should ignore
2306                         incompleteTransactionArbitrator->add(transaction->getArbitrator());
2307                         didSkip = true;
2308                         continue;
2309                 }
2310
2311                 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2312                         continue;
2313                 }
2314
2315                 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2316
2317                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2318                         // Guard evaluated to true so update the speculative table
2319                         {
2320                                 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2321                                 while (kvit->hasNext()) {
2322                                         KeyValue *kv = kvit->next();
2323                                         speculatedKeyValueTable->put(kv->getKey(), kv);
2324                                 }
2325                                 delete kvit;
2326                         }
2327                 }
2328         }
2329
2330         if (didSkip) {
2331                 // Since there was a skip we need to redo the speculation next time around
2332                 lastTransactionSequenceNumberSpeculatedOn = -1;
2333                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2334         }
2335
2336         // We did some speculation
2337         return true;
2338 }
2339
2340 /**
2341  * Create the pending transaction speculative table from transactions
2342  * that are still in the pending transaction buffer
2343  */
2344 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2345         if (pendingTransactionQueue->size() == 0) {
2346                 // There is nothing to speculate on
2347                 return;
2348         }
2349
2350         if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2351                 // need to reset on the pending speculation
2352                 lastPendingTransactionSpeculatedOn = NULL;
2353                 firstPendingTransaction = pendingTransactionQueue->get(0);
2354                 pendingTransactionSpeculatedKeyValueTable->clear();
2355         }
2356
2357         // Find where to start arbitration from
2358         uint startIndex = 0;
2359
2360         for (; startIndex < pendingTransactionQueue->size(); startIndex++)
2361                 if (pendingTransactionQueue->get(startIndex) == firstPendingTransaction)
2362                         break;
2363
2364         if (startIndex >= pendingTransactionQueue->size()) {
2365                 // Make sure we are not out of bounds
2366                 return;
2367         }
2368
2369         for (uint i = startIndex; i < pendingTransactionQueue->size(); i++) {
2370                 Transaction *transaction = pendingTransactionQueue->get(i);
2371
2372                 lastPendingTransactionSpeculatedOn = transaction;
2373
2374                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2375                         // Guard evaluated to true so update the speculative table
2376                         SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2377                         while (kvit->hasNext()) {
2378                                 KeyValue *kv = kvit->next();
2379                                 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2380                         }
2381                         delete kvit;
2382                 }
2383         }
2384 }
2385
2386 /**
2387  * Set dead and remove from the live transaction tables the
2388  * transactions that are dead
2389  */
2390 void Table::updateLiveTransactionsAndStatus() {
2391         // Go through each of the transactions
2392         {
2393                 SetIterator<int64_t, Transaction *> *iter = getKeyIterator(liveTransactionBySequenceNumberTable);
2394                 while (iter->hasNext()) {
2395                         int64_t key = iter->next();
2396                         Transaction *transaction = liveTransactionBySequenceNumberTable->get(key);
2397
2398                         // Check if the transaction is dead
2399                         if (lastArbitratedTransactionNumberByArbitratorTable->contains(transaction->getArbitrator())
2400                                         && lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator()) >= transaction->getSequenceNumber()) {
2401                                 // Set dead the transaction
2402                                 transaction->setDead();
2403
2404                                 // Remove the transaction from the live table
2405                                 iter->remove();
2406                                 liveTransactionByTransactionIdTable->remove(transaction->getId());
2407                                 delete transaction;
2408                         }
2409                 }
2410                 delete iter;
2411         }
2412
2413         // Go through each of the transactions
2414         {
2415                 SetIterator<int64_t, TransactionStatus *> *iter = getKeyIterator(outstandingTransactionStatus);
2416                 while (iter->hasNext()) {
2417                         int64_t key = iter->next();
2418                         TransactionStatus *status = outstandingTransactionStatus->get(key);
2419
2420                         // Check if the transaction is dead
2421                         if (lastArbitratedTransactionNumberByArbitratorTable->contains(status->getTransactionArbitrator())
2422                                         && (lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()) >= status->getTransactionSequenceNumber())) {
2423                                 // Set committed
2424                                 status->setStatus(TransactionStatus_StatusCommitted);
2425
2426                                 // Remove
2427                                 iter->remove();
2428                         }
2429                 }
2430                 delete iter;
2431         }
2432 }
2433
2434 /**
2435  * Process this slot, entry by entry->  Also update the latest message sent by slot
2436  */
2437 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2438
2439         // Update the last message seen
2440         updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2441
2442         // Process each entry in the slot
2443         Vector<Entry *> *entries = slot->getEntries();
2444         uint eSize = entries->size();
2445         for (uint ei = 0; ei < eSize; ei++) {
2446                 Entry *entry = entries->get(ei);
2447                 switch (entry->getType()) {
2448                 case TypeCommitPart:
2449                         processEntry((CommitPart *)entry);
2450                         break;
2451                 case TypeAbort:
2452                         processEntry((Abort *)entry);
2453                         break;
2454                 case TypeTransactionPart:
2455                         processEntry((TransactionPart *)entry);
2456                         break;
2457                 case TypeNewKey:
2458                         processEntry((NewKey *)entry);
2459                         break;
2460                 case TypeLastMessage:
2461                         processEntry((LastMessage *)entry, machineSet);
2462                         break;
2463                 case TypeRejectedMessage:
2464                         processEntry((RejectedMessage *)entry, indexer);
2465                         break;
2466                 case TypeTableStatus:
2467                         processEntry((TableStatus *)entry, slot->getSequenceNumber());
2468                         break;
2469                 default:
2470                         throw new Error("Unrecognized type: ");
2471                 }
2472         }
2473 }
2474
2475 /**
2476  * Update the last message that was sent for a machine Id
2477  */
2478 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2479         // Update what the last message received by a machine was
2480         updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2481 }
2482
2483 /**
2484  * Add the new key to the arbitrators table and update the set of live
2485  * new keys (in case of a rescued new key message)
2486  */
2487 void Table::processEntry(NewKey *entry) {
2488         // Update the arbitrator table with the new key information
2489         arbitratorTable->put(entry->getKey(), entry->getMachineID());
2490
2491         // Update what the latest live new key is
2492         NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2493         if (oldNewKey != NULL) {
2494                 // Delete the old new key messages
2495                 oldNewKey->setDead();
2496         }
2497 }
2498
2499 /**
2500  * Process new table status entries and set dead the old ones as new
2501  * ones come in-> keeps track of the largest and smallest table status
2502  * seen in this current round of updating the local copy of the block
2503  * chain
2504  */
2505 void Table::processEntry(TableStatus *entry, int64_t seq) {
2506         int newNumSlots = entry->getMaxSlots();
2507         updateCurrMaxSize(newNumSlots);
2508         initExpectedSize(seq, newNumSlots);
2509
2510         if (liveTableStatus != NULL) {
2511                 // We have a larger table status so the old table status is no
2512                 // int64_ter alive
2513                 liveTableStatus->setDead();
2514         }
2515
2516         // Make this new table status the latest alive table status
2517         liveTableStatus = entry;
2518 }
2519
2520 /**
2521  * Check old messages to see if there is a block chain violation->
2522  * Also
2523  */
2524 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
2525         int64_t oldSeqNum = entry->getOldSeqNum();
2526         int64_t newSeqNum = entry->getNewSeqNum();
2527         bool isequal = entry->getEqual();
2528         int64_t machineId = entry->getMachineID();
2529         int64_t seq = entry->getSequenceNumber();
2530
2531         // Check if we have messages that were supposed to be rejected in
2532         // our local block chain
2533         for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2534                 // Get the slot
2535                 Slot *slot = indexer->getSlot(seqNum);
2536
2537                 if (slot != NULL) {
2538                         // If we have this slot make sure that it was not supposed to be
2539                         // a rejected slot
2540                         int64_t slotMachineId = slot->getMachineID();
2541                         if (isequal != (slotMachineId == machineId)) {
2542                                 throw new Error("Server Error: Trying to insert rejected message for slot ");
2543                         }
2544                 }
2545         }
2546
2547         // Create a list of clients to watch until they see this rejected
2548         // message entry->
2549         Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2550         SetIterator<int64_t, Pair<int64_t, Liveness *> *> *iter = getKeyIterator(lastMessageTable);
2551         while (iter->hasNext()) {
2552                 // Machine ID for the last message entry
2553                 int64_t lastMessageEntryMachineId = iter->next();
2554
2555                 // We've seen it, don't need to continue to watch->  Our next
2556                 // message will implicitly acknowledge it->
2557                 if (lastMessageEntryMachineId == localMachineId) {
2558                         continue;
2559                 }
2560
2561                 Pair<int64_t, Liveness *> *lastMessageValue = lastMessageTable->get(lastMessageEntryMachineId);
2562                 int64_t entrySequenceNumber = lastMessageValue->getFirst();
2563
2564                 if (entrySequenceNumber < seq) {
2565                         // Add this rejected message to the set of messages that this
2566                         // machine ID did not see yet
2567                         addWatchVector(lastMessageEntryMachineId, entry);
2568                         // This client did not see this rejected message yet so add it
2569                         // to the watch set to monitor
2570                         deviceWatchSet->add(lastMessageEntryMachineId);
2571                 }
2572         }
2573         delete iter;
2574
2575         if (deviceWatchSet->isEmpty()) {
2576                 // This rejected message has been seen by all the clients so
2577                 entry->setDead();
2578         } else {
2579                 // We need to watch this rejected message
2580                 entry->setWatchSet(deviceWatchSet);
2581         }
2582 }
2583
2584 /**
2585  * Check if this abort is live, if not then save it so we can kill it
2586  * later-> update the last transaction number that was arbitrated on->
2587  */
2588 void Table::processEntry(Abort *entry) {
2589         if (entry->getTransactionSequenceNumber() != -1) {
2590                 // update the transaction status if it was sent to the server
2591                 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2592                 if (status != NULL) {
2593                         status->setStatus(TransactionStatus_StatusAborted);
2594                 }
2595         }
2596
2597         // Abort has not been seen by the client it is for yet so we need to
2598         // keep track of it
2599
2600         Abort *previouslySeenAbort = liveAbortTable->put(new Pair<int64_t, int64_t>(entry->getAbortId()), entry);
2601         if (previouslySeenAbort != NULL) {
2602                 previouslySeenAbort->setDead();         // Delete old version of the abort since we got a rescued newer version
2603         }
2604
2605         if (entry->getTransactionArbitrator() == localMachineId) {
2606                 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
2607         }
2608
2609         if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) {
2610                 // The machine already saw this so it is dead
2611                 entry->setDead();
2612                 Pair<int64_t, int64_t> abortid = entry->getAbortId();
2613                 liveAbortTable->remove(&abortid);
2614
2615                 if (entry->getTransactionArbitrator() == localMachineId) {
2616                         liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2617                 }
2618                 return;
2619         }
2620
2621         // Update the last arbitration data that we have seen so far
2622         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(entry->getTransactionArbitrator())) {
2623                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2624                 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2625                         // Is larger
2626                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2627                 }
2628         } else {
2629                 // Never seen any data from this arbitrator so record the first one
2630                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2631         }
2632
2633         // Set dead a transaction if we can
2634         Pair<int64_t, int64_t> deadPair = Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber());
2635
2636         Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(&deadPair);
2637         if (transactionToSetDead != NULL) {
2638                 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2639         }
2640
2641         // Update the last transaction sequence number that the arbitrator
2642         // arbitrated on
2643         if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getTransactionArbitrator()) ||
2644                         (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator()) < entry->getTransactionSequenceNumber())) {
2645                 // Is a valid one
2646                 if (entry->getTransactionSequenceNumber() != -1) {
2647                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2648                 }
2649         }
2650 }
2651
2652 /**
2653  * Set dead the transaction part if that transaction is dead and keep
2654  * track of all new parts
2655  */
2656 void Table::processEntry(TransactionPart *entry) {
2657         // Check if we have already seen this transaction and set it dead OR
2658         // if it is not alive
2659         if (lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getArbitratorId()) && (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId()) >= entry->getSequenceNumber())) {
2660                 // This transaction is dead, it was already committed or aborted
2661                 entry->setDead();
2662                 return;
2663         }
2664
2665         // This part is still alive
2666         Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId());
2667
2668         if (transactionPart == NULL) {
2669                 // Dont have a table for this machine Id yet so make one
2670                 transactionPart = new Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2671                 newTransactionParts->put(entry->getMachineId(), transactionPart);
2672         }
2673
2674         // Update the part and set dead ones we have already seen (got a
2675         // rescued version)
2676         TransactionPart *previouslySeenPart = transactionPart->put(new Pair<int64_t, int32_t>(entry->getPartId()), entry);
2677         if (previouslySeenPart != NULL) {
2678                 previouslySeenPart->setDead();
2679         }
2680 }
2681
2682 /**
2683  * Process new commit entries and save them for future use->  Delete duplicates
2684  */
2685 void Table::processEntry(CommitPart *entry) {
2686         // Update the last transaction that was updated if we can
2687         if (entry->getTransactionSequenceNumber() != -1) {
2688                 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId() || lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber())) {
2689                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2690                 }
2691         }
2692
2693         Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *commitPart = newCommitParts->get(entry->getMachineId());
2694         if (commitPart == NULL) {
2695                 // Don't have a table for this machine Id yet so make one
2696                 commitPart = new Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2697                 newCommitParts->put(entry->getMachineId(), commitPart);
2698         }
2699         // Update the part and set dead ones we have already seen (got a
2700         // rescued version)
2701         CommitPart *previouslySeenPart = commitPart->put(new Pair<int64_t, int32_t>(entry->getPartId()), entry);
2702         if (previouslySeenPart != NULL) {
2703                 previouslySeenPart->setDead();
2704         }
2705 }
2706
2707 /**
2708  * Update the last message seen table-> Update and set dead the
2709  * appropriate RejectedMessages as clients see them-> Updates the live
2710  * aborts, removes those that are dead and sets them dead-> Check that
2711  * the last message seen is correct and that there is no mismatch of
2712  * our own last message or that other clients have not had a rollback
2713  * on the last message->
2714  */
2715 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2716         // We have seen this machine ID
2717         machineSet->remove(machineId);
2718
2719         // Get the set of rejected messages that this machine Id is has not seen yet
2720         Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2721         // If there is a rejected message that this machine Id has not seen yet
2722         if (watchset != NULL) {
2723                 // Go through each rejected message that this machine Id has not
2724                 // seen yet
2725
2726                 SetIterator<RejectedMessage *, RejectedMessage *> *rmit = watchset->iterator();
2727                 while (rmit->hasNext()) {
2728                         RejectedMessage *rm = rmit->next();
2729                         // If this machine Id has seen this rejected message->->->
2730                         if (rm->getSequenceNumber() <= seqNum) {
2731                                 // Remove it from our watchlist
2732                                 rmit->remove();
2733                                 // Decrement machines that need to see this notification
2734                                 rm->removeWatcher(machineId);
2735                                 delete rm;
2736                         }
2737                 }
2738                 delete rmit;
2739         }
2740
2741         // Set dead the abort
2742         SetIterator<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals> *abortit = getKeyIterator(liveAbortTable);
2743
2744         while (abortit->hasNext()) {
2745                 Pair<int64_t, int64_t> *key = abortit->next();
2746                 Abort *abort = liveAbortTable->get(key);
2747                 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2748                         abort->setDead();
2749                         abortit->remove();
2750                         if (abort->getTransactionArbitrator() == localMachineId) {
2751                                 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2752                         }
2753                 }
2754         }
2755         delete abortit;
2756         if (machineId == localMachineId) {
2757                 // Our own messages are immediately dead->
2758                 char livenessType = liveness->getType();
2759                 if (livenessType == TypeLastMessage) {
2760                         ((LastMessage *)liveness)->setDead();
2761                 } else if (livenessType == TypeSlot) {
2762                         ((Slot *)liveness)->setDead();
2763                 } else {
2764                         throw new Error("Unrecognized type");
2765                 }
2766         }
2767         // Get the old last message for this device
2768         Pair<int64_t, Liveness *> *lastMessageEntry = lastMessageTable->put(machineId, new Pair<int64_t, Liveness *>(seqNum, liveness));
2769         if (lastMessageEntry == NULL) {
2770                 // If no last message then there is nothing else to process
2771                 return;
2772         }
2773
2774         int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
2775         Liveness *lastEntry = lastMessageEntry->getSecond();
2776         delete lastMessageEntry;
2777
2778         // If it is not our machine Id since we already set ours to dead
2779         if (machineId != localMachineId) {
2780                 char lastEntryType = lastEntry->getType();
2781
2782                 if (lastEntryType == TypeLastMessage) {
2783                         ((LastMessage *)lastEntry)->setDead();
2784                 } else if (lastEntryType == TypeSlot) {
2785                         ((Slot *)lastEntry)->setDead();
2786                 } else {
2787                         throw new Error("Unrecognized type");
2788                 }
2789         }
2790         // Make sure the server is not playing any games
2791         if (machineId == localMachineId) {
2792                 if (hadPartialSendToServer) {
2793                         // We were not making any updates and we had a machine mismatch
2794                         if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2795                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: ");
2796                         }
2797                 } else {
2798                         // We were not making any updates and we had a machine mismatch
2799                         if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2800                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed: ");
2801                         }
2802                 }
2803         } else {
2804                 if (lastMessageSeqNum > seqNum) {
2805                         throw new Error("Server Error: Rollback on remote machine sequence number");
2806                 }
2807         }
2808 }
2809
2810 /**
2811  * Add a rejected message entry to the watch set to keep track of
2812  * which clients have seen that rejected message entry and which have
2813  * not.
2814  */
2815 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
2816         Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2817         if (entries == NULL) {
2818                 // There is no set for this machine ID yet so create one
2819                 entries = new Hashset<RejectedMessage *>();
2820                 rejectedMessageWatchVectorTable->put(machineId, entries);
2821         }
2822         entries->add(entry);
2823 }
2824
2825 /**
2826  * Check if the HMAC chain is not violated
2827  */
2828 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
2829         for (uint i = 0; i < newSlots->length(); i++) {
2830                 Slot *currSlot = newSlots->get(i);
2831                 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
2832                 if (prevSlot != NULL &&
2833                                 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2834                         throw new Error("Server Error: Invalid HMAC Chain");
2835         }
2836 }