edits
[iotcloud.git] / version2 / src / C / Table.cc
1 #include "Table.h"
2 #include "CloudComm.h"
3 #include "SlotBuffer.h"
4 #include "NewKey.h"
5 #include "Slot.h"
6 #include "KeyValue.h"
7 #include "Error.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
13 #include "SecureRandom.h"
14 #include "ByteBuffer.h"
15 #include "Abort.h"
16 #include "CommitPart.h"
17 #include "ArbitrationRound.h"
18 #include "TransactionPart.h"
19 #include "Commit.h"
20 #include "RejectedMessage.h"
21 #include "SlotIndexer.h"
22 #include <stdlib.h>
23
24 int compareInt64(const void *a, const void *b) {
25         const int64_t *pa = (const int64_t *) a;
26         const int64_t *pb = (const int64_t *) b;
27         if (*pa < *pb)
28                 return -1;
29         else if (*pa > *pb)
30                 return 1;
31         else
32                 return 0;
33 }
34
35 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
36         buffer(NULL),
37         cloud(new CloudComm(this, baseurl, password, listeningPort)),
38         random(NULL),
39         liveTableStatus(NULL),
40         pendingTransactionBuilder(NULL),
41         lastPendingTransactionSpeculatedOn(NULL),
42         firstPendingTransaction(NULL),
43         numberOfSlots(0),
44         bufferResizeThreshold(0),
45         liveSlotCount(0),
46         oldestLiveSlotSequenceNumver(1),
47         localMachineId(_localMachineId),
48         sequenceNumber(0),
49         localSequenceNumber(0),
50         localTransactionSequenceNumber(0),
51         lastTransactionSequenceNumberSpeculatedOn(0),
52         oldestTransactionSequenceNumberSpeculatedOn(0),
53         localArbitrationSequenceNumber(0),
54         hadPartialSendToServer(false),
55         attemptedToSendToServer(false),
56         expectedsize(0),
57         didFindTableStatus(false),
58         currMaxSize(0),
59         lastSlotAttemptedToSend(NULL),
60         lastIsNewKey(false),
61         lastNewSize(0),
62         lastTransactionPartsSent(NULL),
63         lastPendingSendArbitrationEntriesToDelete(NULL),
64         lastNewKey(NULL),
65         committedKeyValueTable(NULL),
66         speculatedKeyValueTable(NULL),
67         pendingTransactionSpeculatedKeyValueTable(NULL),
68         liveNewKeyTable(NULL),
69         lastMessageTable(NULL),
70         rejectedMessageWatchVectorTable(NULL),
71         arbitratorTable(NULL),
72         liveAbortTable(NULL),
73         newTransactionParts(NULL),
74         newCommitParts(NULL),
75         lastArbitratedTransactionNumberByArbitratorTable(NULL),
76         liveTransactionBySequenceNumberTable(NULL),
77         liveTransactionByTransactionIdTable(NULL),
78         liveCommitsTable(NULL),
79         liveCommitsByKeyTable(NULL),
80         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
81         rejectedSlotVector(NULL),
82         pendingTransactionQueue(NULL),
83         pendingSendArbitrationRounds(NULL),
84         pendingSendArbitrationEntriesToDelete(NULL),
85         transactionPartsSent(NULL),
86         outstandingTransactionStatus(NULL),
87         liveAbortsGeneratedByLocal(NULL),
88         offlineTransactionsCommittedAndAtServer(NULL),
89         localCommunicationTable(NULL),
90         lastTransactionSeenFromMachineFromServer(NULL),
91         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
92         lastInsertedNewKey(false),
93         lastSeqNumArbOn(0)
94 {
95         init();
96 }
97
98 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
99         buffer(NULL),
100         cloud(_cloud),
101         random(NULL),
102         liveTableStatus(NULL),
103         pendingTransactionBuilder(NULL),
104         lastPendingTransactionSpeculatedOn(NULL),
105         firstPendingTransaction(NULL),
106         numberOfSlots(0),
107         bufferResizeThreshold(0),
108         liveSlotCount(0),
109         oldestLiveSlotSequenceNumver(1),
110         localMachineId(_localMachineId),
111         sequenceNumber(0),
112         localSequenceNumber(0),
113         localTransactionSequenceNumber(0),
114         lastTransactionSequenceNumberSpeculatedOn(0),
115         oldestTransactionSequenceNumberSpeculatedOn(0),
116         localArbitrationSequenceNumber(0),
117         hadPartialSendToServer(false),
118         attemptedToSendToServer(false),
119         expectedsize(0),
120         didFindTableStatus(false),
121         currMaxSize(0),
122         lastSlotAttemptedToSend(NULL),
123         lastIsNewKey(false),
124         lastNewSize(0),
125         lastTransactionPartsSent(NULL),
126         lastPendingSendArbitrationEntriesToDelete(NULL),
127         lastNewKey(NULL),
128         committedKeyValueTable(NULL),
129         speculatedKeyValueTable(NULL),
130         pendingTransactionSpeculatedKeyValueTable(NULL),
131         liveNewKeyTable(NULL),
132         lastMessageTable(NULL),
133         rejectedMessageWatchVectorTable(NULL),
134         arbitratorTable(NULL),
135         liveAbortTable(NULL),
136         newTransactionParts(NULL),
137         newCommitParts(NULL),
138         lastArbitratedTransactionNumberByArbitratorTable(NULL),
139         liveTransactionBySequenceNumberTable(NULL),
140         liveTransactionByTransactionIdTable(NULL),
141         liveCommitsTable(NULL),
142         liveCommitsByKeyTable(NULL),
143         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
144         rejectedSlotVector(NULL),
145         pendingTransactionQueue(NULL),
146         pendingSendArbitrationRounds(NULL),
147         pendingSendArbitrationEntriesToDelete(NULL),
148         transactionPartsSent(NULL),
149         outstandingTransactionStatus(NULL),
150         liveAbortsGeneratedByLocal(NULL),
151         offlineTransactionsCommittedAndAtServer(NULL),
152         localCommunicationTable(NULL),
153         lastTransactionSeenFromMachineFromServer(NULL),
154         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
155         lastInsertedNewKey(false),
156         lastSeqNumArbOn(0)
157 {
158         init();
159 }
160
161 Table::~Table() {
162         delete cloud;
163         delete random;
164         delete buffer;
165         // init data structs
166         delete committedKeyValueTable;
167         delete speculatedKeyValueTable;
168         delete pendingTransactionSpeculatedKeyValueTable;
169         delete liveNewKeyTable;
170         delete lastMessageTable;
171         delete rejectedMessageWatchVectorTable;
172         delete arbitratorTable;
173         delete liveAbortTable;
174         delete newTransactionParts;
175         delete newCommitParts;
176         delete lastArbitratedTransactionNumberByArbitratorTable;
177         delete liveTransactionBySequenceNumberTable;
178         delete liveTransactionByTransactionIdTable;
179         delete liveCommitsTable;
180         delete liveCommitsByKeyTable;
181         delete lastCommitSeenSequenceNumberByArbitratorTable;
182         delete rejectedSlotVector;
183         delete pendingTransactionQueue;
184         delete pendingSendArbitrationEntriesToDelete;
185         delete transactionPartsSent;
186         delete outstandingTransactionStatus;
187         delete liveAbortsGeneratedByLocal;
188         delete offlineTransactionsCommittedAndAtServer;
189         delete localCommunicationTable;
190         delete lastTransactionSeenFromMachineFromServer;
191         delete pendingSendArbitrationRounds;
192         delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
193 }
194
195 /**
196  * Init all the stuff needed for for table usage
197  */
198 void Table::init() {
199         // Init helper objects
200         random = new SecureRandom();
201         buffer = new SlotBuffer();
202
203         // init data structs
204         committedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
205         speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
206         pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
207         liveNewKeyTable = new Hashtable<IoTString *, NewKey *, uintptr_t, 0, hashString, StringEquals >();
208         lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> * >();
209         rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
210         arbitratorTable = new Hashtable<IoTString *, int64_t, uintptr_t, 0, hashString, StringEquals>();
211         liveAbortTable = new Hashtable<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>();
212         newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
213         newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
214         lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
215         liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
216         liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t> *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>();
217         liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> * >();
218         liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *, uintptr_t, 0, hashString, StringEquals>();
219         lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
220         rejectedSlotVector = new Vector<int64_t>();
221         pendingTransactionQueue = new Vector<Transaction *>();
222         pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
223         transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
224         outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
225         liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
226         offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> *, uintptr_t, 0, pairHashFunction, pairEquals>();
227         localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> *>();
228         lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
229         pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
230         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
231
232         // Other init stuff
233         numberOfSlots = buffer->capacity();
234         setResizeThreshold();
235 }
236
237 /**
238  * Initialize the table by inserting a table status as the first entry
239  * into the table status also initialize the crypto stuff.
240  */
241 void Table::initTable() {
242         cloud->initSecurity();
243
244         // Create the first insertion into the block chain which is the table status
245         Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
246         localSequenceNumber++;
247         TableStatus *status = new TableStatus(s, numberOfSlots);
248         s->addEntry(status);
249         Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
250
251         if (array == NULL) {
252                 array = new Array<Slot *>(1);
253                 array->set(0, s);
254                 // update local block chain
255                 validateAndUpdate(array, true);
256         } else if (array->length() == 1) {
257                 // in case we did push the slot BUT we failed to init it
258                 validateAndUpdate(array, true);
259         } else {
260                 throw new Error("Error on initialization");
261         }
262 }
263
264 /**
265  * Rebuild the table from scratch by pulling the latest block chain
266  * from the server.
267  */
268 void Table::rebuild() {
269         // Just pull the latest slots from the server
270         Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
271         validateAndUpdate(newslots, true);
272         sendToServer(NULL);
273         updateLiveTransactionsAndStatus();
274 }
275
276 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
277         localCommunicationTable->put(arbitrator, new Pair<IoTString *, int32_t>(hostName, portNumber));
278 }
279
280 int64_t Table::getArbitrator(IoTString *key) {
281         return arbitratorTable->get(key);
282 }
283
284 void Table::close() {
285         cloud->closeCloud();
286 }
287
288 IoTString *Table::getCommitted(IoTString *key)  {
289         KeyValue *kv = committedKeyValueTable->get(key);
290
291         if (kv != NULL) {
292                 return kv->getValue();
293         } else {
294                 return NULL;
295         }
296 }
297
298 IoTString *Table::getSpeculative(IoTString *key) {
299         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
300
301         if (kv == NULL) {
302                 kv = speculatedKeyValueTable->get(key);
303         }
304
305         if (kv == NULL) {
306                 kv = committedKeyValueTable->get(key);
307         }
308
309         if (kv != NULL) {
310                 return kv->getValue();
311         } else {
312                 return NULL;
313         }
314 }
315
316 IoTString *Table::getCommittedAtomic(IoTString *key) {
317         KeyValue *kv = committedKeyValueTable->get(key);
318
319         if (!arbitratorTable->contains(key)) {
320                 throw new Error("Key not Found.");
321         }
322
323         // Make sure new key value pair matches the current arbitrator
324         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
325                 // TODO: Maybe not throw en error
326                 throw new Error("Not all Key Values Match Arbitrator.");
327         }
328
329         if (kv != NULL) {
330                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
331                 return kv->getValue();
332         } else {
333                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
334                 return NULL;
335         }
336 }
337
338 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
339         if (!arbitratorTable->contains(key)) {
340                 throw new Error("Key not Found.");
341         }
342
343         // Make sure new key value pair matches the current arbitrator
344         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
345                 // TODO: Maybe not throw en error
346                 throw new Error("Not all Key Values Match Arbitrator.");
347         }
348
349         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
350
351         if (kv == NULL) {
352                 kv = speculatedKeyValueTable->get(key);
353         }
354
355         if (kv == NULL) {
356                 kv = committedKeyValueTable->get(key);
357         }
358
359         if (kv != NULL) {
360                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
361                 return kv->getValue();
362         } else {
363                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
364                 return NULL;
365         }
366 }
367
368 bool Table::update()  {
369         try {
370                 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
371                 validateAndUpdate(newSlots, false);
372                 sendToServer(NULL);
373                 updateLiveTransactionsAndStatus();
374                 return true;
375         } catch (Exception *e) {
376                 SetIterator<int64_t, Pair<IoTString *, int32_t> *> *kit = getKeyIterator(localCommunicationTable);
377                 while (kit->hasNext()) {
378                         int64_t m = kit->next();
379                         updateFromLocal(m);
380                 }
381                 delete kit;
382         }
383
384         return false;
385 }
386
387 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
388         while (true) {
389                 if (arbitratorTable->contains(keyName)) {
390                         // There is already an arbitrator
391                         return false;
392                 }
393                 NewKey *newKey = new NewKey(NULL, keyName, machineId);
394
395                 if (sendToServer(newKey)) {
396                         // If successfully inserted
397                         return true;
398                 }
399         }
400 }
401
402 void Table::startTransaction() {
403         // Create a new transaction, invalidates any old pending transactions.
404         pendingTransactionBuilder = new PendingTransaction(localMachineId);
405 }
406
407 void Table::addKV(IoTString *key, IoTString *value) {
408
409         // Make sure it is a valid key
410         if (!arbitratorTable->contains(key)) {
411                 throw new Error("Key not Found.");
412         }
413
414         // Make sure new key value pair matches the current arbitrator
415         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
416                 // TODO: Maybe not throw en error
417                 throw new Error("Not all Key Values Match Arbitrator.");
418         }
419
420         // Add the key value to this transaction
421         KeyValue *kv = new KeyValue(key, value);
422         pendingTransactionBuilder->addKV(kv);
423 }
424
425 TransactionStatus *Table::commitTransaction() {
426         if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
427                 // transaction with no updates will have no effect on the system
428                 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
429         }
430
431         // Set the local transaction sequence number and increment
432         pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
433         localTransactionSequenceNumber++;
434
435         // Create the transaction status
436         TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
437
438         // Create the new transaction
439         Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
440         newTransaction->setTransactionStatus(transactionStatus);
441
442         if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
443                 // Add it to the queue and invalidate the builder for safety
444                 pendingTransactionQueue->add(newTransaction);
445         } else {
446                 arbitrateOnLocalTransaction(newTransaction);
447                 updateLiveStateFromLocal();
448         }
449
450         pendingTransactionBuilder = new PendingTransaction(localMachineId);
451
452         try {
453                 sendToServer(NULL);
454         } catch (ServerException *e) {
455
456                 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
457                 uint size = pendingTransactionQueue->size();
458                 uint oldindex = 0;
459                 for (uint iter = 0; iter < size; iter++) {
460                         Transaction *transaction = pendingTransactionQueue->get(iter);
461                         pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter));
462
463                         if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
464                                 // Already contacted this client so ignore all attempts to contact this client
465                                 // to preserve ordering for arbitrator
466                                 continue;
467                         }
468
469                         Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
470
471                         if (sendReturn.getFirst()) {
472                                 // Failed to contact over local
473                                 arbitratorTriedAndFailed->add(transaction->getArbitrator());
474                         } else {
475                                 // Successful contact or should not contact
476
477                                 if (sendReturn.getSecond()) {
478                                         // did arbitrate
479                                         oldindex--;
480                                 }
481                         }
482                 }
483                 pendingTransactionQueue->setSize(oldindex);
484         }
485
486         updateLiveStateFromLocal();
487
488         return transactionStatus;
489 }
490
491 /**
492  * Recalculate the new resize threshold
493  */
494 void Table::setResizeThreshold() {
495         int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
496         bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
497 }
498
499 int64_t Table::getLocalSequenceNumber() {
500         return localSequenceNumber;
501 }
502
503 NewKey * Table::handlePartialSend(NewKey * newKey) {
504         //Didn't receive acknowledgement for last send
505         //See if the server has received a newer slot
506         
507         Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
508         if (newSlots->length() == 0) {
509                 //Retry sending old slot
510                 bool wasInserted = false;
511                 bool sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey, &wasInserted, &newSlots);
512                 
513                 if (sendSlotsReturn) {
514                         if (newKey != NULL) {
515                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
516                                         newKey = NULL;
517                                 }
518                         }
519                         
520                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
521                         while (trit->hasNext()) {
522                                 Transaction *transaction = trit->next();
523                                 transaction->resetServerFailure();
524                                 // Update which transactions parts still need to be sent
525                                 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
526                                 // Add the transaction status to the outstanding list
527                                 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
528                                 
529                                 // Update the transaction status
530                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
531                                 
532                                 // Check if all the transaction parts were successfully
533                                 // sent and if so then remove it from pending
534                                 if (transaction->didSendAllParts()) {
535                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
536                                         pendingTransactionQueue->remove(transaction);
537                                 }
538                         }
539                         delete trit;
540                 } else {
541                         bool isInserted = false;
542                         for (uint si = 0; si < newSlots->length(); si++) {
543                                 Slot *s = newSlots->get(si);
544                                 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
545                                         isInserted = true;
546                                         break;
547                                 }
548                         }
549                         
550                         for (uint si = 0; si < newSlots->length(); si++) {
551                                 Slot *s = newSlots->get(si);
552                                 if (isInserted) {
553                                         break;
554                                 }
555                                 
556                                 // Process each entry in the slot
557                                 Vector<Entry *> *ventries = s->getEntries();
558                                 uint vesize = ventries->size();
559                                 for (uint vei = 0; vei < vesize; vei++) {
560                                         Entry *entry = ventries->get(vei);
561                                         if (entry->getType() == TypeLastMessage) {
562                                                 LastMessage *lastMessage = (LastMessage *)entry;
563                                                 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
564                                                         isInserted = true;
565                                                         break;
566                                                 }
567                                         }
568                                 }
569                         }
570                         
571                         if (isInserted) {
572                                 if (newKey != NULL) {
573                                         if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
574                                                 newKey = NULL;
575                                         }
576                                 }
577                                 
578                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
579                                 while (trit->hasNext()) {
580                                         Transaction *transaction = trit->next();
581                                         transaction->resetServerFailure();
582                                         
583                                         // Update which transactions parts still need to be sent
584                                         transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
585                                         
586                                         // Add the transaction status to the outstanding list
587                                         outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
588                                         
589                                         // Update the transaction status
590                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
591                                         
592                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
593                                         if (transaction->didSendAllParts()) {
594                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
595                                                 pendingTransactionQueue->remove(transaction);
596                                         } else {
597                                                 transaction->resetServerFailure();
598                                                 // Set the transaction sequence number back to nothing
599                                                 if (!transaction->didSendAPartToServer()) {
600                                                         transaction->setSequenceNumber(-1);
601                                                 }
602                                         }
603                                 }
604                                 delete trit;
605                         }
606                 }
607                 
608                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
609                 while (trit->hasNext()) {
610                         Transaction *transaction = trit->next();
611                         transaction->resetServerFailure();
612                         // Set the transaction sequence number back to nothing
613                         if (!transaction->didSendAPartToServer()) {
614                                 transaction->setSequenceNumber(-1);
615                         }
616                 }
617                 delete trit;
618                 
619                 if (newSlots->length() != 0) {
620                         // insert into the local block chain
621                         validateAndUpdate(newSlots, true);
622                 }
623                 // continue;
624         } else {
625                 bool isInserted = false;
626                 for (uint si = 0; si < newSlots->length(); si++) {
627                         Slot *s = newSlots->get(si);
628                         if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
629                                 isInserted = true;
630                                 break;
631                         }
632                 }
633                 
634                 for (uint si = 0; si < newSlots->length(); si++) {
635                         Slot *s = newSlots->get(si);
636                         if (isInserted) {
637                                 break;
638                         }
639                         
640                         // Process each entry in the slot
641                         Vector<Entry *> *entries = s->getEntries();
642                         uint eSize = entries->size();
643                         for (uint ei = 0; ei < eSize; ei++) {
644                                 Entry *entry = entries->get(ei);
645                                 
646                                 if (entry->getType() == TypeLastMessage) {
647                                         LastMessage *lastMessage = (LastMessage *)entry;
648                                         if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
649                                                 isInserted = true;
650                                                 break;
651                                         }
652                                 }
653                         }
654                 }
655                 
656                 if (isInserted) {
657                         if (newKey != NULL) {
658                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
659                                         newKey = NULL;
660                                 }
661                         }
662                         
663                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
664                         while (trit->hasNext()) {
665                                 Transaction *transaction = trit->next();
666                                 transaction->resetServerFailure();
667                                 
668                                 // Update which transactions parts still need to be sent
669                                 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
670                                 
671                                 // Add the transaction status to the outstanding list
672                                 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
673                                 
674                                 // Update the transaction status
675                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
676                                 
677                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
678                                 if (transaction->didSendAllParts()) {
679                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
680                                         pendingTransactionQueue->remove(transaction);
681                                 } else {
682                                         transaction->resetServerFailure();
683                                         // Set the transaction sequence number back to nothing
684                                         if (!transaction->didSendAPartToServer()) {
685                                                 transaction->setSequenceNumber(-1);
686                                         }
687                                 }
688                         }
689                         delete trit;
690                 } else {
691                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
692                         while (trit->hasNext()) {
693                                 Transaction *transaction = trit->next();
694                                 transaction->resetServerFailure();
695                                 // Set the transaction sequence number back to nothing
696                                 if (!transaction->didSendAPartToServer()) {
697                                         transaction->setSequenceNumber(-1);
698                                 }
699                         }
700                         delete trit;
701                 }
702                 
703                 // insert into the local block chain
704                 validateAndUpdate(newSlots, true);
705         }
706         return newKey;
707 }
708
709 bool Table::sendToServer(NewKey *newKey) {
710         if (hadPartialSendToServer) {
711                 newKey = handlePartialSend(newKey);
712         }
713
714         try {
715                 // While we have stuff that needs inserting into the block chain
716                 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
717                         if (hadPartialSendToServer) {
718                                 throw new Error("Should Be error free");
719                         }
720                         
721                         // If there is a new key with same name then end
722                         if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
723                                 return false;
724                         }
725
726                         // Create the slot
727                         Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber);
728                         localSequenceNumber++;
729
730                         // Try to fill the slot with data
731                         ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
732                         bool needsResize = fillSlotsReturn.getFirst();
733                         int newSize = fillSlotsReturn.getSecond();
734                         bool insertedNewKey = fillSlotsReturn.getThird();
735
736                         if (needsResize) {
737                                 // Reset which transaction to send
738                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
739                                 while (trit->hasNext()) {
740                                         Transaction *transaction = trit->next();
741                                         transaction->resetNextPartToSend();
742
743                                         // Set the transaction sequence number back to nothing
744                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
745                                                 transaction->setSequenceNumber(-1);
746                                         }
747                                 }
748                                 delete trit;
749
750                                 // Clear the sent data since we are trying again
751                                 pendingSendArbitrationEntriesToDelete->clear();
752                                 transactionPartsSent->clear();
753
754                                 // We needed a resize so try again
755                                 fillSlot(slot, true, newKey);
756                         }
757
758                         lastSlotAttemptedToSend = slot;
759                         lastIsNewKey = (newKey != NULL);
760                         lastInsertedNewKey = insertedNewKey;
761                         lastNewSize = newSize;
762                         lastNewKey = newKey;
763                         lastTransactionPartsSent = transactionPartsSent->clone();
764                         lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
765
766                         Array<Slot *> * newSlots = NULL;
767                         bool wasInserted = false;
768                         bool sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL, &wasInserted, &newSlots);
769
770                         if (sendSlotsReturn) {
771                                 // Did insert into the block chain
772
773                                 if (insertedNewKey) {
774                                         // This slot was what was inserted not a previous slot
775
776                                         // New Key was successfully inserted into the block chain so dont want to insert it again
777                                         newKey = NULL;
778                                 }
779
780                                 // Remove the aborts and commit parts that were sent from the pending to send queue
781                                 uint size = pendingSendArbitrationRounds->size();
782                                 uint oldcount = 0;
783                                 for (uint i = 0; i < size; i++) {
784                                         ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
785                                         round->removeParts(pendingSendArbitrationEntriesToDelete);
786
787                                         if (!round->isDoneSending()) {
788                                                 // Sent all the parts
789                                                 pendingSendArbitrationRounds->set(oldcount++,
790                                                                                                                                                                                         pendingSendArbitrationRounds->get(i));
791                                         }
792                                 }
793                                 pendingSendArbitrationRounds->setSize(oldcount);
794
795                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
796                                 while (trit->hasNext()) {
797                                         Transaction *transaction = trit->next();
798                                         transaction->resetServerFailure();
799
800                                         // Update which transactions parts still need to be sent
801                                         transaction->removeSentParts(transactionPartsSent->get(transaction));
802
803                                         // Add the transaction status to the outstanding list
804                                         outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
805
806                                         // Update the transaction status
807                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
808
809                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
810                                         if (transaction->didSendAllParts()) {
811                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
812                                                 pendingTransactionQueue->remove(transaction);
813                                         }
814                                 }
815                                 delete trit;
816                         } else {
817                                 // Reset which transaction to send
818                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
819                                 while (trit->hasNext()) {
820                                         Transaction *transaction = trit->next();
821                                         transaction->resetNextPartToSend();
822
823                                         // Set the transaction sequence number back to nothing
824                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
825                                                 transaction->setSequenceNumber(-1);
826                                         }
827                                 }
828                                 delete trit;
829                         }
830
831                         // Clear the sent data in preparation for next send
832                         pendingSendArbitrationEntriesToDelete->clear();
833                         transactionPartsSent->clear();
834
835                         if (newSlots->length() != 0) {
836                                 // insert into the local block chain
837                                 validateAndUpdate(newSlots, true);
838                         }
839                 }
840
841         } catch (ServerException *e) {
842                 if (e->getType() != ServerException_TypeInputTimeout) {
843                         // Nothing was able to be sent to the server so just clear these data structures
844                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
845                         while (trit->hasNext()) {
846                                 Transaction *transaction = trit->next();
847                                 transaction->resetNextPartToSend();
848
849                                 // Set the transaction sequence number back to nothing
850                                 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
851                                         transaction->setSequenceNumber(-1);
852                                 }
853                         }
854                         delete trit;
855                 } else {
856                         // There was a partial send to the server
857                         hadPartialSendToServer = true;
858
859                         // Nothing was able to be sent to the server so just clear these data structures
860                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
861                         while (trit->hasNext()) {
862                                 Transaction *transaction = trit->next();
863                                 transaction->resetNextPartToSend();
864                                 transaction->setServerFailure();
865                         }
866                         delete trit;
867                 }
868
869                 pendingSendArbitrationEntriesToDelete->clear();
870                 transactionPartsSent->clear();
871
872                 throw e;
873         }
874
875         return newKey == NULL;
876 }
877
878 bool Table::updateFromLocal(int64_t machineId) {
879         if (!localCommunicationTable->contains(machineId))
880                 return false;
881
882         Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(machineId);
883
884         // Get the size of the send data
885         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
886
887         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
888         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(machineId)) {
889                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
890         }
891
892         Array<char> *sendData = new Array<char>(sendDataSize);
893         ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
894
895         // Encode the data
896         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
897         bbEncode->putInt(0);
898
899         // Send by local
900         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
901         localSequenceNumber++;
902
903         if (returnData == NULL) {
904                 // Could not contact server
905                 return false;
906         }
907
908         // Decode the data
909         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
910         int numberOfEntries = bbDecode->getInt();
911
912         for (int i = 0; i < numberOfEntries; i++) {
913                 char type = bbDecode->get();
914                 if (type == TypeAbort) {
915                         Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
916                         processEntry(abort);
917                 } else if (type == TypeCommitPart) {
918                         CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
919                         processEntry(commitPart);
920                 }
921         }
922
923         updateLiveStateFromLocal();
924
925         return true;
926 }
927
928 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
929
930         // Get the devices local communications
931         if (!localCommunicationTable->contains(transaction->getArbitrator()))
932                 return Pair<bool, bool>(true, false);
933
934         Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
935
936         // Get the size of the send data
937         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
938         {
939                 Vector<TransactionPart *> *tParts = transaction->getParts();
940                 uint tPartsSize = tParts->size();
941                 for (uint i = 0; i < tPartsSize; i++) {
942                         TransactionPart *part = tParts->get(i);
943                         sendDataSize += part->getSize();
944                 }
945         }
946
947         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
948         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(transaction->getArbitrator())) {
949                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
950         }
951
952         // Make the send data size
953         Array<char> *sendData = new Array<char>(sendDataSize);
954         ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
955
956         // Encode the data
957         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
958         bbEncode->putInt(transaction->getParts()->size());
959         {
960                 Vector<TransactionPart *> *tParts = transaction->getParts();
961                 uint tPartsSize = tParts->size();
962                 for (uint i = 0; i < tPartsSize; i++) {
963                         TransactionPart *part = tParts->get(i);
964                         part->encode(bbEncode);
965                 }
966         }
967
968         // Send by local
969         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
970         localSequenceNumber++;
971
972         if (returnData == NULL) {
973                 // Could not contact server
974                 return Pair<bool, bool>(true, false);
975         }
976
977         // Decode the data
978         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
979         bool didCommit = bbDecode->get() == 1;
980         bool couldArbitrate = bbDecode->get() == 1;
981         int numberOfEntries = bbDecode->getInt();
982         bool foundAbort = false;
983
984         for (int i = 0; i < numberOfEntries; i++) {
985                 char type = bbDecode->get();
986                 if (type == TypeAbort) {
987                         Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
988
989                         if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
990                                 foundAbort = true;
991                         }
992
993                         processEntry(abort);
994                 } else if (type == TypeCommitPart) {
995                         CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
996                         processEntry(commitPart);
997                 }
998         }
999
1000         updateLiveStateFromLocal();
1001
1002         if (couldArbitrate) {
1003                 TransactionStatus *status =  transaction->getTransactionStatus();
1004                 if (didCommit) {
1005                         status->setStatus(TransactionStatus_StatusCommitted);
1006                 } else {
1007                         status->setStatus(TransactionStatus_StatusAborted);
1008                 }
1009         } else {
1010                 TransactionStatus *status =  transaction->getTransactionStatus();
1011                 if (foundAbort) {
1012                         status->setStatus(TransactionStatus_StatusAborted);
1013                 } else {
1014                         status->setStatus(TransactionStatus_StatusCommitted);
1015                 }
1016         }
1017
1018         return Pair<bool, bool>(false, true);
1019 }
1020
1021 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
1022         // Decode the data
1023         ByteBuffer *bbDecode = ByteBuffer_wrap(data);
1024         int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
1025         int numberOfParts = bbDecode->getInt();
1026
1027         // If we did commit a transaction or not
1028         bool didCommit = false;
1029         bool couldArbitrate = false;
1030
1031         if (numberOfParts != 0) {
1032
1033                 // decode the transaction
1034                 Transaction *transaction = new Transaction();
1035                 for (int i = 0; i < numberOfParts; i++) {
1036                         bbDecode->get();
1037                         TransactionPart *newPart = (TransactionPart *)TransactionPart_decode(NULL, bbDecode);
1038                         transaction->addPartDecode(newPart);
1039                 }
1040
1041                 // Arbitrate on transaction and pull relevant return data
1042                 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
1043                 couldArbitrate = localArbitrateReturn.getFirst();
1044                 didCommit = localArbitrateReturn.getSecond();
1045
1046                 updateLiveStateFromLocal();
1047
1048                 // Transaction was sent to the server so keep track of it to prevent double commit
1049                 if (transaction->getSequenceNumber() != -1) {
1050                         offlineTransactionsCommittedAndAtServer->add(new Pair<int64_t, int64_t>(transaction->getId()));
1051                 }
1052         }
1053
1054         // The data to send back
1055         int returnDataSize = 0;
1056         Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
1057
1058         // Get the aborts to send back
1059         Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>();
1060         {
1061                 SetIterator<int64_t, Abort *> *abortit = getKeyIterator(liveAbortsGeneratedByLocal);
1062                 while (abortit->hasNext())
1063                         abortLocalSequenceNumbers->add(abortit->next());
1064                 delete abortit;
1065         }
1066
1067         qsort(abortLocalSequenceNumbers->expose(), abortLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1068
1069         uint asize = abortLocalSequenceNumbers->size();
1070         for (uint i = 0; i < asize; i++) {
1071                 int64_t localSequenceNumber = abortLocalSequenceNumbers->get(i);
1072                 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1073                         continue;
1074                 }
1075
1076                 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
1077                 unseenArbitrations->add(abort);
1078                 returnDataSize += abort->getSize();
1079         }
1080
1081         // Get the commits to send back
1082         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
1083         if (commitForClientTable != NULL) {
1084                 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>();
1085                 {
1086                         SetIterator<int64_t, Commit *> *commitit = getKeyIterator(commitForClientTable);
1087                         while (commitit->hasNext())
1088                                 commitLocalSequenceNumbers->add(commitit->next());
1089                         delete commitit;
1090                 }
1091                 qsort(commitLocalSequenceNumbers->expose(), commitLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1092
1093                 uint clsSize = commitLocalSequenceNumbers->size();
1094                 for (uint clsi = 0; clsi < clsSize; clsi++) {
1095                         int64_t localSequenceNumber = commitLocalSequenceNumbers->get(clsi);
1096                         Commit *commit = commitForClientTable->get(localSequenceNumber);
1097
1098                         if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1099                                 continue;
1100                         }
1101
1102                         {
1103                                 Vector<CommitPart *> *parts = commit->getParts();
1104                                 uint nParts = parts->size();
1105                                 for (uint i = 0; i < nParts; i++) {
1106                                         CommitPart *commitPart = parts->get(i);
1107                                         unseenArbitrations->add(commitPart);
1108                                         returnDataSize += commitPart->getSize();
1109                                 }
1110                         }
1111                 }
1112         }
1113
1114         // Number of arbitration entries to decode
1115         returnDataSize += 2 * sizeof(int32_t);
1116
1117         // bool of did commit or not
1118         if (numberOfParts != 0) {
1119                 returnDataSize += sizeof(char);
1120         }
1121
1122         // Data to send Back
1123         Array<char> *returnData = new Array<char>(returnDataSize);
1124         ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
1125
1126         if (numberOfParts != 0) {
1127                 if (didCommit) {
1128                         bbEncode->put((char)1);
1129                 } else {
1130                         bbEncode->put((char)0);
1131                 }
1132                 if (couldArbitrate) {
1133                         bbEncode->put((char)1);
1134                 } else {
1135                         bbEncode->put((char)0);
1136                 }
1137         }
1138
1139         bbEncode->putInt(unseenArbitrations->size());
1140         uint size = unseenArbitrations->size();
1141         for (uint i = 0; i < size; i++) {
1142                 Entry *entry = unseenArbitrations->get(i);
1143                 entry->encode(bbEncode);
1144         }
1145
1146         localSequenceNumber++;
1147         return returnData;
1148 }
1149
1150
1151 /** Method tries to send slot to server.  Returns status in tuple.
1152                 isInserted returns whether last un-acked send (if any) was
1153                 successful.  Returns whether send was confirmed.x
1154  */
1155
1156 bool Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey, bool *isInserted, Array<Slot *> **array) {
1157         attemptedToSendToServer = true;
1158
1159         *array = cloud->putSlot(slot, newSize);
1160         if (*array == NULL) {
1161                 *array = new Array<Slot *>(1);
1162                 (*array)->set(0, slot);
1163                 rejectedSlotVector->clear();
1164                 *isInserted = false;
1165                 return true;
1166         } else {
1167                 if ((*array)->length() == 0) {
1168                         throw new Error("Server Error: Did not send any slots");
1169                 }
1170
1171                 if (hadPartialSendToServer) {
1172                         *isInserted = false;
1173                         uint size = (*array)->length();
1174                         for (uint i = 0; i < size; i++) {
1175                                 Slot *s = (*array)->get(i);
1176                                 if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1177                                         *isInserted = true;
1178                                         break;
1179                                 }
1180                         }
1181
1182                         //Also need to see if other machines acknowledged our message
1183                         if (!(*isInserted)) {
1184                                 for (uint i = 0; i < size; i++) {
1185                                         Slot *s = (*array)->get(i);
1186                                         
1187                                         // Process each entry in the slot
1188                                         Vector<Entry *> *entries = s->getEntries();
1189                                         uint eSize = entries->size();
1190                                         for (uint ei = 0; ei < eSize; ei++) {
1191                                                 Entry *entry = entries->get(ei);
1192                                                 
1193                                                 if (entry->getType() == TypeLastMessage) {
1194                                                         LastMessage *lastMessage = (LastMessage *)entry;
1195                                                         
1196                                                         if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) {
1197                                                                 *isInserted = true;
1198                                                                 goto done;
1199                                                         }
1200                                                 }
1201                                         }
1202                                 }
1203                         }
1204                 done:
1205                         if (!(*isInserted)) {
1206                                 rejectedSlotVector->add(slot->getSequenceNumber());
1207                         }
1208                         
1209                         return false;
1210                 } else {
1211                         rejectedSlotVector->add(slot->getSequenceNumber());
1212                         *isInserted = false;
1213                         return false;
1214                 }
1215         }
1216 }
1217
1218 /**
1219  * Returns false if a resize was needed
1220  */
1221 ThreeTuple<bool, int32_t, bool> Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry) {
1222         int newSize = 0;
1223         if (liveSlotCount > bufferResizeThreshold) {
1224                 resize = true;//Resize is forced
1225         }
1226
1227         if (resize) {
1228                 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1229                 TableStatus *status = new TableStatus(slot, newSize);
1230                 slot->addEntry(status);
1231         }
1232
1233         // Fill with rejected slots first before doing anything else
1234         doRejectedMessages(slot);
1235
1236         // Do mandatory rescue of entries
1237         ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1238
1239         // Extract working variables
1240         bool needsResize = mandatoryRescueReturn.getFirst();
1241         bool seenLiveSlot = mandatoryRescueReturn.getSecond();
1242         int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1243
1244         if (needsResize && !resize) {
1245                 // We need to resize but we are not resizing so return false
1246                 return ThreeTuple<bool, int32_t, bool>(true, NULL, NULL);
1247         }
1248
1249         bool inserted = false;
1250         if (newKeyEntry != NULL) {
1251                 newKeyEntry->setSlot(slot);
1252                 if (slot->hasSpace(newKeyEntry)) {
1253                         slot->addEntry(newKeyEntry);
1254                         inserted = true;
1255                 }
1256         }
1257
1258         // Clear the transactions, aborts and commits that were sent previously
1259         transactionPartsSent->clear();
1260         pendingSendArbitrationEntriesToDelete->clear();
1261         uint size = pendingSendArbitrationRounds->size();
1262         for (uint i = 0; i < size; i++) {
1263                 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
1264                 bool isFull = false;
1265                 round->generateParts();
1266                 Vector<Entry *> *parts = round->getParts();
1267
1268                 // Insert pending arbitration data
1269                 uint vsize = parts->size();
1270                 for (uint vi = 0; vi < vsize; vi++) {
1271                         Entry *arbitrationData = parts->get(vi);
1272
1273                         // If it is an abort then we need to set some information
1274                         if (arbitrationData->getType() == TypeAbort) {
1275                                 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1276                         }
1277
1278                         if (!slot->hasSpace(arbitrationData)) {
1279                                 // No space so cant do anything else with these data entries
1280                                 isFull = true;
1281                                 break;
1282                         }
1283
1284                         // Add to this current slot and add it to entries to delete
1285                         slot->addEntry(arbitrationData);
1286                         pendingSendArbitrationEntriesToDelete->add(arbitrationData);
1287                 }
1288
1289                 if (isFull) {
1290                         break;
1291                 }
1292         }
1293
1294         if (pendingTransactionQueue->size() > 0) {
1295                 Transaction *transaction = pendingTransactionQueue->get(0);
1296                 // Set the transaction sequence number if it has yet to be inserted into the block chain
1297                 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1298                         transaction->setSequenceNumber(slot->getSequenceNumber());
1299                 }
1300
1301                 while (true) {
1302                         TransactionPart *part = transaction->getNextPartToSend();
1303                         if (part == NULL) {
1304                                 // Ran out of parts to send for this transaction so move on
1305                                 break;
1306                         }
1307
1308                         if (slot->hasSpace(part)) {
1309                                 slot->addEntry(part);
1310                                 Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1311                                 if (partsSent == NULL) {
1312                                         partsSent = new Vector<int32_t>();
1313                                         transactionPartsSent->put(transaction, partsSent);
1314                                 }
1315                                 partsSent->add(part->getPartNumber());
1316                                 transactionPartsSent->put(transaction, partsSent);
1317                         } else {
1318                                 break;
1319                         }
1320                 }
1321         }
1322
1323         // Fill the remainder of the slot with rescue data
1324         doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1325
1326         return ThreeTuple<bool, int32_t, bool>(false, newSize, inserted);
1327 }
1328
1329 void Table::doRejectedMessages(Slot *s) {
1330         if (!rejectedSlotVector->isEmpty()) {
1331                 /* TODO: We should avoid generating a rejected message entry if
1332                  * there is already a sufficient entry in the queue (e->g->,
1333                  * equalsto value of true and same sequence number)->  */
1334
1335                 int64_t old_seqn = rejectedSlotVector->get(0);
1336                 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1337                         int64_t new_seqn = rejectedSlotVector->lastElement();
1338                         RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1339                         s->addEntry(rm);
1340                 } else {
1341                         int64_t prev_seqn = -1;
1342                         uint i = 0;
1343                         /* Go through list of missing messages */
1344                         for (; i < rejectedSlotVector->size(); i++) {
1345                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1346                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1347                                 if (s_msg != NULL)
1348                                         break;
1349                                 prev_seqn = curr_seqn;
1350                         }
1351                         /* Generate rejected message entry for missing messages */
1352                         if (prev_seqn != -1) {
1353                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1354                                 s->addEntry(rm);
1355                         }
1356                         /* Generate rejected message entries for present messages */
1357                         for (; i < rejectedSlotVector->size(); i++) {
1358                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1359                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1360                                 int64_t machineid = s_msg->getMachineID();
1361                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1362                                 s->addEntry(rm);
1363                         }
1364                 }
1365         }
1366 }
1367
1368 ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize) {
1369         int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1370         int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
1371         if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1372                 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1373         }
1374
1375         int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1376         bool seenLiveSlot = false;
1377         int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots;         // smallest seq number in the buffer if it is full
1378         int64_t threshold = firstIfFull + Table_FREE_SLOTS;             // we want the buffer to be clear of live entries up to this point
1379
1380
1381         // Mandatory Rescue
1382         for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1383                 Slot *previousSlot = buffer->getSlot(currentSequenceNumber);
1384                 // Push slot number forward
1385                 if (!seenLiveSlot) {
1386                         oldestLiveSlotSequenceNumver = currentSequenceNumber;
1387                 }
1388
1389                 if (!previousSlot->isLive()) {
1390                         continue;
1391                 }
1392
1393                 // We have seen a live slot
1394                 seenLiveSlot = true;
1395
1396                 // Get all the live entries for a slot
1397                 Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1398
1399                 // Iterate over all the live entries and try to rescue them
1400                 uint lESize = liveEntries->size();
1401                 for (uint i = 0; i < lESize; i++) {
1402                         Entry *liveEntry = liveEntries->get(i);
1403                         if (slot->hasSpace(liveEntry)) {
1404                                 // Enough space to rescue the entry
1405                                 slot->addEntry(liveEntry);
1406                         } else if (currentSequenceNumber == firstIfFull) {
1407                                 //if there's no space but the entry is about to fall off the queue
1408                                 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
1409                         }
1410                 }
1411         }
1412
1413         // Did not resize
1414         return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
1415 }
1416
1417 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1418         /* now go through live entries from least to greatest sequence number until
1419          * either all live slots added, or the slot doesn't have enough room
1420          * for SKIP_THRESHOLD consecutive entries*/
1421         int skipcount = 0;
1422         int64_t newestseqnum = buffer->getNewestSeqNum();
1423         for (; seqn <= newestseqnum; seqn++) {
1424                 Slot *prevslot = buffer->getSlot(seqn);
1425                 //Push slot number forward
1426                 if (!seenliveslot)
1427                         oldestLiveSlotSequenceNumver = seqn;
1428
1429                 if (!prevslot->isLive())
1430                         continue;
1431                 seenliveslot = true;
1432                 Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
1433                 uint lESize = liveentries->size();
1434                 for (uint i = 0; i < lESize; i++) {
1435                         Entry *liveentry = liveentries->get(i);
1436                         if (s->hasSpace(liveentry))
1437                                 s->addEntry(liveentry);
1438                         else {
1439                                 skipcount++;
1440                                 if (skipcount > Table_SKIP_THRESHOLD)
1441                                         goto donesearch;
1442                         }
1443                 }
1444         }
1445 donesearch:
1446         ;
1447 }
1448
1449 /**
1450  * Checks for malicious activity and updates the local copy of the block chain->
1451  */
1452 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1453         // The cloud communication layer has checked slot HMACs already
1454         // before decoding
1455         if (newSlots->length() == 0) {
1456                 return;
1457         }
1458
1459         // Make sure all slots are newer than the last largest slot this
1460         // client has seen
1461         int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
1462         if (firstSeqNum <= sequenceNumber) {
1463                 throw new Error("Server Error: Sent older slots!");
1464         }
1465
1466         // Create an object that can access both new slots and slots in our
1467         // local chain without committing slots to our local chain
1468         SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1469
1470         // Check that the HMAC chain is not broken
1471         checkHMACChain(indexer, newSlots);
1472
1473         // Set to keep track of messages from clients
1474         Hashset<int64_t> *machineSet = new Hashset<int64_t>();
1475         {
1476                 SetIterator<int64_t, Pair<int64_t, Liveness *> *> *lmit = getKeyIterator(lastMessageTable);
1477                 while (lmit->hasNext())
1478                         machineSet->add(lmit->next());
1479                 delete lmit;
1480         }
1481
1482         // Process each slots data
1483         {
1484                 uint numSlots = newSlots->length();
1485                 for (uint i = 0; i < numSlots; i++) {
1486                         Slot *slot = newSlots->get(i);
1487                         processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1488                         updateExpectedSize();
1489                 }
1490         }
1491
1492         // If there is a gap, check to see if the server sent us
1493         // everything->
1494         if (firstSeqNum != (sequenceNumber + 1)) {
1495
1496                 // Check the size of the slots that were sent down by the server->
1497                 // Can only check the size if there was a gap
1498                 checkNumSlots(newSlots->length());
1499
1500                 // Since there was a gap every machine must have pushed a slot or
1501                 // must have a last message message-> If not then the server is
1502                 // hiding slots
1503                 if (!machineSet->isEmpty()) {
1504                         throw new Error("Missing record for machines: ");
1505                 }
1506         }
1507
1508         // Update the size of our local block chain->
1509         commitNewMaxSize();
1510
1511         // Commit new to slots to the local block chain->
1512         {
1513                 uint numSlots = newSlots->length();
1514                 for (uint i = 0; i < numSlots; i++) {
1515                         Slot *slot = newSlots->get(i);
1516
1517                         // Insert this slot into our local block chain copy->
1518                         buffer->putSlot(slot);
1519
1520                         // Keep track of how many slots are currently live (have live data
1521                         // in them)->
1522                         liveSlotCount++;
1523                 }
1524         }
1525         // Get the sequence number of the latest slot in the system
1526         sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
1527         updateLiveStateFromServer();
1528
1529         // No Need to remember after we pulled from the server
1530         offlineTransactionsCommittedAndAtServer->clear();
1531
1532         // This is invalidated now
1533         hadPartialSendToServer = false;
1534 }
1535
1536 void Table::updateLiveStateFromServer() {
1537         // Process the new transaction parts
1538         processNewTransactionParts();
1539
1540         // Do arbitration on new transactions that were received
1541         arbitrateFromServer();
1542
1543         // Update all the committed keys
1544         bool didCommitOrSpeculate = updateCommittedTable();
1545
1546         // Delete the transactions that are now dead
1547         updateLiveTransactionsAndStatus();
1548
1549         // Do speculations
1550         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1551         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1552 }
1553
1554 void Table::updateLiveStateFromLocal() {
1555         // Update all the committed keys
1556         bool didCommitOrSpeculate = updateCommittedTable();
1557
1558         // Delete the transactions that are now dead
1559         updateLiveTransactionsAndStatus();
1560
1561         // Do speculations
1562         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1563         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1564 }
1565
1566 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1567         int64_t prevslots = firstSequenceNumber;
1568
1569         if (didFindTableStatus) {
1570         } else {
1571                 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1572         }
1573
1574         didFindTableStatus = true;
1575         currMaxSize = numberOfSlots;
1576 }
1577
1578 void Table::updateExpectedSize() {
1579         expectedsize++;
1580
1581         if (expectedsize > currMaxSize) {
1582                 expectedsize = currMaxSize;
1583         }
1584 }
1585
1586
1587 /**
1588  * Check the size of the block chain to make sure there are enough
1589  * slots sent back by the server-> This is only called when we have a
1590  * gap between the slots that we have locally and the slots sent by
1591  * the server therefore in the slots sent by the server there will be
1592  * at least 1 Table status message
1593  */
1594 void Table::checkNumSlots(int numberOfSlots) {
1595         if (numberOfSlots != expectedsize) {
1596                 throw new Error("Server Error: Server did not send all slots->  Expected: ");
1597         }
1598 }
1599
1600 /**
1601  * Update the size of of the local buffer if it is needed->
1602  */
1603 void Table::commitNewMaxSize() {
1604         didFindTableStatus = false;
1605
1606         // Resize the local slot buffer
1607         if (numberOfSlots != currMaxSize) {
1608                 buffer->resize((int32_t)currMaxSize);
1609         }
1610
1611         // Change the number of local slots to the new size
1612         numberOfSlots = (int32_t)currMaxSize;
1613
1614         // Recalculate the resize threshold since the size of the local
1615         // buffer has changed
1616         setResizeThreshold();
1617 }
1618
1619 /**
1620  * Process the new transaction parts from this latest round of slots
1621  * received from the server
1622  */
1623 void Table::processNewTransactionParts() {
1624
1625         if (newTransactionParts->size() == 0) {
1626                 // Nothing new to process
1627                 return;
1628         }
1629
1630         // Iterate through all the machine Ids that we received new parts
1631         // for
1632         SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *tpit = getKeyIterator(newTransactionParts);
1633         while (tpit->hasNext()) {
1634                 int64_t machineId = tpit->next();
1635                 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
1636
1637                 SetIterator<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *ptit = getKeyIterator(parts);
1638                 // Iterate through all the parts for that machine Id
1639                 while (ptit->hasNext()) {
1640                         Pair<int64_t, int32_t> *partId = ptit->next();
1641                         TransactionPart *part = parts->get(partId);
1642
1643                         if (lastArbitratedTransactionNumberByArbitratorTable->contains(part->getArbitratorId())) {
1644                                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1645                                 if (lastTransactionNumber >= part->getSequenceNumber()) {
1646                                         // Set dead the transaction part
1647                                         part->setDead();
1648                                         continue;
1649                                 }
1650                         }
1651
1652                         // Get the transaction object for that sequence number
1653                         Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1654
1655                         if (transaction == NULL) {
1656                                 // This is a new transaction that we dont have so make a new one
1657                                 transaction = new Transaction();
1658
1659                                 // Insert this new transaction into the live tables
1660                                 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1661                                 liveTransactionByTransactionIdTable->put(new Pair<int64_t, int64_t>(part->getTransactionId()), transaction);
1662                         }
1663
1664                         // Add that part to the transaction
1665                         transaction->addPartDecode(part);
1666                 }
1667                 delete ptit;
1668         }
1669         delete tpit;
1670         // Clear all the new transaction parts in preparation for the next
1671         // time the server sends slots
1672         newTransactionParts->clear();
1673 }
1674
1675 void Table::arbitrateFromServer() {
1676
1677         if (liveTransactionBySequenceNumberTable->size() == 0) {
1678                 // Nothing to arbitrate on so move on
1679                 return;
1680         }
1681
1682         // Get the transaction sequence numbers and sort from oldest to newest
1683         Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>();
1684         {
1685                 SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
1686                 while (trit->hasNext())
1687                         transactionSequenceNumbers->add(trit->next());
1688                 delete trit;
1689         }
1690         qsort(transactionSequenceNumbers->expose(), transactionSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1691
1692         // Collection of key value pairs that are
1693         Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *speculativeTableTmp = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
1694
1695         // The last transaction arbitrated on
1696         int64_t lastTransactionCommitted = -1;
1697         Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1698         uint tsnSize = transactionSequenceNumbers->size();
1699         for (uint i = 0; i < tsnSize; i++) {
1700                 int64_t transactionSequenceNumber = transactionSequenceNumbers->get(i);
1701                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1702
1703                 // Check if this machine arbitrates for this transaction if not
1704                 // then we cant arbitrate this transaction
1705                 if (transaction->getArbitrator() != localMachineId) {
1706                         continue;
1707                 }
1708
1709                 if (transactionSequenceNumber < lastSeqNumArbOn) {
1710                         continue;
1711                 }
1712
1713                 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1714                         // We have seen this already locally so dont commit again
1715                         continue;
1716                 }
1717
1718
1719                 if (!transaction->isComplete()) {
1720                         // Will arbitrate in incorrect order if we continue so just break
1721                         // Most likely this
1722                         break;
1723                 }
1724
1725
1726                 // update the largest transaction seen by arbitrator from server
1727                 if (!lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1728                         lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1729                 } else {
1730                         int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1731                         if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1732                                 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1733                         }
1734                 }
1735
1736                 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1737                         // Guard evaluated as true
1738
1739                         // Update the local changes so we can make the commit
1740                         SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1741                         while (kvit->hasNext()) {
1742                                 KeyValue *kv = kvit->next();
1743                                 speculativeTableTmp->put(kv->getKey(), kv);
1744                         }
1745                         delete kvit;
1746
1747                         // Update what the last transaction committed was for use in batch commit
1748                         lastTransactionCommitted = transactionSequenceNumber;
1749                 } else {
1750                         // Guard evaluated was false so create abort
1751                         // Create the abort
1752                         Abort *newAbort = new Abort(NULL,
1753                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1754                                                                                                                                         transaction->getSequenceNumber(),
1755                                                                                                                                         transaction->getMachineId(),
1756                                                                                                                                         transaction->getArbitrator(),
1757                                                                                                                                         localArbitrationSequenceNumber);
1758                         localArbitrationSequenceNumber++;
1759                         generatedAborts->add(newAbort);
1760
1761                         // Insert the abort so we can process
1762                         processEntry(newAbort);
1763                 }
1764
1765                 lastSeqNumArbOn = transactionSequenceNumber;
1766         }
1767
1768         Commit *newCommit = NULL;
1769
1770         // If there is something to commit
1771         if (speculativeTableTmp->size() != 0) {
1772                 // Create the commit and increment the commit sequence number
1773                 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1774                 localArbitrationSequenceNumber++;
1775
1776                 // Add all the new keys to the commit
1777                 SetIterator<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *spit = getKeyIterator(speculativeTableTmp);
1778                 while (spit->hasNext()) {
1779                         IoTString *string = spit->next();
1780                         KeyValue *kv = speculativeTableTmp->get(string);
1781                         newCommit->addKV(kv);
1782                 }
1783                 delete spit;
1784
1785                 // create the commit parts
1786                 newCommit->createCommitParts();
1787
1788                 // Append all the commit parts to the end of the pending queue
1789                 // waiting for sending to the server
1790                 // Insert the commit so we can process it
1791                 Vector<CommitPart *> *parts = newCommit->getParts();
1792                 uint partsSize = parts->size();
1793                 for (uint i = 0; i < partsSize; i++) {
1794                         CommitPart *commitPart = parts->get(i);
1795                         processEntry(commitPart);
1796                 }
1797         }
1798
1799         if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1800                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1801                 pendingSendArbitrationRounds->add(arbitrationRound);
1802
1803                 if (compactArbitrationData()) {
1804                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1805                         if (newArbitrationRound->getCommit() != NULL) {
1806                                 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1807                                 uint partsSize = parts->size();
1808                                 for (uint i = 0; i < partsSize; i++) {
1809                                         CommitPart *commitPart = parts->get(i);
1810                                         processEntry(commitPart);
1811                                 }
1812                         }
1813                 }
1814         }
1815 }
1816
1817 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1818
1819         // Check if this machine arbitrates for this transaction if not then
1820         // we cant arbitrate this transaction
1821         if (transaction->getArbitrator() != localMachineId) {
1822                 return Pair<bool, bool>(false, false);
1823         }
1824
1825         if (!transaction->isComplete()) {
1826                 // Will arbitrate in incorrect order if we continue so just break
1827                 // Most likely this
1828                 return Pair<bool, bool>(false, false);
1829         }
1830
1831         if (transaction->getMachineId() != localMachineId) {
1832                 // dont do this check for local transactions
1833                 if (lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1834                         if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1835                                 // We've have already seen this from the server
1836                                 return Pair<bool, bool>(false, false);
1837                         }
1838                 }
1839         }
1840
1841         if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1842                 // Guard evaluated as true Create the commit and increment the
1843                 // commit sequence number
1844                 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1845                 localArbitrationSequenceNumber++;
1846
1847                 // Update the local changes so we can make the commit
1848                 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1849                 while (kvit->hasNext()) {
1850                         KeyValue *kv = kvit->next();
1851                         newCommit->addKV(kv);
1852                 }
1853                 delete kvit;
1854
1855                 // create the commit parts
1856                 newCommit->createCommitParts();
1857
1858                 // Append all the commit parts to the end of the pending queue
1859                 // waiting for sending to the server
1860                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1861                 pendingSendArbitrationRounds->add(arbitrationRound);
1862
1863                 if (compactArbitrationData()) {
1864                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1865                         Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1866                         uint partsSize = parts->size();
1867                         for (uint i = 0; i < partsSize; i++) {
1868                                 CommitPart *commitPart = parts->get(i);
1869                                 processEntry(commitPart);
1870                         }
1871                 } else {
1872                         // Insert the commit so we can process it
1873                         Vector<CommitPart *> *parts = newCommit->getParts();
1874                         uint partsSize = parts->size();
1875                         for (uint i = 0; i < partsSize; i++) {
1876                                 CommitPart *commitPart = parts->get(i);
1877                                 processEntry(commitPart);
1878                         }
1879                 }
1880
1881                 if (transaction->getMachineId() == localMachineId) {
1882                         TransactionStatus *status = transaction->getTransactionStatus();
1883                         if (status != NULL) {
1884                                 status->setStatus(TransactionStatus_StatusCommitted);
1885                         }
1886                 }
1887
1888                 updateLiveStateFromLocal();
1889                 return Pair<bool, bool>(true, true);
1890         } else {
1891                 if (transaction->getMachineId() == localMachineId) {
1892                         // For locally created messages update the status
1893                         // Guard evaluated was false so create abort
1894                         TransactionStatus *status = transaction->getTransactionStatus();
1895                         if (status != NULL) {
1896                                 status->setStatus(TransactionStatus_StatusAborted);
1897                         }
1898                 } else {
1899                         Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
1900
1901                         // Create the abort
1902                         Abort *newAbort = new Abort(NULL,
1903                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1904                                                                                                                                         -1,
1905                                                                                                                                         transaction->getMachineId(),
1906                                                                                                                                         transaction->getArbitrator(),
1907                                                                                                                                         localArbitrationSequenceNumber);
1908                         localArbitrationSequenceNumber++;
1909                         addAbortSet->add(newAbort);
1910
1911                         // Append all the commit parts to the end of the pending queue
1912                         // waiting for sending to the server
1913                         ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1914                         pendingSendArbitrationRounds->add(arbitrationRound);
1915
1916                         if (compactArbitrationData()) {
1917                                 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1918
1919                                 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1920                                 uint partsSize = parts->size();
1921                                 for (uint i = 0; i < partsSize; i++) {
1922                                         CommitPart *commitPart = parts->get(i);
1923                                         processEntry(commitPart);
1924                                 }
1925                         }
1926                 }
1927
1928                 updateLiveStateFromLocal();
1929                 return Pair<bool, bool>(true, false);
1930         }
1931 }
1932
1933 /**
1934  * Compacts the arbitration data my merging commits and aggregating
1935  * aborts so that a single large push of commits can be done instead
1936  * of many small updates
1937  */
1938 bool Table::compactArbitrationData() {
1939         if (pendingSendArbitrationRounds->size() < 2) {
1940                 // Nothing to compact so do nothing
1941                 return false;
1942         }
1943
1944         ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1945         if (lastRound->getDidSendPart()) {
1946                 return false;
1947         }
1948
1949         bool hadCommit = (lastRound->getCommit() == NULL);
1950         bool gotNewCommit = false;
1951
1952         uint numberToDelete = 1;
1953         while (numberToDelete < pendingSendArbitrationRounds->size()) {
1954                 ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1955
1956                 if (round->isFull() || round->getDidSendPart()) {
1957                         // Stop since there is a part that cannot be compacted and we
1958                         // need to compact in order
1959                         break;
1960                 }
1961
1962                 if (round->getCommit() == NULL) {
1963                         // Try compacting aborts only
1964                         int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
1965                         if (newSize > ArbitrationRound_MAX_PARTS) {
1966                                 // Cant compact since it would be too large
1967                                 break;
1968                         }
1969                         lastRound->addAborts(round->getAborts());
1970                 } else {
1971                         // Create a new larger commit
1972                         Commit *newCommit = Commit_merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
1973                         localArbitrationSequenceNumber++;
1974
1975                         // Create the commit parts so that we can count them
1976                         newCommit->createCommitParts();
1977
1978                         // Calculate the new size of the parts
1979                         int newSize = newCommit->getNumberOfParts();
1980                         newSize += lastRound->getAbortsCount();
1981                         newSize += round->getAbortsCount();
1982
1983                         if (newSize > ArbitrationRound_MAX_PARTS) {
1984                                 // Cant compact since it would be too large
1985                                 break;
1986                         }
1987
1988                         // Set the new compacted part
1989                         lastRound->setCommit(newCommit);
1990                         lastRound->addAborts(round->getAborts());
1991                         gotNewCommit = true;
1992                 }
1993
1994                 numberToDelete++;
1995         }
1996
1997         if (numberToDelete != 1) {
1998                 // If there is a compaction
1999                 // Delete the previous pieces that are now in the new compacted piece
2000                 if (numberToDelete == pendingSendArbitrationRounds->size()) {
2001                         pendingSendArbitrationRounds->clear();
2002                 } else {
2003                         for (uint i = 0; i < numberToDelete; i++) {
2004                                 pendingSendArbitrationRounds->removeIndex(pendingSendArbitrationRounds->size() - 1);
2005                         }
2006                 }
2007
2008                 // Add the new compacted into the pending to send list
2009                 pendingSendArbitrationRounds->add(lastRound);
2010
2011                 // Should reinsert into the commit processor
2012                 if (hadCommit && gotNewCommit) {
2013                         return true;
2014                 }
2015         }
2016
2017         return false;
2018 }
2019
2020 /**
2021  * Update all the commits and the committed tables, sets dead the dead
2022  * transactions
2023  */
2024 bool Table::updateCommittedTable() {
2025
2026         if (newCommitParts->size() == 0) {
2027                 // Nothing new to process
2028                 return false;
2029         }
2030
2031         // Iterate through all the machine Ids that we received new parts for
2032         SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts);
2033         while (partsit->hasNext()) {
2034                 int64_t machineId = partsit->next();
2035                 Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId);
2036
2037                 // Iterate through all the parts for that machine Id
2038                 SetIterator<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pairit = getKeyIterator(parts);
2039                 while (pairit->hasNext()) {
2040                         Pair<int64_t, int32_t> *partId = pairit->next();
2041                         CommitPart *part = parts->get(partId);
2042
2043                         // Get the transaction object for that sequence number
2044                         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
2045
2046                         if (commitForClientTable == NULL) {
2047                                 // This is the first commit from this device
2048                                 commitForClientTable = new Hashtable<int64_t, Commit *>();
2049                                 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
2050                         }
2051
2052                         Commit *commit = commitForClientTable->get(part->getSequenceNumber());
2053
2054                         if (commit == NULL) {
2055                                 // This is a new commit that we dont have so make a new one
2056                                 commit = new Commit();
2057
2058                                 // Insert this new commit into the live tables
2059                                 commitForClientTable->put(part->getSequenceNumber(), commit);
2060                         }
2061
2062                         // Add that part to the commit
2063                         commit->addPartDecode(part);
2064                 }
2065                 delete pairit;
2066         }
2067         delete partsit;
2068
2069         // Clear all the new commits parts in preparation for the next time
2070         // the server sends slots
2071         newCommitParts->clear();
2072
2073         // If we process a new commit keep track of it for future use
2074         bool didProcessANewCommit = false;
2075
2076         // Process the commits one by one
2077         SetIterator<int64_t, Hashtable<int64_t, Commit *> *> *liveit = getKeyIterator(liveCommitsTable);
2078         while (liveit->hasNext()) {
2079                 int64_t arbitratorId = liveit->next();
2080
2081                 // Get all the commits for a specific arbitrator
2082                 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
2083
2084                 // Sort the commits in order
2085                 Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>();
2086                 {
2087                         SetIterator<int64_t, Commit *> *clientit = getKeyIterator(commitForClientTable);
2088                         while (clientit->hasNext())
2089                                 commitSequenceNumbers->add(clientit->next());
2090                         delete clientit;
2091                 }
2092
2093                 qsort(commitSequenceNumbers->expose(), commitSequenceNumbers->size(), sizeof(int64_t), compareInt64);
2094
2095                 // Get the last commit seen from this arbitrator
2096                 int64_t lastCommitSeenSequenceNumber = -1;
2097                 if (lastCommitSeenSequenceNumberByArbitratorTable->contains(arbitratorId)) {
2098                         lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
2099                 }
2100
2101                 // Go through each new commit one by one
2102                 for (uint i = 0; i < commitSequenceNumbers->size(); i++) {
2103                         int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
2104                         Commit *commit = commitForClientTable->get(commitSequenceNumber);
2105
2106                         // Special processing if a commit is not complete
2107                         if (!commit->isComplete()) {
2108                                 if (i == (commitSequenceNumbers->size() - 1)) {
2109                                         // If there is an incomplete commit and this commit is the
2110                                         // latest one seen then this commit cannot be processed and
2111                                         // there are no other commits
2112                                         break;
2113                                 } else {
2114                                         // This is a commit that was already dead but parts of it
2115                                         // are still in the block chain (not flushed out yet)->
2116                                         // Delete it and move on
2117                                         commit->setDead();
2118                                         commitForClientTable->remove(commit->getSequenceNumber());
2119                                         continue;
2120                                 }
2121                         }
2122
2123                         // Update the last transaction that was updated if we can
2124                         if (commit->getTransactionSequenceNumber() != -1) {
2125                                 // Update the last transaction sequence number that the arbitrator arbitrated on1
2126                                 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2127                                         lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2128                                 }
2129                         }
2130
2131                         // Update the last arbitration data that we have seen so far
2132                         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(commit->getMachineId())) {
2133                                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
2134                                 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
2135                                         // Is larger
2136                                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2137                                 }
2138                         } else {
2139                                 // Never seen any data from this arbitrator so record the first one
2140                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2141                         }
2142
2143                         // We have already seen this commit before so need to do the
2144                         // full processing on this commit
2145                         if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2146
2147                                 // Update the last transaction that was updated if we can
2148                                 if (commit->getTransactionSequenceNumber() != -1) {
2149                                         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
2150                                         if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) ||
2151                                                         lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2152                                                 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2153                                         }
2154                                 }
2155
2156                                 continue;
2157                         }
2158
2159                         // If we got here then this is a brand new commit and needs full
2160                         // processing
2161                         // Get what commits should be edited, these are the commits that
2162                         // have live values for their keys
2163                         Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
2164                         {
2165                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2166                                 while (kvit->hasNext()) {
2167                                         KeyValue *kv = kvit->next();
2168                                         Commit *commit = liveCommitsByKeyTable->get(kv->getKey());
2169                                         if (commit != NULL)
2170                                                 commitsToEdit->add(commit);
2171                                 }
2172                                 delete kvit;
2173                         }
2174
2175                         // Update each previous commit that needs to be updated
2176                         SetIterator<Commit *, Commit *> *commitit = commitsToEdit->iterator();
2177                         while (commitit->hasNext()) {
2178                                 Commit *previousCommit = commitit->next();
2179
2180                                 // Only bother with live commits (TODO: Maybe remove this check)
2181                                 if (previousCommit->isLive()) {
2182
2183                                         // Update which keys in the old commits are still live
2184                                         {
2185                                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2186                                                 while (kvit->hasNext()) {
2187                                                         KeyValue *kv = kvit->next();
2188                                                         previousCommit->invalidateKey(kv->getKey());
2189                                                 }
2190                                                 delete kvit;
2191                                         }
2192
2193                                         // if the commit is now dead then remove it
2194                                         if (!previousCommit->isLive()) {
2195                                                 commitForClientTable->remove(previousCommit->getSequenceNumber());
2196                                         }
2197                                 }
2198                         }
2199                         delete commitit;
2200
2201                         // Update the last seen sequence number from this arbitrator
2202                         if (lastCommitSeenSequenceNumberByArbitratorTable->contains(commit->getMachineId())) {
2203                                 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2204                                         lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2205                                 }
2206                         } else {
2207                                 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2208                         }
2209
2210                         // We processed a new commit that we havent seen before
2211                         didProcessANewCommit = true;
2212
2213                         // Update the committed table of keys and which commit is using which key
2214                         {
2215                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2216                                 while (kvit->hasNext()) {
2217                                         KeyValue *kv = kvit->next();
2218                                         committedKeyValueTable->put(kv->getKey(), kv);
2219                                         liveCommitsByKeyTable->put(kv->getKey(), commit);
2220                                 }
2221                                 delete kvit;
2222                         }
2223                 }
2224         }
2225         delete liveit;
2226
2227         return didProcessANewCommit;
2228 }
2229
2230 /**
2231  * Create the speculative table from transactions that are still live
2232  * and have come from the cloud
2233  */
2234 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2235         if (liveTransactionBySequenceNumberTable->size() == 0) {
2236                 // There is nothing to speculate on
2237                 return false;
2238         }
2239
2240         // Create a list of the transaction sequence numbers and sort them
2241         // from oldest to newest
2242         Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>();
2243         {
2244                 SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
2245                 while (trit->hasNext())
2246                         transactionSequenceNumbersSorted->add(trit->next());
2247                 delete trit;
2248         }
2249
2250         qsort(transactionSequenceNumbersSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64);
2251
2252         bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2253
2254
2255         if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2256                 // If there is a gap in the transaction sequence numbers then
2257                 // there was a commit or an abort of a transaction OR there was a
2258                 // new commit (Could be from offline commit) so a redo the
2259                 // speculation from scratch
2260
2261                 // Start from scratch
2262                 speculatedKeyValueTable->clear();
2263                 lastTransactionSequenceNumberSpeculatedOn = -1;
2264                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2265         }
2266
2267         // Remember the front of the transaction list
2268         oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2269
2270         // Find where to start arbitration from
2271         uint startIndex = 0;
2272
2273         for (; startIndex < transactionSequenceNumbersSorted->size(); startIndex++)
2274                 if (transactionSequenceNumbersSorted->get(startIndex) == lastTransactionSequenceNumberSpeculatedOn)
2275                         break;
2276         startIndex++;
2277
2278         if (startIndex >= transactionSequenceNumbersSorted->size()) {
2279                 // Make sure we are not out of bounds
2280                 return false;           // did not speculate
2281         }
2282
2283         Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
2284         bool didSkip = true;
2285
2286         for (uint i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2287                 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2288                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2289
2290                 if (!transaction->isComplete()) {
2291                         // If there is an incomplete transaction then there is nothing
2292                         // we can do add this transactions arbitrator to the list of
2293                         // arbitrators we should ignore
2294                         incompleteTransactionArbitrator->add(transaction->getArbitrator());
2295                         didSkip = true;
2296                         continue;
2297                 }
2298
2299                 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2300                         continue;
2301                 }
2302
2303                 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2304
2305                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2306                         // Guard evaluated to true so update the speculative table
2307                         {
2308                                 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2309                                 while (kvit->hasNext()) {
2310                                         KeyValue *kv = kvit->next();
2311                                         speculatedKeyValueTable->put(kv->getKey(), kv);
2312                                 }
2313                                 delete kvit;
2314                         }
2315                 }
2316         }
2317
2318         if (didSkip) {
2319                 // Since there was a skip we need to redo the speculation next time around
2320                 lastTransactionSequenceNumberSpeculatedOn = -1;
2321                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2322         }
2323
2324         // We did some speculation
2325         return true;
2326 }
2327
2328 /**
2329  * Create the pending transaction speculative table from transactions
2330  * that are still in the pending transaction buffer
2331  */
2332 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2333         if (pendingTransactionQueue->size() == 0) {
2334                 // There is nothing to speculate on
2335                 return;
2336         }
2337
2338         if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2339                 // need to reset on the pending speculation
2340                 lastPendingTransactionSpeculatedOn = NULL;
2341                 firstPendingTransaction = pendingTransactionQueue->get(0);
2342                 pendingTransactionSpeculatedKeyValueTable->clear();
2343         }
2344
2345         // Find where to start arbitration from
2346         uint startIndex = 0;
2347
2348         for (; startIndex < pendingTransactionQueue->size(); startIndex++)
2349                 if (pendingTransactionQueue->get(startIndex) == firstPendingTransaction)
2350                         break;
2351
2352         if (startIndex >= pendingTransactionQueue->size()) {
2353                 // Make sure we are not out of bounds
2354                 return;
2355         }
2356
2357         for (uint i = startIndex; i < pendingTransactionQueue->size(); i++) {
2358                 Transaction *transaction = pendingTransactionQueue->get(i);
2359
2360                 lastPendingTransactionSpeculatedOn = transaction;
2361
2362                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2363                         // Guard evaluated to true so update the speculative table
2364                         SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2365                         while (kvit->hasNext()) {
2366                                 KeyValue *kv = kvit->next();
2367                                 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2368                         }
2369                         delete kvit;
2370                 }
2371         }
2372 }
2373
2374 /**
2375  * Set dead and remove from the live transaction tables the
2376  * transactions that are dead
2377  */
2378 void Table::updateLiveTransactionsAndStatus() {
2379         // Go through each of the transactions
2380         {
2381                 SetIterator<int64_t, Transaction *> *iter = getKeyIterator(liveTransactionBySequenceNumberTable);
2382                 while (iter->hasNext()) {
2383                         int64_t key = iter->next();
2384                         Transaction *transaction = liveTransactionBySequenceNumberTable->get(key);
2385
2386                         // Check if the transaction is dead
2387                         if (lastArbitratedTransactionNumberByArbitratorTable->contains(transaction->getArbitrator()) && lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator() >= transaction->getSequenceNumber())) {
2388                                 // Set dead the transaction
2389                                 transaction->setDead();
2390
2391                                 // Remove the transaction from the live table
2392                                 iter->remove();
2393                                 liveTransactionByTransactionIdTable->remove(transaction->getId());
2394                         }
2395                 }
2396                 delete iter;
2397         }
2398
2399         // Go through each of the transactions
2400         {
2401                 SetIterator<int64_t, TransactionStatus *> *iter = getKeyIterator(outstandingTransactionStatus);
2402                 while (iter->hasNext()) {
2403                         int64_t key = iter->next();
2404                         TransactionStatus *status = outstandingTransactionStatus->get(key);
2405
2406                         // Check if the transaction is dead
2407                         if (lastArbitratedTransactionNumberByArbitratorTable->contains(status->getTransactionArbitrator()) && (lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()) >= status->getTransactionSequenceNumber())) {
2408
2409                                 // Set committed
2410                                 status->setStatus(TransactionStatus_StatusCommitted);
2411
2412                                 // Remove
2413                                 iter->remove();
2414                         }
2415                 }
2416                 delete iter;
2417         }
2418 }
2419
2420 /**
2421  * Process this slot, entry by entry->  Also update the latest message sent by slot
2422  */
2423 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2424
2425         // Update the last message seen
2426         updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2427
2428         // Process each entry in the slot
2429         Vector<Entry *> *entries = slot->getEntries();
2430         uint eSize = entries->size();
2431         for (uint ei = 0; ei < eSize; ei++) {
2432                 Entry *entry = entries->get(ei);
2433                 switch (entry->getType()) {
2434                 case TypeCommitPart:
2435                         processEntry((CommitPart *)entry);
2436                         break;
2437                 case TypeAbort:
2438                         processEntry((Abort *)entry);
2439                         break;
2440                 case TypeTransactionPart:
2441                         processEntry((TransactionPart *)entry);
2442                         break;
2443                 case TypeNewKey:
2444                         processEntry((NewKey *)entry);
2445                         break;
2446                 case TypeLastMessage:
2447                         processEntry((LastMessage *)entry, machineSet);
2448                         break;
2449                 case TypeRejectedMessage:
2450                         processEntry((RejectedMessage *)entry, indexer);
2451                         break;
2452                 case TypeTableStatus:
2453                         processEntry((TableStatus *)entry, slot->getSequenceNumber());
2454                         break;
2455                 default:
2456                         throw new Error("Unrecognized type: ");
2457                 }
2458         }
2459 }
2460
2461 /**
2462  * Update the last message that was sent for a machine Id
2463  */
2464 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2465         // Update what the last message received by a machine was
2466         updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2467 }
2468
2469 /**
2470  * Add the new key to the arbitrators table and update the set of live
2471  * new keys (in case of a rescued new key message)
2472  */
2473 void Table::processEntry(NewKey *entry) {
2474         // Update the arbitrator table with the new key information
2475         arbitratorTable->put(entry->getKey(), entry->getMachineID());
2476
2477         // Update what the latest live new key is
2478         NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2479         if (oldNewKey != NULL) {
2480                 // Delete the old new key messages
2481                 oldNewKey->setDead();
2482         }
2483 }
2484
2485 /**
2486  * Process new table status entries and set dead the old ones as new
2487  * ones come in-> keeps track of the largest and smallest table status
2488  * seen in this current round of updating the local copy of the block
2489  * chain
2490  */
2491 void Table::processEntry(TableStatus *entry, int64_t seq) {
2492         int newNumSlots = entry->getMaxSlots();
2493         updateCurrMaxSize(newNumSlots);
2494         initExpectedSize(seq, newNumSlots);
2495
2496         if (liveTableStatus != NULL) {
2497                 // We have a larger table status so the old table status is no
2498                 // int64_ter alive
2499                 liveTableStatus->setDead();
2500         }
2501
2502         // Make this new table status the latest alive table status
2503         liveTableStatus = entry;
2504 }
2505
2506 /**
2507  * Check old messages to see if there is a block chain violation->
2508  * Also
2509  */
2510 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
2511         int64_t oldSeqNum = entry->getOldSeqNum();
2512         int64_t newSeqNum = entry->getNewSeqNum();
2513         bool isequal = entry->getEqual();
2514         int64_t machineId = entry->getMachineID();
2515         int64_t seq = entry->getSequenceNumber();
2516
2517         // Check if we have messages that were supposed to be rejected in
2518         // our local block chain
2519         for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2520                 // Get the slot
2521                 Slot *slot = indexer->getSlot(seqNum);
2522
2523                 if (slot != NULL) {
2524                         // If we have this slot make sure that it was not supposed to be
2525                         // a rejected slot
2526                         int64_t slotMachineId = slot->getMachineID();
2527                         if (isequal != (slotMachineId == machineId)) {
2528                                 throw new Error("Server Error: Trying to insert rejected message for slot ");
2529                         }
2530                 }
2531         }
2532
2533         // Create a list of clients to watch until they see this rejected
2534         // message entry->
2535         Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2536         SetIterator<int64_t, Pair<int64_t, Liveness *> *> *iter = getKeyIterator(lastMessageTable);
2537         while (iter->hasNext()) {
2538                 // Machine ID for the last message entry
2539                 int64_t lastMessageEntryMachineId = iter->next();
2540
2541                 // We've seen it, don't need to continue to watch->  Our next
2542                 // message will implicitly acknowledge it->
2543                 if (lastMessageEntryMachineId == localMachineId) {
2544                         continue;
2545                 }
2546
2547                 Pair<int64_t, Liveness *> *lastMessageValue = lastMessageTable->get(lastMessageEntryMachineId);
2548                 int64_t entrySequenceNumber = lastMessageValue->getFirst();
2549
2550                 if (entrySequenceNumber < seq) {
2551                         // Add this rejected message to the set of messages that this
2552                         // machine ID did not see yet
2553                         addWatchVector(lastMessageEntryMachineId, entry);
2554                         // This client did not see this rejected message yet so add it
2555                         // to the watch set to monitor
2556                         deviceWatchSet->add(lastMessageEntryMachineId);
2557                 }
2558         }
2559         delete iter;
2560
2561         if (deviceWatchSet->isEmpty()) {
2562                 // This rejected message has been seen by all the clients so
2563                 entry->setDead();
2564         } else {
2565                 // We need to watch this rejected message
2566                 entry->setWatchSet(deviceWatchSet);
2567         }
2568 }
2569
2570 /**
2571  * Check if this abort is live, if not then save it so we can kill it
2572  * later-> update the last transaction number that was arbitrated on->
2573  */
2574 void Table::processEntry(Abort *entry) {
2575         if (entry->getTransactionSequenceNumber() != -1) {
2576                 // update the transaction status if it was sent to the server
2577                 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2578                 if (status != NULL) {
2579                         status->setStatus(TransactionStatus_StatusAborted);
2580                 }
2581         }
2582
2583         // Abort has not been seen by the client it is for yet so we need to
2584         // keep track of it
2585
2586         Abort *previouslySeenAbort = liveAbortTable->put(new Pair<int64_t, int64_t>(entry->getAbortId()), entry);
2587         if (previouslySeenAbort != NULL) {
2588                 previouslySeenAbort->setDead();         // Delete old version of the abort since we got a rescued newer version
2589         }
2590
2591         if (entry->getTransactionArbitrator() == localMachineId) {
2592                 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
2593         }
2594
2595         if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) {
2596                 // The machine already saw this so it is dead
2597                 entry->setDead();
2598                 Pair<int64_t, int64_t> abortid = entry->getAbortId();
2599                 liveAbortTable->remove(&abortid);
2600
2601                 if (entry->getTransactionArbitrator() == localMachineId) {
2602                         liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2603                 }
2604                 return;
2605         }
2606
2607         // Update the last arbitration data that we have seen so far
2608         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(entry->getTransactionArbitrator())) {
2609                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2610                 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2611                         // Is larger
2612                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2613                 }
2614         } else {
2615                 // Never seen any data from this arbitrator so record the first one
2616                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2617         }
2618
2619         // Set dead a transaction if we can
2620         Pair<int64_t, int64_t> deadPair = Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber());
2621
2622         Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(&deadPair);
2623         if (transactionToSetDead != NULL) {
2624                 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2625         }
2626
2627         // Update the last transaction sequence number that the arbitrator
2628         // arbitrated on
2629         if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getTransactionArbitrator()) ||
2630                         (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator()) < entry->getTransactionSequenceNumber())) {
2631                 // Is a valid one
2632                 if (entry->getTransactionSequenceNumber() != -1) {
2633                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2634                 }
2635         }
2636 }
2637
2638 /**
2639  * Set dead the transaction part if that transaction is dead and keep
2640  * track of all new parts
2641  */
2642 void Table::processEntry(TransactionPart *entry) {
2643         // Check if we have already seen this transaction and set it dead OR
2644         // if it is not alive
2645         if (lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getArbitratorId()) && (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId()) >= entry->getSequenceNumber())) {
2646                 // This transaction is dead, it was already committed or aborted
2647                 entry->setDead();
2648                 return;
2649         }
2650
2651         // This part is still alive
2652         Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId());
2653
2654         if (transactionPart == NULL) {
2655                 // Dont have a table for this machine Id yet so make one
2656                 transactionPart = new Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2657                 newTransactionParts->put(entry->getMachineId(), transactionPart);
2658         }
2659
2660         // Update the part and set dead ones we have already seen (got a
2661         // rescued version)
2662         TransactionPart *previouslySeenPart = transactionPart->put(new Pair<int64_t, int32_t>(entry->getPartId()), entry);
2663         if (previouslySeenPart != NULL) {
2664                 previouslySeenPart->setDead();
2665         }
2666 }
2667
2668 /**
2669  * Process new commit entries and save them for future use->  Delete duplicates
2670  */
2671 void Table::processEntry(CommitPart *entry) {
2672         // Update the last transaction that was updated if we can
2673         if (entry->getTransactionSequenceNumber() != -1) {
2674                 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId() || lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber())) {
2675                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2676                 }
2677         }
2678
2679         Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *commitPart = newCommitParts->get(entry->getMachineId());
2680         if (commitPart == NULL) {
2681                 // Don't have a table for this machine Id yet so make one
2682                 commitPart = new Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2683                 newCommitParts->put(entry->getMachineId(), commitPart);
2684         }
2685         // Update the part and set dead ones we have already seen (got a
2686         // rescued version)
2687         CommitPart *previouslySeenPart = commitPart->put(new Pair<int64_t, int32_t>(entry->getPartId()), entry);
2688         if (previouslySeenPart != NULL) {
2689                 previouslySeenPart->setDead();
2690         }
2691 }
2692
2693 /**
2694  * Update the last message seen table-> Update and set dead the
2695  * appropriate RejectedMessages as clients see them-> Updates the live
2696  * aborts, removes those that are dead and sets them dead-> Check that
2697  * the last message seen is correct and that there is no mismatch of
2698  * our own last message or that other clients have not had a rollback
2699  * on the last message->
2700  */
2701 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2702         // We have seen this machine ID
2703         machineSet->remove(machineId);
2704
2705         // Get the set of rejected messages that this machine Id is has not seen yet
2706         Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2707         // If there is a rejected message that this machine Id has not seen yet
2708         if (watchset != NULL) {
2709                 // Go through each rejected message that this machine Id has not
2710                 // seen yet
2711
2712                 SetIterator<RejectedMessage *, RejectedMessage *> *rmit = watchset->iterator();
2713                 while (rmit->hasNext()) {
2714                         RejectedMessage *rm = rmit->next();
2715                         // If this machine Id has seen this rejected message->->->
2716                         if (rm->getSequenceNumber() <= seqNum) {
2717                                 // Remove it from our watchlist
2718                                 rmit->remove();
2719                                 // Decrement machines that need to see this notification
2720                                 rm->removeWatcher(machineId);
2721                         }
2722                 }
2723                 delete rmit;
2724         }
2725
2726         // Set dead the abort
2727         SetIterator<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals> *abortit = getKeyIterator(liveAbortTable);
2728
2729         while (abortit->hasNext()) {
2730                 Pair<int64_t, int64_t> *key = abortit->next();
2731                 Abort *abort = liveAbortTable->get(key);
2732                 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2733                         abort->setDead();
2734                         abortit->remove();
2735                         if (abort->getTransactionArbitrator() == localMachineId) {
2736                                 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2737                         }
2738                 }
2739         }
2740         delete abortit;
2741         if (machineId == localMachineId) {
2742                 // Our own messages are immediately dead->
2743                 char livenessType = liveness->getType();
2744                 if (livenessType == TypeLastMessage) {
2745                         ((LastMessage *)liveness)->setDead();
2746                 } else if (livenessType == TypeSlot) {
2747                         ((Slot *)liveness)->setDead();
2748                 } else {
2749                         throw new Error("Unrecognized type");
2750                 }
2751         }
2752         // Get the old last message for this device
2753         Pair<int64_t, Liveness *> *lastMessageEntry = lastMessageTable->put(machineId, new Pair<int64_t, Liveness *>(seqNum, liveness));
2754         if (lastMessageEntry == NULL) {
2755                 // If no last message then there is nothing else to process
2756                 return;
2757         }
2758
2759         int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
2760         Liveness *lastEntry = lastMessageEntry->getSecond();
2761         delete lastMessageEntry;
2762
2763         // If it is not our machine Id since we already set ours to dead
2764         if (machineId != localMachineId) {
2765                 char lastEntryType = lastEntry->getType();
2766
2767                 if (lastEntryType == TypeLastMessage) {
2768                         ((LastMessage *)lastEntry)->setDead();
2769                 } else if (lastEntryType == TypeSlot) {
2770                         ((Slot *)lastEntry)->setDead();
2771                 } else {
2772                         throw new Error("Unrecognized type");
2773                 }
2774         }
2775         // Make sure the server is not playing any games
2776         if (machineId == localMachineId) {
2777                 if (hadPartialSendToServer) {
2778                         // We were not making any updates and we had a machine mismatch
2779                         if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2780                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: ");
2781                         }
2782                 } else {
2783                         // We were not making any updates and we had a machine mismatch
2784                         if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2785                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed: ");
2786                         }
2787                 }
2788         } else {
2789                 if (lastMessageSeqNum > seqNum) {
2790                         throw new Error("Server Error: Rollback on remote machine sequence number");
2791                 }
2792         }
2793 }
2794
2795 /**
2796  * Add a rejected message entry to the watch set to keep track of
2797  * which clients have seen that rejected message entry and which have
2798  * not.
2799  */
2800 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
2801         Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2802         if (entries == NULL) {
2803                 // There is no set for this machine ID yet so create one
2804                 entries = new Hashset<RejectedMessage *>();
2805                 rejectedMessageWatchVectorTable->put(machineId, entries);
2806         }
2807         entries->add(entry);
2808 }
2809
2810 /**
2811  * Check if the HMAC chain is not violated
2812  */
2813 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
2814         for (uint i = 0; i < newSlots->length(); i++) {
2815                 Slot *currSlot = newSlots->get(i);
2816                 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
2817                 if (prevSlot != NULL &&
2818                                 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2819                         throw new Error("Server Error: Invalid HMAC Chain");
2820         }
2821 }