space hacks
[iotcloud.git] / version2 / src / C / Table.cpp
1 #include "Table.h"
2 #include "CloudComm.h"
3 #include "SlotBuffer.h"
4 #include "NewKey.h"
5 #include "Slot.h"
6 #include "KeyValue.h"
7 #include "Error.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
13 #include "SecureRandom.h"
14 #include "ByteBuffer.h"
15 #include "Abort.h"
16 #include "CommitPart.h"
17 #include "ArbitrationRound.h"
18 #include "TransactionPart.h"
19 #include "Commit.h"
20 #include "RejectedMessage.h"
21 #include "SlotIndexer.h"
22 #include <stdlib.h>
23
24 int compareInt64(const void *a, const void *b) {
25         const int64_t *pa = (const int64_t *) a;
26         const int64_t *pb = (const int64_t *) b;
27         if (*pa < *pb)
28                 return -1;
29         else if (*pa > *pb)
30                 return 1;
31         else
32                 return 0;
33 }
34
35 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
36         buffer(NULL),
37         cloud(new CloudComm(this, baseurl, password, listeningPort)),
38         random(NULL),
39         liveTableStatus(NULL),
40         pendingTransactionBuilder(NULL),
41         lastPendingTransactionSpeculatedOn(NULL),
42         firstPendingTransaction(NULL),
43         numberOfSlots(0),
44         bufferResizeThreshold(0),
45         liveSlotCount(0),
46         oldestLiveSlotSequenceNumver(1),
47         localMachineId(_localMachineId),
48         sequenceNumber(0),
49         localSequenceNumber(0),
50         localTransactionSequenceNumber(1),
51         lastTransactionSequenceNumberSpeculatedOn(0),
52         oldestTransactionSequenceNumberSpeculatedOn(0),
53         localArbitrationSequenceNumber(1),
54         hadPartialSendToServer(false),
55         attemptedToSendToServer(false),
56         expectedsize(0),
57         didFindTableStatus(false),
58         currMaxSize(0),
59         lastSlotAttemptedToSend(NULL),
60         lastIsNewKey(false),
61         lastNewSize(0),
62         lastTransactionPartsSent(NULL),
63         lastNewKey(NULL),
64         committedKeyValueTable(NULL),
65         speculatedKeyValueTable(NULL),
66         pendingTransactionSpeculatedKeyValueTable(NULL),
67         liveNewKeyTable(NULL),
68         lastMessageTable(NULL),
69         rejectedMessageWatchVectorTable(NULL),
70         arbitratorTable(NULL),
71         liveAbortTable(NULL),
72         newTransactionParts(NULL),
73         newCommitParts(NULL),
74         lastArbitratedTransactionNumberByArbitratorTable(NULL),
75         liveTransactionBySequenceNumberTable(NULL),
76         liveTransactionByTransactionIdTable(NULL),
77         liveCommitsTable(NULL),
78         liveCommitsByKeyTable(NULL),
79         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
80         rejectedSlotVector(NULL),
81         pendingTransactionQueue(NULL),
82         pendingSendArbitrationRounds(NULL),
83         pendingSendArbitrationEntriesToDelete(NULL),
84         transactionPartsSent(NULL),
85         outstandingTransactionStatus(NULL),
86         liveAbortsGeneratedByLocal(NULL),
87         offlineTransactionsCommittedAndAtServer(NULL),
88         localCommunicationTable(NULL),
89         lastTransactionSeenFromMachineFromServer(NULL),
90         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
91         lastInsertedNewKey(false),
92         lastSeqNumArbOn(0)
93 {
94         init();
95 }
96
97 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
98         buffer(NULL),
99         cloud(_cloud),
100         random(NULL),
101         liveTableStatus(NULL),
102         pendingTransactionBuilder(NULL),
103         lastPendingTransactionSpeculatedOn(NULL),
104         firstPendingTransaction(NULL),
105         numberOfSlots(0),
106         bufferResizeThreshold(0),
107         liveSlotCount(0),
108         oldestLiveSlotSequenceNumver(1),
109         localMachineId(_localMachineId),
110         sequenceNumber(0),
111         localSequenceNumber(0),
112         localTransactionSequenceNumber(1),
113         lastTransactionSequenceNumberSpeculatedOn(0),
114         oldestTransactionSequenceNumberSpeculatedOn(0),
115         localArbitrationSequenceNumber(1),
116         hadPartialSendToServer(false),
117         attemptedToSendToServer(false),
118         expectedsize(0),
119         didFindTableStatus(false),
120         currMaxSize(0),
121         lastSlotAttemptedToSend(NULL),
122         lastIsNewKey(false),
123         lastNewSize(0),
124         lastTransactionPartsSent(NULL),
125         lastNewKey(NULL),
126         committedKeyValueTable(NULL),
127         speculatedKeyValueTable(NULL),
128         pendingTransactionSpeculatedKeyValueTable(NULL),
129         liveNewKeyTable(NULL),
130         lastMessageTable(NULL),
131         rejectedMessageWatchVectorTable(NULL),
132         arbitratorTable(NULL),
133         liveAbortTable(NULL),
134         newTransactionParts(NULL),
135         newCommitParts(NULL),
136         lastArbitratedTransactionNumberByArbitratorTable(NULL),
137         liveTransactionBySequenceNumberTable(NULL),
138         liveTransactionByTransactionIdTable(NULL),
139         liveCommitsTable(NULL),
140         liveCommitsByKeyTable(NULL),
141         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
142         rejectedSlotVector(NULL),
143         pendingTransactionQueue(NULL),
144         pendingSendArbitrationRounds(NULL),
145         pendingSendArbitrationEntriesToDelete(NULL),
146         transactionPartsSent(NULL),
147         outstandingTransactionStatus(NULL),
148         liveAbortsGeneratedByLocal(NULL),
149         offlineTransactionsCommittedAndAtServer(NULL),
150         localCommunicationTable(NULL),
151         lastTransactionSeenFromMachineFromServer(NULL),
152         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
153         lastInsertedNewKey(false),
154         lastSeqNumArbOn(0)
155 {
156         init();
157 }
158
159 Table::~Table() {
160         delete cloud;
161         delete random;
162         delete buffer;
163         // init data structs
164         delete committedKeyValueTable;
165         delete speculatedKeyValueTable;
166         delete pendingTransactionSpeculatedKeyValueTable;
167         delete liveNewKeyTable;
168         {
169                 SetIterator<int64_t, Pair<int64_t, Liveness *> *> *lmit = getKeyIterator(lastMessageTable);
170                 while (lmit->hasNext()) {
171                         Pair<int64_t, Liveness *> * pair = lastMessageTable->get(lmit->next());
172                         delete pair;
173                 }
174                 delete lmit;
175                 delete lastMessageTable;
176         }
177         if (pendingTransactionBuilder != NULL)
178                 delete pendingTransactionBuilder;
179         {
180                 SetIterator<int64_t, Hashset<RejectedMessage *> *> *rmit = getKeyIterator(rejectedMessageWatchVectorTable);
181                 while(rmit->hasNext()) {
182                         int64_t machineid = rmit->next();
183                         Hashset<RejectedMessage *> * rmset = rejectedMessageWatchVectorTable->get(machineid);
184                         SetIterator<RejectedMessage *, RejectedMessage *> * mit = rmset->iterator();
185                         while (mit->hasNext()) {
186                                 RejectedMessage * rm = mit->next();
187                                 delete rm;
188                         }
189                         delete mit;
190                         delete rmset;
191                 }
192                 delete rmit;
193                 delete rejectedMessageWatchVectorTable;
194         }
195         delete arbitratorTable;
196         delete liveAbortTable;
197         {
198                 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts);
199                 while (partsit->hasNext()) {
200                         int64_t machineId = partsit->next();
201                         Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = partsit->currVal();
202                         SetIterator<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pit = getKeyIterator(parts);
203                         while(pit->hasNext()) {
204                                 Pair<int64_t, int32_t> * pair=pit->next();
205                                 pit->currVal()->releaseRef();
206                         }
207                         delete pit;
208                         
209                         delete parts;
210                 }
211                 delete partsit;
212                 delete newTransactionParts;
213         }
214         {
215                 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts);
216                 while (partsit->hasNext()) {
217                         int64_t machineId = partsit->next();
218                         Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = partsit->currVal();
219                         SetIterator<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pit = getKeyIterator(parts);
220                         while(pit->hasNext()) {
221                                 Pair<int64_t, int32_t> * pair=pit->next();
222                                 pit->currVal()->releaseRef();
223                         }
224                         delete pit;
225                         delete parts;
226                 }
227                 delete partsit;
228                 delete newCommitParts;
229         }
230         delete lastArbitratedTransactionNumberByArbitratorTable;
231         delete liveTransactionBySequenceNumberTable;
232         delete liveTransactionByTransactionIdTable;
233         {
234                 SetIterator<int64_t, Hashtable<int64_t, Commit *> *> *liveit = getKeyIterator(liveCommitsTable);
235                 while (liveit->hasNext()) {
236                         int64_t arbitratorId = liveit->next();
237                         
238                         // Get all the commits for a specific arbitrator
239                         Hashtable<int64_t, Commit *> *commitForClientTable = liveit->currVal();
240                         {
241                                 SetIterator<int64_t, Commit *> *clientit = getKeyIterator(commitForClientTable);
242                                 while (clientit->hasNext()) {
243                                         int64_t id = clientit->next();
244                                         delete commitForClientTable->get(id);
245                                 }
246                                 delete clientit;
247                         }
248                         
249                         delete commitForClientTable;
250                 }
251                 delete liveit;
252                 delete liveCommitsTable;
253         }
254         delete liveCommitsByKeyTable;
255         delete lastCommitSeenSequenceNumberByArbitratorTable;
256         delete rejectedSlotVector;
257         {
258                 uint size = pendingTransactionQueue->size();
259                 for (uint iter = 0; iter < size; iter++) {
260                         delete pendingTransactionQueue->get(iter);
261                 }
262                 delete pendingTransactionQueue;
263         }
264         delete pendingSendArbitrationEntriesToDelete;
265         {
266                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
267                 while (trit->hasNext()) {
268                         Transaction *transaction = trit->next();
269                         delete trit->currVal();
270                 }
271                 delete trit;
272                 delete transactionPartsSent;
273         }
274         delete outstandingTransactionStatus;
275         delete liveAbortsGeneratedByLocal;
276         delete offlineTransactionsCommittedAndAtServer;
277         delete localCommunicationTable;
278         delete lastTransactionSeenFromMachineFromServer;
279         {
280                 for(uint i = 0; i < pendingSendArbitrationRounds->size(); i++) {
281                         delete pendingSendArbitrationRounds->get(i);
282                 }
283                 delete pendingSendArbitrationRounds;
284         }
285         if (lastTransactionPartsSent != NULL)
286                 delete lastTransactionPartsSent;
287         delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
288         if (lastNewKey)
289                 delete lastNewKey;
290 }
291
292 /**
293  * Init all the stuff needed for for table usage
294  */
295 void Table::init() {
296         // Init helper objects
297         random = new SecureRandom();
298         buffer = new SlotBuffer();
299
300         // init data structs
301         committedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
302         speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
303         pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
304         liveNewKeyTable = new Hashtable<IoTString *, NewKey *, uintptr_t, 0, hashString, StringEquals >();
305         lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> * >();
306         rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
307         arbitratorTable = new Hashtable<IoTString *, int64_t, uintptr_t, 0, hashString, StringEquals>();
308         liveAbortTable = new Hashtable<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>();
309         newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
310         newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
311         lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
312         liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
313         liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t> *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>();
314         liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> * >();
315         liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *, uintptr_t, 0, hashString, StringEquals>();
316         lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
317         rejectedSlotVector = new Vector<int64_t>();
318         pendingTransactionQueue = new Vector<Transaction *>();
319         pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
320         transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
321         outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
322         liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
323         offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> *, uintptr_t, 0, pairHashFunction, pairEquals>();
324         localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> *>();
325         lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
326         pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
327         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
328
329         // Other init stuff
330         numberOfSlots = buffer->capacity();
331         setResizeThreshold();
332 }
333
334 /**
335  * Initialize the table by inserting a table status as the first entry
336  * into the table status also initialize the crypto stuff.
337  */
338 void Table::initTable() {
339         cloud->initSecurity();
340
341         // Create the first insertion into the block chain which is the table status
342         Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
343         localSequenceNumber++;
344         TableStatus *status = new TableStatus(s, numberOfSlots);
345         s->addShallowEntry(status);
346         Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
347
348         if (array == NULL) {
349                 array = new Array<Slot *>(1);
350                 array->set(0, s);
351                 // update local block chain
352                 validateAndUpdate(array, true);
353                 delete array;
354         } else if (array->length() == 1) {
355                 // in case we did push the slot BUT we failed to init it
356                 validateAndUpdate(array, true);
357                 delete s;
358                 delete array;
359         } else {
360                 delete s;
361                 delete array;
362                 throw new Error("Error on initialization");
363         }
364 }
365
366 /**
367  * Rebuild the table from scratch by pulling the latest block chain
368  * from the server.
369  */
370 void Table::rebuild() {
371         // Just pull the latest slots from the server
372         Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
373         validateAndUpdate(newslots, true);
374         delete newslots;
375         sendToServer(NULL);
376         updateLiveTransactionsAndStatus();
377 }
378
379 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
380         localCommunicationTable->put(arbitrator, new Pair<IoTString *, int32_t>(hostName, portNumber));
381 }
382
383 int64_t Table::getArbitrator(IoTString *key) {
384         return arbitratorTable->get(key);
385 }
386
387 void Table::close() {
388         cloud->closeCloud();
389 }
390
391 IoTString *Table::getCommitted(IoTString *key)  {
392         KeyValue *kv = committedKeyValueTable->get(key);
393
394         if (kv != NULL) {
395                 return kv->getValue()->acquireRef();
396         } else {
397                 return NULL;
398         }
399 }
400
401 IoTString *Table::getSpeculative(IoTString *key) {
402         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
403
404         if (kv == NULL) {
405                 kv = speculatedKeyValueTable->get(key);
406         }
407
408         if (kv == NULL) {
409                 kv = committedKeyValueTable->get(key);
410         }
411
412         if (kv != NULL) {
413                 return kv->getValue()->acquireRef();
414         } else {
415                 return NULL;
416         }
417 }
418
419 IoTString *Table::getCommittedAtomic(IoTString *key) {
420         KeyValue *kv = committedKeyValueTable->get(key);
421
422         if (!arbitratorTable->contains(key)) {
423                 throw new Error("Key not Found.");
424         }
425
426         // Make sure new key value pair matches the current arbitrator
427         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
428                 // TODO: Maybe not throw en error
429                 throw new Error("Not all Key Values Match Arbitrator.");
430         }
431
432         if (kv != NULL) {
433                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
434                 return kv->getValue()->acquireRef();
435         } else {
436                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
437                 return NULL;
438         }
439 }
440
441 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
442         if (!arbitratorTable->contains(key)) {
443                 throw new Error("Key not Found.");
444         }
445
446         // Make sure new key value pair matches the current arbitrator
447         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
448                 // TODO: Maybe not throw en error
449                 throw new Error("Not all Key Values Match Arbitrator.");
450         }
451
452         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
453
454         if (kv == NULL) {
455                 kv = speculatedKeyValueTable->get(key);
456         }
457
458         if (kv == NULL) {
459                 kv = committedKeyValueTable->get(key);
460         }
461
462         if (kv != NULL) {
463                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
464                 return kv->getValue()->acquireRef();
465         } else {
466                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
467                 return NULL;
468         }
469 }
470
471 bool Table::update()  {
472         try {
473                 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
474                 validateAndUpdate(newSlots, false);
475                 delete newSlots;
476                 sendToServer(NULL);
477                 updateLiveTransactionsAndStatus();
478                 return true;
479         } catch (Exception *e) {
480                 SetIterator<int64_t, Pair<IoTString *, int32_t> *> *kit = getKeyIterator(localCommunicationTable);
481                 while (kit->hasNext()) {
482                         int64_t m = kit->next();
483                         updateFromLocal(m);
484                 }
485                 delete kit;
486         }
487
488         return false;
489 }
490
491 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
492         while (true) {
493                 if (arbitratorTable->contains(keyName)) {
494                         // There is already an arbitrator
495                         return false;
496                 }
497                 NewKey *newKey = new NewKey(NULL, keyName, machineId);
498
499                 if (sendToServer(newKey)) {
500                         // If successfully inserted
501                         return true;
502                 }
503         }
504 }
505
506 void Table::startTransaction() {
507         // Create a new transaction, invalidates any old pending transactions.
508         if (pendingTransactionBuilder != NULL)
509                 delete pendingTransactionBuilder;
510         pendingTransactionBuilder = new PendingTransaction(localMachineId);
511 }
512
513 void Table::put(IoTString *key, IoTString *value) {
514         // Make sure it is a valid key
515         if (!arbitratorTable->contains(key)) {
516                 throw new Error("Key not Found.");
517         }
518
519         // Make sure new key value pair matches the current arbitrator
520         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
521                 // TODO: Maybe not throw en error
522                 throw new Error("Not all Key Values Match Arbitrator.");
523         }
524
525         // Add the key value to this transaction
526         KeyValue *kv = new KeyValue(key->acquireRef(), value->acquireRef());
527         pendingTransactionBuilder->addKV(kv);
528 }
529
530 TransactionStatus *Table::commitTransaction() {
531         if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
532                 // transaction with no updates will have no effect on the system
533                 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
534         }
535
536         // Set the local transaction sequence number and increment
537         pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
538         localTransactionSequenceNumber++;
539
540         // Create the transaction status
541         TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
542
543         // Create the new transaction
544         Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
545         newTransaction->setTransactionStatus(transactionStatus);
546
547         if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
548                 // Add it to the queue and invalidate the builder for safety
549                 pendingTransactionQueue->add(newTransaction);
550         } else {
551                 arbitrateOnLocalTransaction(newTransaction);
552                 delete newTransaction;
553                 updateLiveStateFromLocal();
554         }
555         if (pendingTransactionBuilder != NULL)
556                 delete pendingTransactionBuilder;
557         
558         pendingTransactionBuilder = new PendingTransaction(localMachineId);
559
560         try {
561                 sendToServer(NULL);
562         } catch (ServerException *e) {
563
564                 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
565                 uint size = pendingTransactionQueue->size();
566                 uint oldindex = 0;
567                 for (uint iter = 0; iter < size; iter++) {
568                         Transaction *transaction = pendingTransactionQueue->get(iter);
569                         pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter));
570
571                         if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
572                                 // Already contacted this client so ignore all attempts to contact this client
573                                 // to preserve ordering for arbitrator
574                                 continue;
575                         }
576
577                         Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
578
579                         if (sendReturn.getFirst()) {
580                                 // Failed to contact over local
581                                 arbitratorTriedAndFailed->add(transaction->getArbitrator());
582                         } else {
583                                 // Successful contact or should not contact
584
585                                 if (sendReturn.getSecond()) {
586                                         // did arbitrate
587                                         delete transaction;
588                                         oldindex--;
589                                 }
590                         }
591                 }
592                 pendingTransactionQueue->setSize(oldindex);
593         }
594
595         updateLiveStateFromLocal();
596
597         return transactionStatus;
598 }
599
600 /**
601  * Recalculate the new resize threshold
602  */
603 void Table::setResizeThreshold() {
604         int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
605         bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
606 }
607
608 int64_t Table::getLocalSequenceNumber() {
609         return localSequenceNumber;
610 }
611
612 void Table::processTransactionList(bool handlePartial) {
613         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
614         while (trit->hasNext()) {
615                 Transaction *transaction = trit->next();
616                 transaction->resetServerFailure();
617                 // Update which transactions parts still need to be sent
618                 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
619                 // Add the transaction status to the outstanding list
620                 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
621                 
622                 // Update the transaction status
623                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
624                 
625                 // Check if all the transaction parts were successfully
626                 // sent and if so then remove it from pending
627                 if (transaction->didSendAllParts()) {
628                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
629                         pendingTransactionQueue->remove(transaction);
630                         delete transaction;
631                 } else if (handlePartial) {
632                         transaction->resetServerFailure();
633                         // Set the transaction sequence number back to nothing
634                         if (!transaction->didSendAPartToServer()) {
635                                 transaction->setSequenceNumber(-1);
636                         }
637                 }
638         }
639         delete trit;
640 }
641
642 NewKey * Table::handlePartialSend(NewKey * newKey) {
643         //Didn't receive acknowledgement for last send
644         //See if the server has received a newer slot
645         
646         Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
647         if (newSlots->length() == 0) {
648                 //Retry sending old slot
649                 bool wasInserted = false;
650                 bool sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey, &wasInserted, &newSlots);
651                 
652                 if (sendSlotsReturn) {
653                         lastSlotAttemptedToSend = NULL;
654                         if (newKey != NULL) {
655                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
656                                         delete newKey;
657                                         newKey = NULL;
658                                 }
659                         }
660                         processTransactionList(false);
661                 } else {
662                         if (checkSend(newSlots, lastSlotAttemptedToSend)) {
663                                 if (newKey != NULL) {
664                                         if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
665                                                 delete newKey;
666                                                 newKey = NULL;
667                                         }
668                                 }
669                                 processTransactionList(true);
670                         }
671                 }
672                 
673                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
674                 while (trit->hasNext()) {
675                         Transaction *transaction = trit->next();
676                         transaction->resetServerFailure();
677                         // Set the transaction sequence number back to nothing
678                         if (!transaction->didSendAPartToServer()) {
679                                 transaction->setSequenceNumber(-1);
680                         }
681                 }
682                 delete trit;
683                 
684                 if (newSlots->length() != 0) {
685                         // insert into the local block chain
686                         validateAndUpdate(newSlots, true);
687                 }
688         } else {
689                 if (checkSend(newSlots, lastSlotAttemptedToSend)) {
690                         if (newKey != NULL) {
691                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
692                                         delete newKey;
693                                         newKey = NULL;
694                                 }
695                         }
696
697                         processTransactionList(true);
698                 } else {
699                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
700                         while (trit->hasNext()) {
701                                 Transaction *transaction = trit->next();
702                                 transaction->resetServerFailure();
703                                 // Set the transaction sequence number back to nothing
704                                 if (!transaction->didSendAPartToServer()) {
705                                         transaction->setSequenceNumber(-1);
706                                 }
707                         }
708                         delete trit;
709                 }
710                 
711                 // insert into the local block chain
712                 validateAndUpdate(newSlots, true);
713         }
714         delete newSlots;
715         return newKey;
716 }
717
718 void Table::clearSentParts() {
719         // Clear the sent data since we are trying again
720         pendingSendArbitrationEntriesToDelete->clear();
721         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
722         while (trit->hasNext()) {
723                 Transaction *transaction = trit->next();
724                 delete trit->currVal();
725         }
726         delete trit;
727         transactionPartsSent->clear();
728 }
729
730 bool Table::sendToServer(NewKey *newKey) {
731         if (hadPartialSendToServer) {
732                 newKey = handlePartialSend(newKey);
733         }
734
735         try {
736                 // While we have stuff that needs inserting into the block chain
737                 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
738                         if (hadPartialSendToServer) {
739                                 throw new Error("Should Be error free");
740                         }
741                         
742                         // If there is a new key with same name then end
743                         if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
744                                 delete newKey;
745                                 return false;
746                         }
747
748                         // Create the slot
749                         Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, new Array<char>(buffer->getSlot(sequenceNumber)->getHMAC()), localSequenceNumber);
750                         localSequenceNumber++;
751
752                         // Try to fill the slot with data
753                         int newSize = 0;
754                         bool insertedNewKey = false;
755                         bool needsResize = fillSlot(slot, false, newKey, newSize, insertedNewKey);
756
757                         if (needsResize) {
758                                 // Reset which transaction to send
759                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
760                                 while (trit->hasNext()) {
761                                         Transaction *transaction = trit->next();
762                                         transaction->resetNextPartToSend();
763
764                                         // Set the transaction sequence number back to nothing
765                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
766                                                 transaction->setSequenceNumber(-1);
767                                         }
768                                 }
769                                 delete trit;
770
771                                 // Clear the sent data since we are trying again
772                                 clearSentParts();
773                                         
774                                 // We needed a resize so try again
775                                 fillSlot(slot, true, newKey, newSize, insertedNewKey);
776                         }
777                         if (lastSlotAttemptedToSend != NULL)
778                                 delete lastSlotAttemptedToSend;
779                         
780                         lastSlotAttemptedToSend = slot;
781                         lastIsNewKey = (newKey != NULL);
782                         lastInsertedNewKey = insertedNewKey;
783                         lastNewSize = newSize;
784                         if (( newKey != lastNewKey) && (lastNewKey != NULL))
785                                 delete lastNewKey;
786                         lastNewKey = newKey;
787                         if (lastTransactionPartsSent != NULL)
788                                 delete lastTransactionPartsSent;
789                         lastTransactionPartsSent = transactionPartsSent->clone();
790
791                         Array<Slot *> * newSlots = NULL;
792                         bool wasInserted = false;
793                         bool sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL, &wasInserted, &newSlots);
794
795                         if (sendSlotsReturn) {
796                                 lastSlotAttemptedToSend = NULL;
797                                 // Did insert into the block chain
798                                 if (insertedNewKey) {
799                                         // This slot was what was inserted not a previous slot
800                                         // New Key was successfully inserted into the block chain so dont want to insert it again
801                                         newKey = NULL;
802                                 }
803
804                                 // Remove the aborts and commit parts that were sent from the pending to send queue
805                                 uint size = pendingSendArbitrationRounds->size();
806                                 uint oldcount = 0;
807                                 for (uint i = 0; i < size; i++) {
808                                         ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
809                                         round->removeParts(pendingSendArbitrationEntriesToDelete);
810
811                                         if (!round->isDoneSending()) {
812                                                 //Add part back in
813                                                 pendingSendArbitrationRounds->set(oldcount++,
814                                                                                                                                                                                         pendingSendArbitrationRounds->get(i));
815                                         } else
816                                                 delete pendingSendArbitrationRounds->get(i);
817                                 }
818                                 pendingSendArbitrationRounds->setSize(oldcount);
819                                 processTransactionList(false);
820                         } else {
821                                 // Reset which transaction to send
822                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
823                                 while (trit->hasNext()) {
824                                         Transaction *transaction = trit->next();
825                                         transaction->resetNextPartToSend();
826
827                                         // Set the transaction sequence number back to nothing
828                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
829                                                 transaction->setSequenceNumber(-1);
830                                         }
831                                 }
832                                 delete trit;
833                         }
834
835                         // Clear the sent data in preparation for next send
836                         clearSentParts();
837
838                         if (newSlots->length() != 0) {
839                                 // insert into the local block chain
840                                 validateAndUpdate(newSlots, true);
841                         }
842                         delete newSlots;
843                 }
844         } catch (ServerException *e) {
845                 if (e->getType() != ServerException_TypeInputTimeout) {
846                         // Nothing was able to be sent to the server so just clear these data structures
847                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
848                         while (trit->hasNext()) {
849                                 Transaction *transaction = trit->next();
850                                 transaction->resetNextPartToSend();
851
852                                 // Set the transaction sequence number back to nothing
853                                 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
854                                         transaction->setSequenceNumber(-1);
855                                 }
856                         }
857                         delete trit;
858                 } else {
859                         // There was a partial send to the server
860                         hadPartialSendToServer = true;
861
862                         // Nothing was able to be sent to the server so just clear these data structures
863                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
864                         while (trit->hasNext()) {
865                                 Transaction *transaction = trit->next();
866                                 transaction->resetNextPartToSend();
867                                 transaction->setServerFailure();
868                         }
869                         delete trit;
870                 }
871
872                 clearSentParts();
873
874                 throw e;
875         }
876
877         return newKey == NULL;
878 }
879
880 bool Table::updateFromLocal(int64_t machineId) {
881         if (!localCommunicationTable->contains(machineId))
882                 return false;
883
884         Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(machineId);
885
886         // Get the size of the send data
887         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
888
889         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
890         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(machineId)) {
891                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
892         }
893
894         Array<char> *sendData = new Array<char>(sendDataSize);
895         ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
896
897         // Encode the data
898         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
899         bbEncode->putInt(0);
900
901         // Send by local
902         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
903         localSequenceNumber++;
904
905         if (returnData == NULL) {
906                 // Could not contact server
907                 return false;
908         }
909
910         // Decode the data
911         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
912         int numberOfEntries = bbDecode->getInt();
913
914         for (int i = 0; i < numberOfEntries; i++) {
915                 char type = bbDecode->get();
916                 if (type == TypeAbort) {
917                         Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
918                         processEntry(abort);
919                 } else if (type == TypeCommitPart) {
920                         CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
921                         processEntry(commitPart);
922                 }
923         }
924
925         updateLiveStateFromLocal();
926
927         return true;
928 }
929
930 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
931
932         // Get the devices local communications
933         if (!localCommunicationTable->contains(transaction->getArbitrator()))
934                 return Pair<bool, bool>(true, false);
935
936         Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
937
938         // Get the size of the send data
939         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
940         {
941                 Vector<TransactionPart *> *tParts = transaction->getParts();
942                 uint tPartsSize = tParts->size();
943                 for (uint i = 0; i < tPartsSize; i++) {
944                         TransactionPart *part = tParts->get(i);
945                         sendDataSize += part->getSize();
946                 }
947         }
948
949         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
950         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(transaction->getArbitrator())) {
951                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
952         }
953
954         // Make the send data size
955         Array<char> *sendData = new Array<char>(sendDataSize);
956         ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
957
958         // Encode the data
959         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
960         bbEncode->putInt(transaction->getParts()->size());
961         {
962                 Vector<TransactionPart *> *tParts = transaction->getParts();
963                 uint tPartsSize = tParts->size();
964                 for (uint i = 0; i < tPartsSize; i++) {
965                         TransactionPart *part = tParts->get(i);
966                         part->encode(bbEncode);
967                 }
968         }
969
970         // Send by local
971         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
972         localSequenceNumber++;
973
974         if (returnData == NULL) {
975                 // Could not contact server
976                 return Pair<bool, bool>(true, false);
977         }
978
979         // Decode the data
980         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
981         bool didCommit = bbDecode->get() == 1;
982         bool couldArbitrate = bbDecode->get() == 1;
983         int numberOfEntries = bbDecode->getInt();
984         bool foundAbort = false;
985
986         for (int i = 0; i < numberOfEntries; i++) {
987                 char type = bbDecode->get();
988                 if (type == TypeAbort) {
989                         Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
990
991                         if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
992                                 foundAbort = true;
993                         }
994
995                         processEntry(abort);
996                 } else if (type == TypeCommitPart) {
997                         CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
998                         processEntry(commitPart);
999                 }
1000         }
1001
1002         updateLiveStateFromLocal();
1003
1004         if (couldArbitrate) {
1005                 TransactionStatus *status =  transaction->getTransactionStatus();
1006                 if (didCommit) {
1007                         status->setStatus(TransactionStatus_StatusCommitted);
1008                 } else {
1009                         status->setStatus(TransactionStatus_StatusAborted);
1010                 }
1011         } else {
1012                 TransactionStatus *status =  transaction->getTransactionStatus();
1013                 if (foundAbort) {
1014                         status->setStatus(TransactionStatus_StatusAborted);
1015                 } else {
1016                         status->setStatus(TransactionStatus_StatusCommitted);
1017                 }
1018         }
1019
1020         return Pair<bool, bool>(false, true);
1021 }
1022
1023 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
1024         // Decode the data
1025         ByteBuffer *bbDecode = ByteBuffer_wrap(data);
1026         int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
1027         int numberOfParts = bbDecode->getInt();
1028
1029         // If we did commit a transaction or not
1030         bool didCommit = false;
1031         bool couldArbitrate = false;
1032
1033         if (numberOfParts != 0) {
1034
1035                 // decode the transaction
1036                 Transaction *transaction = new Transaction();
1037                 for (int i = 0; i < numberOfParts; i++) {
1038                         bbDecode->get();
1039                         TransactionPart *newPart = (TransactionPart *)TransactionPart_decode(NULL, bbDecode);
1040                         transaction->addPartDecode(newPart);
1041                 }
1042
1043                 // Arbitrate on transaction and pull relevant return data
1044                 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
1045                 couldArbitrate = localArbitrateReturn.getFirst();
1046                 didCommit = localArbitrateReturn.getSecond();
1047
1048                 updateLiveStateFromLocal();
1049
1050                 // Transaction was sent to the server so keep track of it to prevent double commit
1051                 if (transaction->getSequenceNumber() != -1) {
1052                         offlineTransactionsCommittedAndAtServer->add(new Pair<int64_t, int64_t>(transaction->getId()));
1053                 }
1054         }
1055
1056         // The data to send back
1057         int returnDataSize = 0;
1058         Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
1059
1060         // Get the aborts to send back
1061         Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>();
1062         {
1063                 SetIterator<int64_t, Abort *> *abortit = getKeyIterator(liveAbortsGeneratedByLocal);
1064                 while (abortit->hasNext())
1065                         abortLocalSequenceNumbers->add(abortit->next());
1066                 delete abortit;
1067         }
1068
1069         qsort(abortLocalSequenceNumbers->expose(), abortLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1070
1071         uint asize = abortLocalSequenceNumbers->size();
1072         for (uint i = 0; i < asize; i++) {
1073                 int64_t localSequenceNumber = abortLocalSequenceNumbers->get(i);
1074                 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1075                         continue;
1076                 }
1077
1078                 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
1079                 unseenArbitrations->add(abort);
1080                 returnDataSize += abort->getSize();
1081         }
1082
1083         // Get the commits to send back
1084         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
1085         if (commitForClientTable != NULL) {
1086                 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>();
1087                 {
1088                         SetIterator<int64_t, Commit *> *commitit = getKeyIterator(commitForClientTable);
1089                         while (commitit->hasNext())
1090                                 commitLocalSequenceNumbers->add(commitit->next());
1091                         delete commitit;
1092                 }
1093                 qsort(commitLocalSequenceNumbers->expose(), commitLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1094
1095                 uint clsSize = commitLocalSequenceNumbers->size();
1096                 for (uint clsi = 0; clsi < clsSize; clsi++) {
1097                         int64_t localSequenceNumber = commitLocalSequenceNumbers->get(clsi);
1098                         Commit *commit = commitForClientTable->get(localSequenceNumber);
1099
1100                         if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1101                                 continue;
1102                         }
1103
1104                         {
1105                                 Vector<CommitPart *> *parts = commit->getParts();
1106                                 uint nParts = parts->size();
1107                                 for (uint i = 0; i < nParts; i++) {
1108                                         CommitPart *commitPart = parts->get(i);
1109                                         unseenArbitrations->add(commitPart);
1110                                         returnDataSize += commitPart->getSize();
1111                                 }
1112                         }
1113                 }
1114         }
1115
1116         // Number of arbitration entries to decode
1117         returnDataSize += 2 * sizeof(int32_t);
1118
1119         // bool of did commit or not
1120         if (numberOfParts != 0) {
1121                 returnDataSize += sizeof(char);
1122         }
1123
1124         // Data to send Back
1125         Array<char> *returnData = new Array<char>(returnDataSize);
1126         ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
1127
1128         if (numberOfParts != 0) {
1129                 if (didCommit) {
1130                         bbEncode->put((char)1);
1131                 } else {
1132                         bbEncode->put((char)0);
1133                 }
1134                 if (couldArbitrate) {
1135                         bbEncode->put((char)1);
1136                 } else {
1137                         bbEncode->put((char)0);
1138                 }
1139         }
1140
1141         bbEncode->putInt(unseenArbitrations->size());
1142         uint size = unseenArbitrations->size();
1143         for (uint i = 0; i < size; i++) {
1144                 Entry *entry = unseenArbitrations->get(i);
1145                 entry->encode(bbEncode);
1146         }
1147
1148         localSequenceNumber++;
1149         return returnData;
1150 }
1151
1152 /** Checks whether a given slot was sent using new slots in
1153                 array. Returns true if sent and false otherwise.  */
1154
1155 bool Table::checkSend(Array<Slot *> * array, Slot *checkSlot) {
1156         uint size = array->length();
1157         for (uint i = 0; i < size; i++) {
1158                 Slot *s = array->get(i);
1159                 if ((s->getSequenceNumber() == checkSlot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1160                         return true;
1161                 }
1162         }
1163         
1164         //Also need to see if other machines acknowledged our message
1165         for (uint i = 0; i < size; i++) {
1166                 Slot *s = array->get(i);
1167                 
1168                 // Process each entry in the slot
1169                 Vector<Entry *> *entries = s->getEntries();
1170                 uint eSize = entries->size();
1171                 for (uint ei = 0; ei < eSize; ei++) {
1172                         Entry *entry = entries->get(ei);
1173                         
1174                         if (entry->getType() == TypeLastMessage) {
1175                                 LastMessage *lastMessage = (LastMessage *)entry;
1176                                 
1177                                 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == checkSlot->getSequenceNumber())) {
1178                                         return true;
1179                                 }
1180                         }
1181                 }
1182         }
1183         //Not found
1184         return false;
1185 }
1186
1187 /** Method tries to send slot to server.  Returns status in tuple.
1188                 isInserted returns whether last un-acked send (if any) was
1189                 successful.  Returns whether send was confirmed.x
1190  */
1191
1192 bool Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey, bool *isInserted, Array<Slot *> **array) {
1193         attemptedToSendToServer = true;
1194
1195         *array = cloud->putSlot(slot, newSize);
1196         if (*array == NULL) {
1197                 *array = new Array<Slot *>(1);
1198                 (*array)->set(0, slot);
1199                 rejectedSlotVector->clear();
1200                 *isInserted = false;
1201                 return true;
1202         } else {
1203                 if ((*array)->length() == 0) {
1204                         throw new Error("Server Error: Did not send any slots");
1205                 }
1206
1207                 if (hadPartialSendToServer) {
1208                         *isInserted = checkSend(*array, slot);
1209
1210                         if (!(*isInserted)) {
1211                                 rejectedSlotVector->add(slot->getSequenceNumber());
1212                         }
1213                         
1214                         return false;
1215                 } else {
1216                         rejectedSlotVector->add(slot->getSequenceNumber());
1217                         *isInserted = false;
1218                         return false;
1219                 }
1220         }
1221 }
1222
1223 /**
1224  * Returns true if a resize was needed but not done.
1225  */
1226 bool Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry, int & newSize, bool & insertedKey) {
1227         newSize = 0;//special value to indicate no resize
1228         if (liveSlotCount > bufferResizeThreshold) {
1229                 resize = true;//Resize is forced
1230         }
1231
1232         if (resize) {
1233                 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1234                 TableStatus *status = new TableStatus(slot, newSize);
1235                 slot->addShallowEntry(status);
1236         }
1237
1238         // Fill with rejected slots first before doing anything else
1239         doRejectedMessages(slot);
1240
1241         // Do mandatory rescue of entries
1242         ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryRescue(slot, resize);
1243
1244         // Extract working variables
1245         bool needsResize = mandatoryRescueReturn.getFirst();
1246         bool seenLiveSlot = mandatoryRescueReturn.getSecond();
1247         int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1248
1249         if (needsResize && !resize) {
1250                 // We need to resize but we are not resizing so return true to force on retry
1251                 return true;
1252         }
1253
1254         insertedKey = false;
1255         if (newKeyEntry != NULL) {
1256                 newKeyEntry->setSlot(slot);
1257                 if (slot->hasSpace(newKeyEntry)) {
1258                         slot->addEntry(newKeyEntry);
1259                         insertedKey = true;
1260                 }
1261         }
1262
1263         // Clear the transactions, aborts and commits that were sent previously
1264         clearSentParts();
1265         uint size = pendingSendArbitrationRounds->size();
1266         for (uint i = 0; i < size; i++) {
1267                 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
1268                 bool isFull = false;
1269                 round->generateParts();
1270                 Vector<Entry *> *parts = round->getParts();
1271
1272                 // Insert pending arbitration data
1273                 uint vsize = parts->size();
1274                 for (uint vi = 0; vi < vsize; vi++) {
1275                         Entry *arbitrationData = parts->get(vi);
1276
1277                         // If it is an abort then we need to set some information
1278                         if (arbitrationData->getType() == TypeAbort) {
1279                                 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1280                         }
1281
1282                         if (!slot->hasSpace(arbitrationData)) {
1283                                 // No space so cant do anything else with these data entries
1284                                 isFull = true;
1285                                 break;
1286                         }
1287
1288                         // Add to this current slot and add it to entries to delete
1289                         slot->addEntry(arbitrationData);
1290                         pendingSendArbitrationEntriesToDelete->add(arbitrationData);
1291                 }
1292
1293                 if (isFull) {
1294                         break;
1295                 }
1296         }
1297
1298         if (pendingTransactionQueue->size() > 0) {
1299                 Transaction *transaction = pendingTransactionQueue->get(0);
1300                 // Set the transaction sequence number if it has yet to be inserted into the block chain
1301                 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1302                         transaction->setSequenceNumber(slot->getSequenceNumber());
1303                 }
1304
1305                 while (true) {
1306                         TransactionPart *part = transaction->getNextPartToSend();
1307                         if (part == NULL) {
1308                                 // Ran out of parts to send for this transaction so move on
1309                                 break;
1310                         }
1311
1312                         if (slot->hasSpace(part)) {
1313                                 slot->addEntry(part);
1314                                 Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1315                                 if (partsSent == NULL) {
1316                                         partsSent = new Vector<int32_t>();
1317                                         transactionPartsSent->put(transaction, partsSent);
1318                                 }
1319                                 partsSent->add(part->getPartNumber());
1320                                 transactionPartsSent->put(transaction, partsSent);
1321                         } else {
1322                                 break;
1323                         }
1324                 }
1325         }
1326
1327         // Fill the remainder of the slot with rescue data
1328         doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1329
1330         return false;
1331 }
1332
1333 void Table::doRejectedMessages(Slot *s) {
1334         if (!rejectedSlotVector->isEmpty()) {
1335                 /* TODO: We should avoid generating a rejected message entry if
1336                  * there is already a sufficient entry in the queue (e->g->,
1337                  * equalsto value of true and same sequence number)->  */
1338
1339                 int64_t old_seqn = rejectedSlotVector->get(0);
1340                 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1341                         int64_t new_seqn = rejectedSlotVector->lastElement();
1342                         RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1343                         s->addShallowEntry(rm);
1344                 } else {
1345                         int64_t prev_seqn = -1;
1346                         uint i = 0;
1347                         /* Go through list of missing messages */
1348                         for (; i < rejectedSlotVector->size(); i++) {
1349                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1350                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1351                                 if (s_msg != NULL)
1352                                         break;
1353                                 prev_seqn = curr_seqn;
1354                         }
1355                         /* Generate rejected message entry for missing messages */
1356                         if (prev_seqn != -1) {
1357                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1358                                 s->addShallowEntry(rm);
1359                         }
1360                         /* Generate rejected message entries for present messages */
1361                         for (; i < rejectedSlotVector->size(); i++) {
1362                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1363                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1364                                 int64_t machineid = s_msg->getMachineID();
1365                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1366                                 s->addShallowEntry(rm);
1367                         }
1368                 }
1369         }
1370 }
1371
1372 ThreeTuple<bool, bool, int64_t> Table::doMandatoryRescue(Slot *slot, bool resize) {
1373         int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1374         int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
1375         if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1376                 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1377         }
1378
1379         int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1380         bool seenLiveSlot = false;
1381         int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots;         // smallest seq number in the buffer if it is full
1382         int64_t threshold = firstIfFull + Table_FREE_SLOTS;             // we want the buffer to be clear of live entries up to this point
1383
1384
1385         // Mandatory Rescue
1386         for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1387                 Slot *previousSlot = buffer->getSlot(currentSequenceNumber);
1388                 // Push slot number forward
1389                 if (!seenLiveSlot) {
1390                         oldestLiveSlotSequenceNumver = currentSequenceNumber;
1391                 }
1392
1393                 if (!previousSlot->isLive()) {
1394                         continue;
1395                 }
1396
1397                 // We have seen a live slot
1398                 seenLiveSlot = true;
1399
1400                 // Get all the live entries for a slot
1401                 Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1402
1403                 // Iterate over all the live entries and try to rescue them
1404                 uint lESize = liveEntries->size();
1405                 for (uint i = 0; i < lESize; i++) {
1406                         Entry *liveEntry = liveEntries->get(i);
1407                         if (slot->hasSpace(liveEntry)) {
1408                                 // Enough space to rescue the entry
1409                                 slot->addEntry(liveEntry);
1410                         } else if (currentSequenceNumber == firstIfFull) {
1411                                 //if there's no space but the entry is about to fall off the queue
1412                                 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
1413                         }
1414                 }
1415         }
1416
1417         // Did not resize
1418         return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
1419 }
1420
1421 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1422         /* now go through live entries from least to greatest sequence number until
1423          * either all live slots added, or the slot doesn't have enough room
1424          * for SKIP_THRESHOLD consecutive entries*/
1425         int skipcount = 0;
1426         int64_t newestseqnum = buffer->getNewestSeqNum();
1427         for (; seqn <= newestseqnum; seqn++) {
1428                 Slot *prevslot = buffer->getSlot(seqn);
1429                 //Push slot number forward
1430                 if (!seenliveslot)
1431                         oldestLiveSlotSequenceNumver = seqn;
1432
1433                 if (!prevslot->isLive())
1434                         continue;
1435                 seenliveslot = true;
1436                 Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
1437                 uint lESize = liveentries->size();
1438                 for (uint i = 0; i < lESize; i++) {
1439                         Entry *liveentry = liveentries->get(i);
1440                         if (s->hasSpace(liveentry))
1441                                 s->addEntry(liveentry);
1442                         else {
1443                                 skipcount++;
1444                                 if (skipcount > Table_SKIP_THRESHOLD) {
1445                                         delete liveentries;
1446                                         goto donesearch;
1447                                 }
1448                         }
1449                 }
1450                 delete liveentries;
1451         }
1452 donesearch:
1453         ;
1454 }
1455
1456 /**
1457  * Checks for malicious activity and updates the local copy of the block chain->
1458  */
1459 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1460         // The cloud communication layer has checked slot HMACs already
1461         // before decoding
1462         if (newSlots->length() == 0) {
1463                 return;
1464         }
1465
1466         // Make sure all slots are newer than the last largest slot this
1467         // client has seen
1468         int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
1469         if (firstSeqNum <= sequenceNumber) {
1470                 throw new Error("Server Error: Sent older slots!");
1471         }
1472
1473         // Create an object that can access both new slots and slots in our
1474         // local chain without committing slots to our local chain
1475         SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1476
1477         // Check that the HMAC chain is not broken
1478         checkHMACChain(indexer, newSlots);
1479
1480         // Set to keep track of messages from clients
1481         Hashset<int64_t> *machineSet = new Hashset<int64_t>();
1482         {
1483                 SetIterator<int64_t, Pair<int64_t, Liveness *> *> *lmit = getKeyIterator(lastMessageTable);
1484                 while (lmit->hasNext())
1485                         machineSet->add(lmit->next());
1486                 delete lmit;
1487         }
1488
1489         // Process each slots data
1490         {
1491                 uint numSlots = newSlots->length();
1492                 for (uint i = 0; i < numSlots; i++) {
1493                         Slot *slot = newSlots->get(i);
1494                         processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1495                         updateExpectedSize();
1496                 }
1497         }
1498         delete indexer;
1499         
1500         // If there is a gap, check to see if the server sent us
1501         // everything->
1502         if (firstSeqNum != (sequenceNumber + 1)) {
1503
1504                 // Check the size of the slots that were sent down by the server->
1505                 // Can only check the size if there was a gap
1506                 checkNumSlots(newSlots->length());
1507
1508                 // Since there was a gap every machine must have pushed a slot or
1509                 // must have a last message message-> If not then the server is
1510                 // hiding slots
1511                 if (!machineSet->isEmpty()) {
1512                         delete machineSet;
1513                         throw new Error("Missing record for machines: ");
1514                 }
1515         }
1516         delete machineSet;
1517         // Update the size of our local block chain->
1518         commitNewMaxSize();
1519
1520         // Commit new to slots to the local block chain->
1521         {
1522                 uint numSlots = newSlots->length();
1523                 for (uint i = 0; i < numSlots; i++) {
1524                         Slot *slot = newSlots->get(i);
1525
1526                         // Insert this slot into our local block chain copy->
1527                         buffer->putSlot(slot);
1528
1529                         // Keep track of how many slots are currently live (have live data
1530                         // in them)->
1531                         liveSlotCount++;
1532                 }
1533         }
1534         // Get the sequence number of the latest slot in the system
1535         sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
1536         updateLiveStateFromServer();
1537
1538         // No Need to remember after we pulled from the server
1539         offlineTransactionsCommittedAndAtServer->clear();
1540
1541         // This is invalidated now
1542         hadPartialSendToServer = false;
1543 }
1544
1545 void Table::updateLiveStateFromServer() {
1546         // Process the new transaction parts
1547         processNewTransactionParts();
1548
1549         // Do arbitration on new transactions that were received
1550         arbitrateFromServer();
1551
1552         // Update all the committed keys
1553         bool didCommitOrSpeculate = updateCommittedTable();
1554
1555         // Delete the transactions that are now dead
1556         updateLiveTransactionsAndStatus();
1557
1558         // Do speculations
1559         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1560         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1561 }
1562
1563 void Table::updateLiveStateFromLocal() {
1564         // Update all the committed keys
1565         bool didCommitOrSpeculate = updateCommittedTable();
1566
1567         // Delete the transactions that are now dead
1568         updateLiveTransactionsAndStatus();
1569
1570         // Do speculations
1571         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1572         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1573 }
1574
1575 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1576         int64_t prevslots = firstSequenceNumber;
1577
1578         if (didFindTableStatus) {
1579         } else {
1580                 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1581         }
1582
1583         didFindTableStatus = true;
1584         currMaxSize = numberOfSlots;
1585 }
1586
1587 void Table::updateExpectedSize() {
1588         expectedsize++;
1589
1590         if (expectedsize > currMaxSize) {
1591                 expectedsize = currMaxSize;
1592         }
1593 }
1594
1595
1596 /**
1597  * Check the size of the block chain to make sure there are enough
1598  * slots sent back by the server-> This is only called when we have a
1599  * gap between the slots that we have locally and the slots sent by
1600  * the server therefore in the slots sent by the server there will be
1601  * at least 1 Table status message
1602  */
1603 void Table::checkNumSlots(int numberOfSlots) {
1604         if (numberOfSlots != expectedsize) {
1605                 throw new Error("Server Error: Server did not send all slots->  Expected: ");
1606         }
1607 }
1608
1609 /**
1610  * Update the size of of the local buffer if it is needed->
1611  */
1612 void Table::commitNewMaxSize() {
1613         didFindTableStatus = false;
1614
1615         // Resize the local slot buffer
1616         if (numberOfSlots != currMaxSize) {
1617                 buffer->resize((int32_t)currMaxSize);
1618         }
1619
1620         // Change the number of local slots to the new size
1621         numberOfSlots = (int32_t)currMaxSize;
1622
1623         // Recalculate the resize threshold since the size of the local
1624         // buffer has changed
1625         setResizeThreshold();
1626 }
1627
1628 /**
1629  * Process the new transaction parts from this latest round of slots
1630  * received from the server
1631  */
1632 void Table::processNewTransactionParts() {
1633
1634         if (newTransactionParts->size() == 0) {
1635                 // Nothing new to process
1636                 return;
1637         }
1638
1639         // Iterate through all the machine Ids that we received new parts
1640         // for
1641         SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *tpit = getKeyIterator(newTransactionParts);
1642         while (tpit->hasNext()) {
1643                 int64_t machineId = tpit->next();
1644                 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = tpit->currVal();
1645
1646                 SetIterator<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *ptit = getKeyIterator(parts);
1647                 // Iterate through all the parts for that machine Id
1648                 while (ptit->hasNext()) {
1649                         Pair<int64_t, int32_t> *partId = ptit->next();
1650                         TransactionPart *part = parts->get(partId);
1651
1652                         if (lastArbitratedTransactionNumberByArbitratorTable->contains(part->getArbitratorId())) {
1653                                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1654                                 if (lastTransactionNumber >= part->getSequenceNumber()) {
1655                                         // Set dead the transaction part
1656                                         part->setDead();
1657                                         part->releaseRef();
1658                                         continue;
1659                                 }
1660                         }
1661
1662                         // Get the transaction object for that sequence number
1663                         Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1664
1665                         if (transaction == NULL) {
1666                                 // This is a new transaction that we dont have so make a new one
1667                                 transaction = new Transaction();
1668                                 
1669                                 // Add that part to the transaction
1670                                 transaction->addPartDecode(part);
1671
1672                                 // Insert this new transaction into the live tables
1673                                 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1674                                 liveTransactionByTransactionIdTable->put(transaction->getId(), transaction);
1675                         }
1676                         part->releaseRef();
1677                 }
1678                 delete ptit;
1679         }
1680         delete tpit;
1681         // Clear all the new transaction parts in preparation for the next
1682         // time the server sends slots
1683         {
1684                 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts);
1685                 while (partsit->hasNext()) {
1686                         int64_t machineId = partsit->next();
1687                         Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
1688                         delete parts;
1689                 }
1690                 delete partsit;
1691                 newTransactionParts->clear();
1692         }
1693 }
1694
1695 void Table::arbitrateFromServer() {
1696         if (liveTransactionBySequenceNumberTable->size() == 0) {
1697                 // Nothing to arbitrate on so move on
1698                 return;
1699         }
1700
1701         // Get the transaction sequence numbers and sort from oldest to newest
1702         Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>();
1703         {
1704                 SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
1705                 while (trit->hasNext())
1706                         transactionSequenceNumbers->add(trit->next());
1707                 delete trit;
1708         }
1709         qsort(transactionSequenceNumbers->expose(), transactionSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1710
1711         // Collection of key value pairs that are
1712         Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *speculativeTableTmp = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
1713
1714         // The last transaction arbitrated on
1715         int64_t lastTransactionCommitted = -1;
1716         Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1717         uint tsnSize = transactionSequenceNumbers->size();
1718         for (uint i = 0; i < tsnSize; i++) {
1719                 int64_t transactionSequenceNumber = transactionSequenceNumbers->get(i);
1720                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1721
1722                 // Check if this machine arbitrates for this transaction if not
1723                 // then we cant arbitrate this transaction
1724                 if (transaction->getArbitrator() != localMachineId) {
1725                         continue;
1726                 }
1727
1728                 if (transactionSequenceNumber < lastSeqNumArbOn) {
1729                         continue;
1730                 }
1731
1732                 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1733                         // We have seen this already locally so dont commit again
1734                         continue;
1735                 }
1736
1737                 if (!transaction->isComplete()) {
1738                         // Will arbitrate in incorrect order if we continue so just break
1739                         // Most likely this
1740                         break;
1741                 }
1742
1743                 // update the largest transaction seen by arbitrator from server
1744                 if (!lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1745                         lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1746                 } else {
1747                         int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1748                         if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1749                                 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1750                         }
1751                 }
1752
1753                 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1754                         // Guard evaluated as true
1755                         // Update the local changes so we can make the commit
1756                         SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1757                         while (kvit->hasNext()) {
1758                                 KeyValue *kv = kvit->next();
1759                                 speculativeTableTmp->put(kv->getKey(), kv);
1760                         }
1761                         delete kvit;
1762
1763                         // Update what the last transaction committed was for use in batch commit
1764                         lastTransactionCommitted = transactionSequenceNumber;
1765                 } else {
1766                         // Guard evaluated was false so create abort
1767                         // Create the abort
1768                         Abort *newAbort = new Abort(NULL,
1769                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1770                                                                                                                                         transaction->getSequenceNumber(),
1771                                                                                                                                         transaction->getMachineId(),
1772                                                                                                                                         transaction->getArbitrator(),
1773                                                                                                                                         localArbitrationSequenceNumber);
1774                         localArbitrationSequenceNumber++;
1775                         generatedAborts->add(newAbort);
1776
1777                         // Insert the abort so we can process
1778                         processEntry(newAbort);
1779                 }
1780
1781                 lastSeqNumArbOn = transactionSequenceNumber;
1782         }
1783
1784         delete transactionSequenceNumbers;
1785
1786         Commit *newCommit = NULL;
1787
1788         // If there is something to commit
1789         if (speculativeTableTmp->size() != 0) {
1790                 // Create the commit and increment the commit sequence number
1791                 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1792                 localArbitrationSequenceNumber++;
1793
1794                 // Add all the new keys to the commit
1795                 SetIterator<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *spit = getKeyIterator(speculativeTableTmp);
1796                 while (spit->hasNext()) {
1797                         IoTString *string = spit->next();
1798                         KeyValue *kv = speculativeTableTmp->get(string);
1799                         newCommit->addKV(kv);
1800                 }
1801                 delete spit;
1802                 
1803                 // create the commit parts
1804                 newCommit->createCommitParts();
1805
1806                 // Append all the commit parts to the end of the pending queue
1807                 // waiting for sending to the server
1808                 // Insert the commit so we can process it
1809                 Vector<CommitPart *> *parts = newCommit->getParts();
1810                 uint partsSize = parts->size();
1811                 for (uint i = 0; i < partsSize; i++) {
1812                         CommitPart *commitPart = parts->get(i);
1813                         processEntry(commitPart);
1814                 }
1815         }
1816         delete speculativeTableTmp;
1817
1818         if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1819                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1820                 pendingSendArbitrationRounds->add(arbitrationRound);
1821
1822                 if (compactArbitrationData()) {
1823                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1824                         if (newArbitrationRound->getCommit() != NULL) {
1825                                 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1826                                 uint partsSize = parts->size();
1827                                 for (uint i = 0; i < partsSize; i++) {
1828                                         CommitPart *commitPart = parts->get(i);
1829                                         processEntry(commitPart);
1830                                 }
1831                         }
1832                 }
1833         } else {
1834                 delete generatedAborts;
1835         }
1836 }
1837
1838 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1839
1840         // Check if this machine arbitrates for this transaction if not then
1841         // we cant arbitrate this transaction
1842         if (transaction->getArbitrator() != localMachineId) {
1843                 return Pair<bool, bool>(false, false);
1844         }
1845
1846         if (!transaction->isComplete()) {
1847                 // Will arbitrate in incorrect order if we continue so just break
1848                 // Most likely this
1849                 return Pair<bool, bool>(false, false);
1850         }
1851
1852         if (transaction->getMachineId() != localMachineId) {
1853                 // dont do this check for local transactions
1854                 if (lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1855                         if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1856                                 // We've have already seen this from the server
1857                                 return Pair<bool, bool>(false, false);
1858                         }
1859                 }
1860         }
1861
1862         if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1863                 // Guard evaluated as true Create the commit and increment the
1864                 // commit sequence number
1865                 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1866                 localArbitrationSequenceNumber++;
1867
1868                 // Update the local changes so we can make the commit
1869                 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1870                 while (kvit->hasNext()) {
1871                         KeyValue *kv = kvit->next();
1872                         newCommit->addKV(kv);
1873                 }
1874                 delete kvit;
1875
1876                 // create the commit parts
1877                 newCommit->createCommitParts();
1878
1879                 // Append all the commit parts to the end of the pending queue
1880                 // waiting for sending to the server
1881                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1882                 pendingSendArbitrationRounds->add(arbitrationRound);
1883
1884                 if (compactArbitrationData()) {
1885                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1886                         Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1887                         uint partsSize = parts->size();
1888                         for (uint i = 0; i < partsSize; i++) {
1889                                 CommitPart *commitPart = parts->get(i);
1890                                 processEntry(commitPart);
1891                         }
1892                 } else {
1893                         // Insert the commit so we can process it
1894                         Vector<CommitPart *> *parts = newCommit->getParts();
1895                         uint partsSize = parts->size();
1896                         for (uint i = 0; i < partsSize; i++) {
1897                                 CommitPart *commitPart = parts->get(i);
1898                                 processEntry(commitPart);
1899                         }
1900                 }
1901
1902                 if (transaction->getMachineId() == localMachineId) {
1903                         TransactionStatus *status = transaction->getTransactionStatus();
1904                         if (status != NULL) {
1905                                 status->setStatus(TransactionStatus_StatusCommitted);
1906                         }
1907                 }
1908
1909                 updateLiveStateFromLocal();
1910                 return Pair<bool, bool>(true, true);
1911         } else {
1912                 if (transaction->getMachineId() == localMachineId) {
1913                         // For locally created messages update the status
1914                         // Guard evaluated was false so create abort
1915                         TransactionStatus *status = transaction->getTransactionStatus();
1916                         if (status != NULL) {
1917                                 status->setStatus(TransactionStatus_StatusAborted);
1918                         }
1919                 } else {
1920                         Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
1921
1922                         // Create the abort
1923                         Abort *newAbort = new Abort(NULL,
1924                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1925                                                                                                                                         -1,
1926                                                                                                                                         transaction->getMachineId(),
1927                                                                                                                                         transaction->getArbitrator(),
1928                                                                                                                                         localArbitrationSequenceNumber);
1929                         localArbitrationSequenceNumber++;
1930                         addAbortSet->add(newAbort);
1931
1932                         // Append all the commit parts to the end of the pending queue
1933                         // waiting for sending to the server
1934                         ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1935                         pendingSendArbitrationRounds->add(arbitrationRound);
1936
1937                         if (compactArbitrationData()) {
1938                                 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1939
1940                                 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1941                                 uint partsSize = parts->size();
1942                                 for (uint i = 0; i < partsSize; i++) {
1943                                         CommitPart *commitPart = parts->get(i);
1944                                         processEntry(commitPart);
1945                                 }
1946                         }
1947                 }
1948
1949                 updateLiveStateFromLocal();
1950                 return Pair<bool, bool>(true, false);
1951         }
1952 }
1953
1954 /**
1955  * Compacts the arbitration data by merging commits and aggregating
1956  * aborts so that a single large push of commits can be done instead
1957  * of many small updates
1958  */
1959 bool Table::compactArbitrationData() {
1960         if (pendingSendArbitrationRounds->size() < 2) {
1961                 // Nothing to compact so do nothing
1962                 return false;
1963         }
1964
1965         ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1966         if (lastRound->getDidSendPart()) {
1967                 return false;
1968         }
1969
1970         bool hadCommit = (lastRound->getCommit() == NULL);
1971         bool gotNewCommit = false;
1972
1973         uint numberToDelete = 1;
1974         
1975         while (numberToDelete < pendingSendArbitrationRounds->size()) {
1976                 ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1977
1978                 if (round->isFull() || round->getDidSendPart()) {
1979                         // Stop since there is a part that cannot be compacted and we
1980                         // need to compact in order
1981                         break;
1982                 }
1983
1984                 if (round->getCommit() == NULL) {
1985                         // Try compacting aborts only
1986                         int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
1987                         if (newSize > ArbitrationRound_MAX_PARTS) {
1988                                 // Cant compact since it would be too large
1989                                 break;
1990                         }
1991                         lastRound->addAborts(round->getAborts());
1992                 } else {
1993                         // Create a new larger commit
1994                         Commit *newCommit = Commit_merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
1995                         localArbitrationSequenceNumber++;
1996
1997                         // Create the commit parts so that we can count them
1998                         newCommit->createCommitParts();
1999
2000                         // Calculate the new size of the parts
2001                         int newSize = newCommit->getNumberOfParts();
2002                         newSize += lastRound->getAbortsCount();
2003                         newSize += round->getAbortsCount();
2004
2005                         if (newSize > ArbitrationRound_MAX_PARTS) {
2006                                 // Can't compact since it would be too large
2007                                 if (lastRound->getCommit() != newCommit &&
2008                                                 round->getCommit() != newCommit)
2009                                         delete newCommit;
2010                                 break;
2011                         }
2012                         // Set the new compacted part
2013                         if (lastRound->getCommit() == newCommit)
2014                                 lastRound->setCommit(NULL);
2015                         if (round->getCommit() == newCommit)
2016                                 round->setCommit(NULL);
2017                         
2018                         if (lastRound->getCommit() != NULL) {
2019                                 Commit * oldcommit = lastRound->getCommit();
2020                                 lastRound->setCommit(NULL);
2021                                 delete oldcommit;
2022                         }
2023                         lastRound->setCommit(newCommit);
2024                         lastRound->addAborts(round->getAborts());
2025                         gotNewCommit = true;
2026                 }
2027
2028                 numberToDelete++;
2029         }
2030
2031         if (numberToDelete != 1) {
2032                 // If there is a compaction
2033                 // Delete the previous pieces that are now in the new compacted piece
2034                 for (uint i = 2; i <= numberToDelete; i++) {
2035                         delete pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size()-i);
2036                 }
2037                 pendingSendArbitrationRounds->setSize(pendingSendArbitrationRounds->size() - numberToDelete);
2038
2039                 pendingSendArbitrationRounds->add(lastRound);
2040
2041                 // Should reinsert into the commit processor
2042                 if (hadCommit && gotNewCommit) {
2043                         return true;
2044                 }
2045         }
2046
2047         return false;
2048 }
2049
2050 /**
2051  * Update all the commits and the committed tables, sets dead the dead
2052  * transactions
2053  */
2054 bool Table::updateCommittedTable() {
2055         if (newCommitParts->size() == 0) {
2056                 // Nothing new to process
2057                 return false;
2058         }
2059
2060         // Iterate through all the machine Ids that we received new parts for
2061         SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts);
2062         while (partsit->hasNext()) {
2063                 int64_t machineId = partsit->next();
2064                 Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId);
2065
2066                 // Iterate through all the parts for that machine Id
2067                 SetIterator<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pairit = getKeyIterator(parts);
2068                 while (pairit->hasNext()) {
2069                         Pair<int64_t, int32_t> *partId = pairit->next();
2070                         CommitPart *part = pairit->currVal();
2071
2072                         // Get the transaction object for that sequence number
2073                         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
2074
2075                         if (commitForClientTable == NULL) {
2076                                 // This is the first commit from this device
2077                                 commitForClientTable = new Hashtable<int64_t, Commit *>();
2078                                 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
2079                         }
2080
2081                         Commit *commit = commitForClientTable->get(part->getSequenceNumber());
2082
2083                         if (commit == NULL) {
2084                                 // This is a new commit that we dont have so make a new one
2085                                 commit = new Commit();
2086
2087                                 // Insert this new commit into the live tables
2088                                 commitForClientTable->put(part->getSequenceNumber(), commit);
2089                         }
2090
2091                         // Add that part to the commit
2092                         commit->addPartDecode(part);
2093                         part->releaseRef();
2094                 }
2095                 delete pairit;
2096                 delete parts;
2097         }
2098         delete partsit;
2099
2100         // Clear all the new commits parts in preparation for the next time
2101         // the server sends slots
2102         newCommitParts->clear();
2103
2104         // If we process a new commit keep track of it for future use
2105         bool didProcessANewCommit = false;
2106
2107         // Process the commits one by one
2108         SetIterator<int64_t, Hashtable<int64_t, Commit *> *> *liveit = getKeyIterator(liveCommitsTable);
2109         while (liveit->hasNext()) {
2110                 int64_t arbitratorId = liveit->next();
2111                 // Get all the commits for a specific arbitrator
2112                 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
2113
2114                 // Sort the commits in order
2115                 Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>();
2116                 {
2117                         SetIterator<int64_t, Commit *> *clientit = getKeyIterator(commitForClientTable);
2118                         while (clientit->hasNext())
2119                                 commitSequenceNumbers->add(clientit->next());
2120                         delete clientit;
2121                 }
2122
2123                 qsort(commitSequenceNumbers->expose(), commitSequenceNumbers->size(), sizeof(int64_t), compareInt64);
2124
2125                 // Get the last commit seen from this arbitrator
2126                 int64_t lastCommitSeenSequenceNumber = -1;
2127                 if (lastCommitSeenSequenceNumberByArbitratorTable->contains(arbitratorId)) {
2128                         lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
2129                 }
2130
2131                 // Go through each new commit one by one
2132                 for (uint i = 0; i < commitSequenceNumbers->size(); i++) {
2133                         int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
2134                         Commit *commit = commitForClientTable->get(commitSequenceNumber);
2135                         // Special processing if a commit is not complete
2136                         if (!commit->isComplete()) {
2137                                 if (i == (commitSequenceNumbers->size() - 1)) {
2138                                         // If there is an incomplete commit and this commit is the
2139                                         // latest one seen then this commit cannot be processed and
2140                                         // there are no other commits
2141                                         break;
2142                                 } else {
2143                                         // This is a commit that was already dead but parts of it
2144                                         // are still in the block chain (not flushed out yet)->
2145                                         // Delete it and move on
2146                                         commit->setDead();
2147                                         commitForClientTable->remove(commit->getSequenceNumber());
2148                                         delete commit;
2149                                         continue;
2150                                 }
2151                         }
2152
2153                         // Update the last transaction that was updated if we can
2154                         if (commit->getTransactionSequenceNumber() != -1) {
2155                                 // Update the last transaction sequence number that the arbitrator arbitrated on1
2156                                 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2157                                         lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2158                                 }
2159                         }
2160
2161                         // Update the last arbitration data that we have seen so far
2162                         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(commit->getMachineId())) {
2163                                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
2164                                 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
2165                                         // Is larger
2166                                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2167                                 }
2168                         } else {
2169                                 // Never seen any data from this arbitrator so record the first one
2170                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2171                         }
2172
2173                         // We have already seen this commit before so need to do the
2174                         // full processing on this commit
2175                         if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2176                                 // Update the last transaction that was updated if we can
2177                                 if (commit->getTransactionSequenceNumber() != -1) {
2178                                         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
2179                                         if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) ||
2180                                                         lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2181                                                 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2182                                         }
2183                                 }
2184                                 continue;
2185                         }
2186
2187                         // If we got here then this is a brand new commit and needs full
2188                         // processing
2189                         // Get what commits should be edited, these are the commits that
2190                         // have live values for their keys
2191                         Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
2192                         {
2193                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2194                                 while (kvit->hasNext()) {
2195                                         KeyValue *kv = kvit->next();
2196                                         Commit *commit = liveCommitsByKeyTable->get(kv->getKey());
2197                                         if (commit != NULL)
2198                                                 commitsToEdit->add(commit);
2199                                 }
2200                                 delete kvit;
2201                         }
2202
2203                         // Update each previous commit that needs to be updated
2204                         SetIterator<Commit *, Commit *> *commitit = commitsToEdit->iterator();
2205                         while (commitit->hasNext()) {
2206                                 Commit *previousCommit = commitit->next();
2207
2208                                 // Only bother with live commits (TODO: Maybe remove this check)
2209                                 if (previousCommit->isLive()) {
2210
2211                                         // Update which keys in the old commits are still live
2212                                         {
2213                                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2214                                                 while (kvit->hasNext()) {
2215                                                         KeyValue *kv = kvit->next();
2216                                                         previousCommit->invalidateKey(kv->getKey());
2217                                                 }
2218                                                 delete kvit;
2219                                         }
2220
2221                                         // if the commit is now dead then remove it
2222                                         if (!previousCommit->isLive()) {
2223                                                 commitForClientTable->remove(previousCommit->getSequenceNumber());
2224                                                 delete previousCommit;
2225                                         }
2226                                 }
2227                         }
2228                         delete commitit;
2229                         delete commitsToEdit;
2230
2231                         // Update the last seen sequence number from this arbitrator
2232                         if (lastCommitSeenSequenceNumberByArbitratorTable->contains(commit->getMachineId())) {
2233                                 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2234                                         lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2235                                 }
2236                         } else {
2237                                 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2238                         }
2239
2240                         // We processed a new commit that we havent seen before
2241                         didProcessANewCommit = true;
2242
2243                         // Update the committed table of keys and which commit is using which key
2244                         {
2245                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2246                                 while (kvit->hasNext()) {
2247                                         KeyValue *kv = kvit->next();
2248                                         committedKeyValueTable->put(kv->getKey(), kv);
2249                                         liveCommitsByKeyTable->put(kv->getKey(), commit);
2250                                 }
2251                                 delete kvit;
2252                         }
2253                 }
2254                 delete commitSequenceNumbers;
2255         }
2256         delete liveit;
2257
2258         return didProcessANewCommit;
2259 }
2260
2261 /**
2262  * Create the speculative table from transactions that are still live
2263  * and have come from the cloud
2264  */
2265 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2266         if (liveTransactionBySequenceNumberTable->size() == 0) {
2267                 // There is nothing to speculate on
2268                 return false;
2269         }
2270
2271         // Create a list of the transaction sequence numbers and sort them
2272         // from oldest to newest
2273         Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>();
2274         {
2275                 SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
2276                 while (trit->hasNext())
2277                         transactionSequenceNumbersSorted->add(trit->next());
2278                 delete trit;
2279         }
2280
2281         qsort(transactionSequenceNumbersSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64);
2282
2283         bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2284
2285
2286         if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2287                 // If there is a gap in the transaction sequence numbers then
2288                 // there was a commit or an abort of a transaction OR there was a
2289                 // new commit (Could be from offline commit) so a redo the
2290                 // speculation from scratch
2291
2292                 // Start from scratch
2293                 speculatedKeyValueTable->clear();
2294                 lastTransactionSequenceNumberSpeculatedOn = -1;
2295                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2296         }
2297
2298         // Remember the front of the transaction list
2299         oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2300
2301         // Find where to start arbitration from
2302         uint startIndex = 0;
2303
2304         for (; startIndex < transactionSequenceNumbersSorted->size(); startIndex++)
2305                 if (transactionSequenceNumbersSorted->get(startIndex) == lastTransactionSequenceNumberSpeculatedOn)
2306                         break;
2307         startIndex++;
2308
2309         if (startIndex >= transactionSequenceNumbersSorted->size()) {
2310                 // Make sure we are not out of bounds
2311                 delete transactionSequenceNumbersSorted;
2312                 return false;           // did not speculate
2313         }
2314
2315         Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
2316         bool didSkip = true;
2317
2318         for (uint i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2319                 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2320                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2321
2322                 if (!transaction->isComplete()) {
2323                         // If there is an incomplete transaction then there is nothing
2324                         // we can do add this transactions arbitrator to the list of
2325                         // arbitrators we should ignore
2326                         incompleteTransactionArbitrator->add(transaction->getArbitrator());
2327                         didSkip = true;
2328                         continue;
2329                 }
2330
2331                 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2332                         continue;
2333                 }
2334
2335                 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2336
2337                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2338                         // Guard evaluated to true so update the speculative table
2339                         {
2340                                 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2341                                 while (kvit->hasNext()) {
2342                                         KeyValue *kv = kvit->next();
2343                                         speculatedKeyValueTable->put(kv->getKey(), kv);
2344                                 }
2345                                 delete kvit;
2346                         }
2347                 }
2348         }
2349
2350         delete transactionSequenceNumbersSorted;
2351         
2352         if (didSkip) {
2353                 // Since there was a skip we need to redo the speculation next time around
2354                 lastTransactionSequenceNumberSpeculatedOn = -1;
2355                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2356         }
2357
2358         // We did some speculation
2359         return true;
2360 }
2361
2362 /**
2363  * Create the pending transaction speculative table from transactions
2364  * that are still in the pending transaction buffer
2365  */
2366 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2367         if (pendingTransactionQueue->size() == 0) {
2368                 // There is nothing to speculate on
2369                 return;
2370         }
2371
2372         if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2373                 // need to reset on the pending speculation
2374                 lastPendingTransactionSpeculatedOn = NULL;
2375                 firstPendingTransaction = pendingTransactionQueue->get(0);
2376                 pendingTransactionSpeculatedKeyValueTable->clear();
2377         }
2378
2379         // Find where to start arbitration from
2380         uint startIndex = 0;
2381
2382         for (; startIndex < pendingTransactionQueue->size(); startIndex++)
2383                 if (pendingTransactionQueue->get(startIndex) == firstPendingTransaction)
2384                         break;
2385
2386         if (startIndex >= pendingTransactionQueue->size()) {
2387                 // Make sure we are not out of bounds
2388                 return;
2389         }
2390
2391         for (uint i = startIndex; i < pendingTransactionQueue->size(); i++) {
2392                 Transaction *transaction = pendingTransactionQueue->get(i);
2393
2394                 lastPendingTransactionSpeculatedOn = transaction;
2395
2396                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2397                         // Guard evaluated to true so update the speculative table
2398                         SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2399                         while (kvit->hasNext()) {
2400                                 KeyValue *kv = kvit->next();
2401                                 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2402                         }
2403                         delete kvit;
2404                 }
2405         }
2406 }
2407
2408 /**
2409  * Set dead and remove from the live transaction tables the
2410  * transactions that are dead
2411  */
2412 void Table::updateLiveTransactionsAndStatus() {
2413         // Go through each of the transactions
2414         {
2415                 SetIterator<int64_t, Transaction *> *iter = getKeyIterator(liveTransactionBySequenceNumberTable);
2416                 while (iter->hasNext()) {
2417                         int64_t key = iter->next();
2418                         Transaction *transaction = liveTransactionBySequenceNumberTable->get(key);
2419
2420                         // Check if the transaction is dead
2421                         if (lastArbitratedTransactionNumberByArbitratorTable->contains(transaction->getArbitrator())
2422                                         && lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator()) >= transaction->getSequenceNumber()) {
2423                                 // Set dead the transaction
2424                                 transaction->setDead();
2425
2426                                 // Remove the transaction from the live table
2427                                 iter->remove();
2428                                 liveTransactionByTransactionIdTable->remove(transaction->getId());
2429                                 delete transaction;
2430                         }
2431                 }
2432                 delete iter;
2433         }
2434
2435         // Go through each of the transactions
2436         {
2437                 SetIterator<int64_t, TransactionStatus *> *iter = getKeyIterator(outstandingTransactionStatus);
2438                 while (iter->hasNext()) {
2439                         int64_t key = iter->next();
2440                         TransactionStatus *status = outstandingTransactionStatus->get(key);
2441
2442                         // Check if the transaction is dead
2443                         if (lastArbitratedTransactionNumberByArbitratorTable->contains(status->getTransactionArbitrator())
2444                                         && (lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()) >= status->getTransactionSequenceNumber())) {
2445                                 // Set committed
2446                                 status->setStatus(TransactionStatus_StatusCommitted);
2447
2448                                 // Remove
2449                                 iter->remove();
2450                         }
2451                 }
2452                 delete iter;
2453         }
2454 }
2455
2456 /**
2457  * Process this slot, entry by entry->  Also update the latest message sent by slot
2458  */
2459 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2460
2461         // Update the last message seen
2462         updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2463
2464         // Process each entry in the slot
2465         Vector<Entry *> *entries = slot->getEntries();
2466         uint eSize = entries->size();
2467         for (uint ei = 0; ei < eSize; ei++) {
2468                 Entry *entry = entries->get(ei);
2469                 switch (entry->getType()) {
2470                 case TypeCommitPart:
2471                         processEntry((CommitPart *)entry);
2472                         break;
2473                 case TypeAbort:
2474                         processEntry((Abort *)entry);
2475                         break;
2476                 case TypeTransactionPart:
2477                         processEntry((TransactionPart *)entry);
2478                         break;
2479                 case TypeNewKey:
2480                         processEntry((NewKey *)entry);
2481                         break;
2482                 case TypeLastMessage:
2483                         processEntry((LastMessage *)entry, machineSet);
2484                         break;
2485                 case TypeRejectedMessage:
2486                         processEntry((RejectedMessage *)entry, indexer);
2487                         break;
2488                 case TypeTableStatus:
2489                         processEntry((TableStatus *)entry, slot->getSequenceNumber());
2490                         break;
2491                 default:
2492                         throw new Error("Unrecognized type: ");
2493                 }
2494         }
2495 }
2496
2497 /**
2498  * Update the last message that was sent for a machine Id
2499  */
2500 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2501         // Update what the last message received by a machine was
2502         updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2503 }
2504
2505 /**
2506  * Add the new key to the arbitrators table and update the set of live
2507  * new keys (in case of a rescued new key message)
2508  */
2509 void Table::processEntry(NewKey *entry) {
2510         // Update the arbitrator table with the new key information
2511         arbitratorTable->put(entry->getKey(), entry->getMachineID());
2512
2513         // Update what the latest live new key is
2514         NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2515         if (oldNewKey != NULL) {
2516                 // Delete the old new key messages
2517                 oldNewKey->setDead();
2518         }
2519 }
2520
2521 /**
2522  * Process new table status entries and set dead the old ones as new
2523  * ones come in-> keeps track of the largest and smallest table status
2524  * seen in this current round of updating the local copy of the block
2525  * chain
2526  */
2527 void Table::processEntry(TableStatus *entry, int64_t seq) {
2528         int newNumSlots = entry->getMaxSlots();
2529         updateCurrMaxSize(newNumSlots);
2530         initExpectedSize(seq, newNumSlots);
2531
2532         if (liveTableStatus != NULL) {
2533                 // We have a larger table status so the old table status is no
2534                 // int64_ter alive
2535                 liveTableStatus->setDead();
2536         }
2537
2538         // Make this new table status the latest alive table status
2539         liveTableStatus = entry;
2540 }
2541
2542 /**
2543  * Check old messages to see if there is a block chain violation->
2544  * Also
2545  */
2546 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
2547         int64_t oldSeqNum = entry->getOldSeqNum();
2548         int64_t newSeqNum = entry->getNewSeqNum();
2549         bool isequal = entry->getEqual();
2550         int64_t machineId = entry->getMachineID();
2551         int64_t seq = entry->getSequenceNumber();
2552
2553         // Check if we have messages that were supposed to be rejected in
2554         // our local block chain
2555         for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2556                 // Get the slot
2557                 Slot *slot = indexer->getSlot(seqNum);
2558
2559                 if (slot != NULL) {
2560                         // If we have this slot make sure that it was not supposed to be
2561                         // a rejected slot
2562                         int64_t slotMachineId = slot->getMachineID();
2563                         if (isequal != (slotMachineId == machineId)) {
2564                                 throw new Error("Server Error: Trying to insert rejected message for slot ");
2565                         }
2566                 }
2567         }
2568
2569         // Create a list of clients to watch until they see this rejected
2570         // message entry->
2571         Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2572         SetIterator<int64_t, Pair<int64_t, Liveness *> *> *iter = getKeyIterator(lastMessageTable);
2573         while (iter->hasNext()) {
2574                 // Machine ID for the last message entry
2575                 int64_t lastMessageEntryMachineId = iter->next();
2576
2577                 // We've seen it, don't need to continue to watch->  Our next
2578                 // message will implicitly acknowledge it->
2579                 if (lastMessageEntryMachineId == localMachineId) {
2580                         continue;
2581                 }
2582
2583                 Pair<int64_t, Liveness *> *lastMessageValue = lastMessageTable->get(lastMessageEntryMachineId);
2584                 int64_t entrySequenceNumber = lastMessageValue->getFirst();
2585
2586                 if (entrySequenceNumber < seq) {
2587                         // Add this rejected message to the set of messages that this
2588                         // machine ID did not see yet
2589                         addWatchVector(lastMessageEntryMachineId, entry);
2590                         // This client did not see this rejected message yet so add it
2591                         // to the watch set to monitor
2592                         deviceWatchSet->add(lastMessageEntryMachineId);
2593                 }
2594         }
2595         delete iter;
2596
2597         if (deviceWatchSet->isEmpty()) {
2598                 // This rejected message has been seen by all the clients so
2599                 entry->setDead();
2600                 delete deviceWatchSet;
2601         } else {
2602                 // We need to watch this rejected message
2603                 entry->setWatchSet(deviceWatchSet);
2604         }
2605 }
2606
2607 /**
2608  * Check if this abort is live, if not then save it so we can kill it
2609  * later-> update the last transaction number that was arbitrated on->
2610  */
2611 void Table::processEntry(Abort *entry) {
2612         if (entry->getTransactionSequenceNumber() != -1) {
2613                 // update the transaction status if it was sent to the server
2614                 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2615                 if (status != NULL) {
2616                         status->setStatus(TransactionStatus_StatusAborted);
2617                 }
2618         }
2619
2620         // Abort has not been seen by the client it is for yet so we need to
2621         // keep track of it
2622
2623         Abort *previouslySeenAbort = liveAbortTable->put(new Pair<int64_t, int64_t>(entry->getAbortId()), entry);
2624         if (previouslySeenAbort != NULL) {
2625                 previouslySeenAbort->setDead();         // Delete old version of the abort since we got a rescued newer version
2626         }
2627
2628         if (entry->getTransactionArbitrator() == localMachineId) {
2629                 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
2630         }
2631
2632         if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) {
2633                 // The machine already saw this so it is dead
2634                 entry->setDead();
2635                 Pair<int64_t, int64_t> abortid = entry->getAbortId();
2636                 liveAbortTable->remove(&abortid);
2637
2638                 if (entry->getTransactionArbitrator() == localMachineId) {
2639                         liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2640                 }
2641                 return;
2642         }
2643
2644         // Update the last arbitration data that we have seen so far
2645         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(entry->getTransactionArbitrator())) {
2646                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2647                 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2648                         // Is larger
2649                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2650                 }
2651         } else {
2652                 // Never seen any data from this arbitrator so record the first one
2653                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2654         }
2655
2656         // Set dead a transaction if we can
2657         Pair<int64_t, int64_t> deadPair = Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber());
2658
2659         Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(&deadPair);
2660         if (transactionToSetDead != NULL) {
2661                 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2662         }
2663
2664         // Update the last transaction sequence number that the arbitrator
2665         // arbitrated on
2666         if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getTransactionArbitrator()) ||
2667                         (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator()) < entry->getTransactionSequenceNumber())) {
2668                 // Is a valid one
2669                 if (entry->getTransactionSequenceNumber() != -1) {
2670                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2671                 }
2672         }
2673 }
2674
2675 /**
2676  * Set dead the transaction part if that transaction is dead and keep
2677  * track of all new parts
2678  */
2679 void Table::processEntry(TransactionPart *entry) {
2680         // Check if we have already seen this transaction and set it dead OR
2681         // if it is not alive
2682         if (lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getArbitratorId()) && (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId()) >= entry->getSequenceNumber())) {
2683                 // This transaction is dead, it was already committed or aborted
2684                 entry->setDead();
2685                 return;
2686         }
2687
2688         // This part is still alive
2689         Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId());
2690
2691         if (transactionPart == NULL) {
2692                 // Dont have a table for this machine Id yet so make one
2693                 transactionPart = new Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2694                 newTransactionParts->put(entry->getMachineId(), transactionPart);
2695         }
2696
2697         // Update the part and set dead ones we have already seen (got a
2698         // rescued version)
2699         entry->acquireRef();
2700         TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry);
2701         if (previouslySeenPart != NULL) {
2702                 previouslySeenPart->releaseRef();
2703                 previouslySeenPart->setDead();
2704         }
2705 }
2706
2707 /**
2708  * Process new commit entries and save them for future use->  Delete duplicates
2709  */
2710 void Table::processEntry(CommitPart *entry) {
2711         // Update the last transaction that was updated if we can
2712         if (entry->getTransactionSequenceNumber() != -1) {
2713                 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId()) ||
2714                                 lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber()) {
2715                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2716                 }
2717         }
2718
2719         Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *commitPart = newCommitParts->get(entry->getMachineId());
2720         if (commitPart == NULL) {
2721                 // Don't have a table for this machine Id yet so make one
2722                 commitPart = new Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2723                 newCommitParts->put(entry->getMachineId(), commitPart);
2724         }
2725         // Update the part and set dead ones we have already seen (got a
2726         // rescued version)
2727         entry->acquireRef();
2728         CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry);
2729         if (previouslySeenPart != NULL) {
2730                 previouslySeenPart->setDead();
2731                 previouslySeenPart->releaseRef();
2732         }
2733 }
2734
2735 /**
2736  * Update the last message seen table-> Update and set dead the
2737  * appropriate RejectedMessages as clients see them-> Updates the live
2738  * aborts, removes those that are dead and sets them dead-> Check that
2739  * the last message seen is correct and that there is no mismatch of
2740  * our own last message or that other clients have not had a rollback
2741  * on the last message->
2742  */
2743 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2744         // We have seen this machine ID
2745         machineSet->remove(machineId);
2746
2747         // Get the set of rejected messages that this machine Id is has not seen yet
2748         Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2749         // If there is a rejected message that this machine Id has not seen yet
2750         if (watchset != NULL) {
2751                 // Go through each rejected message that this machine Id has not
2752                 // seen yet
2753
2754                 SetIterator<RejectedMessage *, RejectedMessage *> *rmit = watchset->iterator();
2755                 while (rmit->hasNext()) {
2756                         RejectedMessage *rm = rmit->next();
2757                         // If this machine Id has seen this rejected message->->->
2758                         if (rm->getSequenceNumber() <= seqNum) {
2759                                 // Remove it from our watchlist
2760                                 rmit->remove();
2761                                 // Decrement machines that need to see this notification
2762                                 rm->removeWatcher(machineId);
2763                         }
2764                 }
2765                 delete rmit;
2766         }
2767
2768         // Set dead the abort
2769         SetIterator<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals> *abortit = getKeyIterator(liveAbortTable);
2770
2771         while (abortit->hasNext()) {
2772                 Pair<int64_t, int64_t> *key = abortit->next();
2773                 Abort *abort = liveAbortTable->get(key);
2774                 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2775                         abort->setDead();
2776                         abortit->remove();
2777                         if (abort->getTransactionArbitrator() == localMachineId) {
2778                                 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2779                         }
2780                 }
2781         }
2782         delete abortit;
2783         if (machineId == localMachineId) {
2784                 // Our own messages are immediately dead->
2785                 char livenessType = liveness->getType();
2786                 if (livenessType == TypeLastMessage) {
2787                         ((LastMessage *)liveness)->setDead();
2788                 } else if (livenessType == TypeSlot) {
2789                         ((Slot *)liveness)->setDead();
2790                 } else {
2791                         throw new Error("Unrecognized type");
2792                 }
2793         }
2794         // Get the old last message for this device
2795         Pair<int64_t, Liveness *> *lastMessageEntry = lastMessageTable->put(machineId, new Pair<int64_t, Liveness *>(seqNum, liveness));
2796         if (lastMessageEntry == NULL) {
2797                 // If no last message then there is nothing else to process
2798                 return;
2799         }
2800
2801         int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
2802         Liveness *lastEntry = lastMessageEntry->getSecond();
2803         delete lastMessageEntry;
2804
2805         // If it is not our machine Id since we already set ours to dead
2806         if (machineId != localMachineId) {
2807                 char lastEntryType = lastEntry->getType();
2808
2809                 if (lastEntryType == TypeLastMessage) {
2810                         ((LastMessage *)lastEntry)->setDead();
2811                 } else if (lastEntryType == TypeSlot) {
2812                         ((Slot *)lastEntry)->setDead();
2813                 } else {
2814                         throw new Error("Unrecognized type");
2815                 }
2816         }
2817         // Make sure the server is not playing any games
2818         if (machineId == localMachineId) {
2819                 if (hadPartialSendToServer) {
2820                         // We were not making any updates and we had a machine mismatch
2821                         if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2822                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: ");
2823                         }
2824                 } else {
2825                         // We were not making any updates and we had a machine mismatch
2826                         if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2827                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed: ");
2828                         }
2829                 }
2830         } else {
2831                 if (lastMessageSeqNum > seqNum) {
2832                         throw new Error("Server Error: Rollback on remote machine sequence number");
2833                 }
2834         }
2835 }
2836
2837 /**
2838  * Add a rejected message entry to the watch set to keep track of
2839  * which clients have seen that rejected message entry and which have
2840  * not.
2841  */
2842 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
2843         Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2844         if (entries == NULL) {
2845                 // There is no set for this machine ID yet so create one
2846                 entries = new Hashset<RejectedMessage *>();
2847                 rejectedMessageWatchVectorTable->put(machineId, entries);
2848         }
2849         entries->add(entry);
2850 }
2851
2852 /**
2853  * Check if the HMAC chain is not violated
2854  */
2855 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
2856         for (uint i = 0; i < newSlots->length(); i++) {
2857                 Slot *currSlot = newSlots->get(i);
2858                 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
2859                 if (prevSlot != NULL &&
2860                                 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2861                         throw new Error("Server Error: Invalid HMAC Chain");
2862         }
2863 }