More edits
[iotcloud.git] / version2 / src / C / Table.cc
1 #include "Table.h"
2 #include "CloudComm.h"
3 #include "SlotBuffer.h"
4 #include "NewKey.h"
5 #include "Slot.h"
6 #include "KeyValue.h"
7 #include "Error.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
13 #include "Random.h"
14 #include "ByteBuffer.h"
15 #include "Abort.h"
16 #include "CommitPart.h"
17 #include "ArbitrationRound.h"
18 #include "TransactionPart.h"
19 #include "Commit.h"
20 #include "RejectedMessage.h"
21 #include "SlotIndexer.h"
22
23 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
24         buffer(NULL),
25         cloud(new CloudComm(this, baseurl, password, listeningPort)),
26         random(NULL),
27         liveTableStatus(NULL),
28         pendingTransactionBuilder(NULL),
29         lastPendingTransactionSpeculatedOn(NULL),
30         firstPendingTransaction(NULL),
31         numberOfSlots(0),
32         bufferResizeThreshold(0),
33         liveSlotCount(0),
34         oldestLiveSlotSequenceNumver(1),
35         localMachineId(_localMachineId),
36         sequenceNumber(0),
37         localTransactionSequenceNumber(0),
38         lastTransactionSequenceNumberSpeculatedOn(0),
39         oldestTransactionSequenceNumberSpeculatedOn(0),
40         localArbitrationSequenceNumber(0),
41         hadPartialSendToServer(false),
42         attemptedToSendToServer(false),
43         expectedsize(0),
44         didFindTableStatus(false),
45         currMaxSize(0),
46         lastSlotAttemptedToSend(NULL),
47         lastIsNewKey(false),
48         lastNewSize(0),
49         lastTransactionPartsSent(NULL),
50         lastPendingSendArbitrationEntriesToDelete(NULL),
51         lastNewKey(NULL),
52         committedKeyValueTable(NULL),
53         speculatedKeyValueTable(NULL),
54         pendingTransactionSpeculatedKeyValueTable(NULL),
55         liveNewKeyTable(NULL),
56         lastMessageTable(NULL),
57         rejectedMessageWatchVectorTable(NULL),
58         arbitratorTable(NULL),
59         liveAbortTable(NULL),
60         newTransactionParts(NULL),
61         newCommitParts(NULL),
62         lastArbitratedTransactionNumberByArbitratorTable(NULL),
63         liveTransactionBySequenceNumberTable(NULL),
64         liveTransactionByTransactionIdTable(NULL),
65         liveCommitsTable(NULL),
66         liveCommitsByKeyTable(NULL),
67         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
68         rejectedSlotVector(NULL),
69         pendingTransactionQueue(NULL),
70         pendingSendArbitrationRounds(NULL),
71         pendingSendArbitrationEntriesToDelete(NULL),
72         transactionPartsSent(NULL),
73         outstandingTransactionStatus(NULL),
74         liveAbortsGeneratedByLocal(NULL),
75         offlineTransactionsCommittedAndAtServer(NULL),
76         localCommunicationTable(NULL),
77         lastTransactionSeenFromMachineFromServer(NULL),
78         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
79         lastInsertedNewKey(false),
80         lastSeqNumArbOn(0)
81 {
82         init();
83 }
84
85 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
86         buffer(NULL),
87         cloud(_cloud),
88         random(NULL),
89         liveTableStatus(NULL),
90         pendingTransactionBuilder(NULL),
91         lastPendingTransactionSpeculatedOn(NULL),
92         firstPendingTransaction(NULL),
93         numberOfSlots(0),
94         bufferResizeThreshold(0),
95         liveSlotCount(0),
96         oldestLiveSlotSequenceNumver(1),
97         localMachineId(_localMachineId),
98         sequenceNumber(0),
99         localTransactionSequenceNumber(0),
100         lastTransactionSequenceNumberSpeculatedOn(0),
101         oldestTransactionSequenceNumberSpeculatedOn(0),
102         localArbitrationSequenceNumber(0),
103         hadPartialSendToServer(false),
104         attemptedToSendToServer(false),
105         expectedsize(0),
106         didFindTableStatus(false),
107         currMaxSize(0),
108         lastSlotAttemptedToSend(NULL),
109         lastIsNewKey(false),
110         lastNewSize(0),
111         lastTransactionPartsSent(NULL),
112         lastPendingSendArbitrationEntriesToDelete(NULL),
113         lastNewKey(NULL),
114         committedKeyValueTable(NULL),
115         speculatedKeyValueTable(NULL),
116         pendingTransactionSpeculatedKeyValueTable(NULL),
117         liveNewKeyTable(NULL),
118         lastMessageTable(NULL),
119         rejectedMessageWatchVectorTable(NULL),
120         arbitratorTable(NULL),
121         liveAbortTable(NULL),
122         newTransactionParts(NULL),
123         newCommitParts(NULL),
124         lastArbitratedTransactionNumberByArbitratorTable(NULL),
125         liveTransactionBySequenceNumberTable(NULL),
126         liveTransactionByTransactionIdTable(NULL),
127         liveCommitsTable(NULL),
128         liveCommitsByKeyTable(NULL),
129         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
130         rejectedSlotVector(NULL),
131         pendingTransactionQueue(NULL),
132         pendingSendArbitrationRounds(NULL),
133         pendingSendArbitrationEntriesToDelete(NULL),
134         transactionPartsSent(NULL),
135         outstandingTransactionStatus(NULL),
136         liveAbortsGeneratedByLocal(NULL),
137         offlineTransactionsCommittedAndAtServer(NULL),
138         localCommunicationTable(NULL),
139         lastTransactionSeenFromMachineFromServer(NULL),
140         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
141         lastInsertedNewKey(false),
142         lastSeqNumArbOn(0)
143 {
144         init();
145 }
146
147 /**
148  * Init all the stuff needed for for table usage
149  */
150 void Table::init() {
151         // Init helper objects
152         random = new Random();
153         buffer = new SlotBuffer();
154
155         // init data structs
156         committedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
157         speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
158         pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
159         liveNewKeyTable = new Hashtable<IoTString *, NewKey *>();
160         lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> * >();
161         rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
162         arbitratorTable = new Hashtable<IoTString *, int64_t>();
163         liveAbortTable = new Hashtable<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>();
164         newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
165         newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
166         lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
167         liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
168         liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t> *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>();
169         liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> >();
170         liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *>();
171         lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
172         rejectedSlotVector = new Vector<int64_t>();
173         pendingTransactionQueue = new Vector<Transaction *>();
174         pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
175         transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
176         outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
177         liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
178         offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> *, uintptr_t, 0, pairHashFunction, pairEquals>();
179         localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> *>();
180         lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
181         pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
182         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
183
184         // Other init stuff
185         numberOfSlots = buffer->capacity();
186         setResizeThreshold();
187 }
188
189 /**
190  * Initialize the table by inserting a table status as the first entry
191  * into the table status also initialize the crypto stuff.
192  */
193 void Table::initTable() {
194         cloud->initSecurity();
195
196         // Create the first insertion into the block chain which is the table status
197         Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
198         localSequenceNumber++;
199         TableStatus *status = new TableStatus(s, numberOfSlots);
200         s->addEntry(status);
201         Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
202
203         if (array == NULL) {
204                 array = new Array<Slot *>(1);
205                 array->set(0, s);
206                 // update local block chain
207                 validateAndUpdate(array, true);
208         } else if (array->length() == 1) {
209                 // in case we did push the slot BUT we failed to init it
210                 validateAndUpdate(array, true);
211         } else {
212                 throw new Error("Error on initialization");
213         }
214 }
215
216 /**
217  * Rebuild the table from scratch by pulling the latest block chain
218  * from the server.
219  */
220 void Table::rebuild() {
221         // Just pull the latest slots from the server
222         Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
223         validateAndUpdate(newslots, true);
224         sendToServer(NULL);
225         updateLiveTransactionsAndStatus();
226 }
227
228 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
229         localCommunicationTable->put(arbitrator, new Pair<IoTString *, int32_t>(hostName, portNumber));
230 }
231
232 int64_t Table::getArbitrator(IoTString *key) {
233         return arbitratorTable->get(key);
234 }
235
236 void Table::close() {
237         cloud->close();
238 }
239
240 IoTString *Table::getCommitted(IoTString *key)  {
241         KeyValue *kv = committedKeyValueTable->get(key);
242
243         if (kv != NULL) {
244                 return kv->getValue();
245         } else {
246                 return NULL;
247         }
248 }
249
250 IoTString *Table::getSpeculative(IoTString *key) {
251         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
252
253         if (kv == NULL) {
254                 kv = speculatedKeyValueTable->get(key);
255         }
256
257         if (kv == NULL) {
258                 kv = committedKeyValueTable->get(key);
259         }
260
261         if (kv != NULL) {
262                 return kv->getValue();
263         } else {
264                 return NULL;
265         }
266 }
267
268 IoTString *Table::getCommittedAtomic(IoTString *key) {
269         KeyValue *kv = committedKeyValueTable->get(key);
270
271         if (!arbitratorTable->contains(key)) {
272                 throw new Error("Key not Found.");
273         }
274
275         // Make sure new key value pair matches the current arbitrator
276         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
277                 // TODO: Maybe not throw en error
278                 throw new Error("Not all Key Values Match Arbitrator.");
279         }
280
281         if (kv != NULL) {
282                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
283                 return kv->getValue();
284         } else {
285                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
286                 return NULL;
287         }
288 }
289
290 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
291         if (!arbitratorTable->contains(key)) {
292                 throw new Error("Key not Found.");
293         }
294
295         // Make sure new key value pair matches the current arbitrator
296         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
297                 // TODO: Maybe not throw en error
298                 throw new Error("Not all Key Values Match Arbitrator.");
299         }
300
301         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
302
303         if (kv == NULL) {
304                 kv = speculatedKeyValueTable->get(key);
305         }
306
307         if (kv == NULL) {
308                 kv = committedKeyValueTable->get(key);
309         }
310
311         if (kv != NULL) {
312                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
313                 return kv->getValue();
314         } else {
315                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
316                 return NULL;
317         }
318 }
319
320 bool Table::update()  {
321         try {
322                 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
323                 validateAndUpdate(newSlots, false);
324                 sendToServer(NULL);
325                 updateLiveTransactionsAndStatus();
326                 return true;
327         } catch (Exception *e) {
328                 SetIterator<int64_t, Pair<IoTString *, int32_t> *> *kit = getKeyIterator(localCommunicationTable);
329                 while (kit->hasNext()) {
330                         int64_t m = kit->next();
331                         updateFromLocal(m);
332                 }
333                 delete kit;
334         }
335
336         return false;
337 }
338
339 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
340         while (true) {
341                 if (!arbitratorTable->contains(keyName)) {
342                         // There is already an arbitrator
343                         return false;
344                 }
345                 NewKey *newKey = new NewKey(NULL, keyName, machineId);
346
347                 if (sendToServer(newKey)) {
348                         // If successfully inserted
349                         return true;
350                 }
351         }
352 }
353
354 void Table::startTransaction() {
355         // Create a new transaction, invalidates any old pending transactions.
356         pendingTransactionBuilder = new PendingTransaction(localMachineId);
357 }
358
359 void Table::addKV(IoTString *key, IoTString *value) {
360
361         // Make sure it is a valid key
362         if (!arbitratorTable->contains(key)) {
363                 throw new Error("Key not Found.");
364         }
365
366         // Make sure new key value pair matches the current arbitrator
367         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
368                 // TODO: Maybe not throw en error
369                 throw new Error("Not all Key Values Match Arbitrator.");
370         }
371
372         // Add the key value to this transaction
373         KeyValue *kv = new KeyValue(key, value);
374         pendingTransactionBuilder->addKV(kv);
375 }
376
377 TransactionStatus *Table::commitTransaction() {
378         if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
379                 // transaction with no updates will have no effect on the system
380                 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
381         }
382
383         // Set the local transaction sequence number and increment
384         pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
385         localTransactionSequenceNumber++;
386
387         // Create the transaction status
388         TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
389
390         // Create the new transaction
391         Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
392         newTransaction->setTransactionStatus(transactionStatus);
393
394         if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
395                 // Add it to the queue and invalidate the builder for safety
396                 pendingTransactionQueue->add(newTransaction);
397         } else {
398                 arbitrateOnLocalTransaction(newTransaction);
399                 updateLiveStateFromLocal();
400         }
401
402         pendingTransactionBuilder = new PendingTransaction(localMachineId);
403
404         try {
405                 sendToServer(NULL);
406         } catch (ServerException *e) {
407
408                 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
409                 uint size = pendingTransactionQueue->size();
410                 uint oldindex = 0;
411                 for (int iter = 0; iter < size; iter++) {
412                         Transaction *transaction = pendingTransactionQueue->get(iter);
413                         pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter));
414
415                         if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
416                                 // Already contacted this client so ignore all attempts to contact this client
417                                 // to preserve ordering for arbitrator
418                                 continue;
419                         }
420
421                         Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
422
423                         if (sendReturn.getFirst()) {
424                                 // Failed to contact over local
425                                 arbitratorTriedAndFailed->add(transaction->getArbitrator());
426                         } else {
427                                 // Successful contact or should not contact
428
429                                 if (sendReturn.getSecond()) {
430                                         // did arbitrate
431                                         oldindex--;
432                                 }
433                         }
434                 }
435                 pendingTransactionQueue->setSize(oldindex);
436         }
437
438         updateLiveStateFromLocal();
439
440         return transactionStatus;
441 }
442
443 /**
444  * Recalculate the new resize threshold
445  */
446 void Table::setResizeThreshold() {
447         int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
448         bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
449 }
450
451 int64_t Table::getLocalSequenceNumber() {
452         return localSequenceNumber;
453 }
454
455 bool Table::sendToServer(NewKey *newKey) {
456         bool fromRetry = false;
457         try {
458                 if (hadPartialSendToServer) {
459                         Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
460                         if (newSlots->length() == 0) {
461                                 fromRetry = true;
462                                 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
463
464                                 if (sendSlotsReturn.getFirst()) {
465                                         if (newKey != NULL) {
466                                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
467                                                         newKey = NULL;
468                                                 }
469                                         }
470
471                                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
472                                         while (trit->hasNext()) {
473                                                 Transaction *transaction = trit->next();
474                                                 transaction->resetServerFailure();
475                                                 // Update which transactions parts still need to be sent
476                                                 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
477                                                 // Add the transaction status to the outstanding list
478                                                 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
479
480                                                 // Update the transaction status
481                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
482
483                                                 // Check if all the transaction parts were successfully
484                                                 // sent and if so then remove it from pending
485                                                 if (transaction->didSendAllParts()) {
486                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
487                                                         pendingTransactionQueue->remove(transaction);
488                                                 }
489                                         }
490                                         delete trit;
491                                 } else {
492                                         newSlots = sendSlotsReturn.getThird();
493                                         bool isInserted = false;
494                                         for (uint si = 0; si < newSlots->length(); si++) {
495                                                 Slot *s = newSlots->get(si);
496                                                 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
497                                                         isInserted = true;
498                                                         break;
499                                                 }
500                                         }
501
502                                         for (uint si = 0; si < newSlots->length(); si++) {
503                                                 Slot *s = newSlots->get(si);
504                                                 if (isInserted) {
505                                                         break;
506                                                 }
507
508                                                 // Process each entry in the slot
509                                                 Vector<Entry *> *ventries = s->getEntries();
510                                                 uint vesize = ventries->size();
511                                                 for (uint vei = 0; vei < vesize; vei++) {
512                                                         Entry *entry = ventries->get(vei);
513                                                         if (entry->getType() == TypeLastMessage) {
514                                                                 LastMessage *lastMessage = (LastMessage *)entry;
515                                                                 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
516                                                                         isInserted = true;
517                                                                         break;
518                                                                 }
519                                                         }
520                                                 }
521                                         }
522
523                                         if (isInserted) {
524                                                 if (newKey != NULL) {
525                                                         if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
526                                                                 newKey = NULL;
527                                                         }
528                                                 }
529
530                                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
531                                                 while (trit->hasNext()) {
532                                                         Transaction *transaction = trit->next();
533                                                         transaction->resetServerFailure();
534
535                                                         // Update which transactions parts still need to be sent
536                                                         transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
537
538                                                         // Add the transaction status to the outstanding list
539                                                         outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
540
541                                                         // Update the transaction status
542                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
543
544                                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
545                                                         if (transaction->didSendAllParts()) {
546                                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
547                                                                 pendingTransactionQueue->remove(transaction);
548                                                         } else {
549                                                                 transaction->resetServerFailure();
550                                                                 // Set the transaction sequence number back to nothing
551                                                                 if (!transaction->didSendAPartToServer()) {
552                                                                         transaction->setSequenceNumber(-1);
553                                                                 }
554                                                         }
555                                                 }
556                                                 delete trit;
557                                         }
558                                 }
559
560                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
561                                 while (trit->hasNext()) {
562                                         Transaction *transaction = trit->next();
563                                         transaction->resetServerFailure();
564                                         // Set the transaction sequence number back to nothing
565                                         if (!transaction->didSendAPartToServer()) {
566                                                 transaction->setSequenceNumber(-1);
567                                         }
568                                 }
569                                 delete trit;
570
571                                 if (sendSlotsReturn.getThird()->length() != 0) {
572                                         // insert into the local block chain
573                                         validateAndUpdate(sendSlotsReturn.getThird(), true);
574                                 }
575                                 // continue;
576                         } else {
577                                 bool isInserted = false;
578                                 for (uint si = 0; si < newSlots->length(); si++) {
579                                         Slot *s = newSlots->get(si);
580                                         if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
581                                                 isInserted = true;
582                                                 break;
583                                         }
584                                 }
585
586                                 for (uint si = 0; si < newSlots->length(); si++) {
587                                         Slot *s = newSlots->get(si);
588                                         if (isInserted) {
589                                                 break;
590                                         }
591
592                                         // Process each entry in the slot
593                                         Vector<Entry *> *entries = s->getEntries();
594                                         uint eSize = entries->size();
595                                         for(uint ei=0; ei < eSize; ei++) {
596                                                 Entry * entry = entries->get(ei);
597                                                 
598                                                 if (entry->getType() == TypeLastMessage) {
599                                                         LastMessage *lastMessage = (LastMessage *)entry;
600                                                         if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
601                                                                 isInserted = true;
602                                                                 break;
603                                                         }
604                                                 }
605                                         }
606                                 }
607
608                                 if (isInserted) {
609                                         if (newKey != NULL) {
610                                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
611                                                         newKey = NULL;
612                                                 }
613                                         }
614
615                                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
616                                         while (trit->hasNext()) {
617                                                 Transaction *transaction = trit->next();
618                                                 transaction->resetServerFailure();
619
620                                                 // Update which transactions parts still need to be sent
621                                                 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
622
623                                                 // Add the transaction status to the outstanding list
624                                                 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
625
626                                                 // Update the transaction status
627                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
628
629                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
630                                                 if (transaction->didSendAllParts()) {
631                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
632                                                         pendingTransactionQueue->remove(transaction);
633                                                 } else {
634                                                         transaction->resetServerFailure();
635                                                         // Set the transaction sequence number back to nothing
636                                                         if (!transaction->didSendAPartToServer()) {
637                                                                 transaction->setSequenceNumber(-1);
638                                                         }
639                                                 }
640                                         }
641                                         delete trit;
642                                 } else {
643                                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
644                                         while (trit->hasNext()) {
645                                                 Transaction *transaction = trit->next();
646                                                 transaction->resetServerFailure();
647                                                 // Set the transaction sequence number back to nothing
648                                                 if (!transaction->didSendAPartToServer()) {
649                                                         transaction->setSequenceNumber(-1);
650                                                 }
651                                         }
652                                         delete trit;
653                                 }
654
655                                 // insert into the local block chain
656                                 validateAndUpdate(newSlots, true);
657                         }
658                 }
659         } catch (ServerException *e) {
660                 throw e;
661         }
662
663
664
665         try {
666                 // While we have stuff that needs inserting into the block chain
667                 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
668
669                         fromRetry = false;
670
671                         if (hadPartialSendToServer) {
672                                 throw new Error("Should Be error free");
673                         }
674
675
676
677                         // If there is a new key with same name then end
678                         if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
679                                 return false;
680                         }
681
682                         // Create the slot
683                         Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber);
684                         localSequenceNumber++;
685
686                         // Try to fill the slot with data
687                         ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
688                         bool needsResize = fillSlotsReturn.getFirst();
689                         int newSize = fillSlotsReturn.getSecond();
690                         bool insertedNewKey = fillSlotsReturn.getThird();
691
692                         if (needsResize) {
693                                 // Reset which transaction to send
694                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
695                                 while (trit->hasNext()) {
696                                         Transaction *transaction = trit->next();
697                                         transaction->resetNextPartToSend();
698
699                                         // Set the transaction sequence number back to nothing
700                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
701                                                 transaction->setSequenceNumber(-1);
702                                         }
703                                 }
704                                 delete trit;
705
706                                 // Clear the sent data since we are trying again
707                                 pendingSendArbitrationEntriesToDelete->clear();
708                                 transactionPartsSent->clear();
709
710                                 // We needed a resize so try again
711                                 fillSlot(slot, true, newKey);
712                         }
713
714                         lastSlotAttemptedToSend = slot;
715                         lastIsNewKey = (newKey != NULL);
716                         lastInsertedNewKey = insertedNewKey;
717                         lastNewSize = newSize;
718                         lastNewKey = newKey;
719                         lastTransactionPartsSent = transactionPartsSent->clone();
720                         lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
721
722                         ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
723
724                         if (sendSlotsReturn.getFirst()) {
725
726                                 // Did insert into the block chain
727
728                                 if (insertedNewKey) {
729                                         // This slot was what was inserted not a previous slot
730
731                                         // New Key was successfully inserted into the block chain so dont want to insert it again
732                                         newKey = NULL;
733                                 }
734
735                                 // Remove the aborts and commit parts that were sent from the pending to send queue
736                                 uint size = pendingSendArbitrationRounds->size();
737                                 uint oldcount = 0;
738                                 for (uint i = 0; i < size; i++) {
739                                         ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
740                                         round->removeParts(pendingSendArbitrationEntriesToDelete);
741
742                                         if (!round->isDoneSending()) {
743                                                 // Sent all the parts
744                                                 pendingSendArbitrationRounds->set(oldcount++,
745                                                                                                                                                                                         pendingSendArbitrationRounds->get(i));
746                                         }
747                                 }
748                                 pendingSendArbitrationRounds->setSize(oldcount);
749
750                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
751                                 while (trit->hasNext()) {
752                                         Transaction *transaction = trit->next();
753                                         transaction->resetServerFailure();
754
755                                         // Update which transactions parts still need to be sent
756                                         transaction->removeSentParts(transactionPartsSent->get(transaction));
757
758                                         // Add the transaction status to the outstanding list
759                                         outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
760
761                                         // Update the transaction status
762                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
763
764                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
765                                         if (transaction->didSendAllParts()) {
766                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
767                                                 pendingTransactionQueue->remove(transaction);
768                                         }
769                                 }
770                                 delete trit;
771                         } else {
772                                 // Reset which transaction to send
773                                 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
774                                 while (trit->hasNext()) {
775                                         Transaction *transaction = trit->next();
776                                         transaction->resetNextPartToSend();
777
778                                         // Set the transaction sequence number back to nothing
779                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
780                                                 transaction->setSequenceNumber(-1);
781                                         }
782                                 }
783                                 delete trit;
784                         }
785
786                         // Clear the sent data in preparation for next send
787                         pendingSendArbitrationEntriesToDelete->clear();
788                         transactionPartsSent->clear();
789
790                         if (sendSlotsReturn.getThird()->length() != 0) {
791                                 // insert into the local block chain
792                                 validateAndUpdate(sendSlotsReturn.getThird(), true);
793                         }
794                 }
795
796         } catch (ServerException *e) {
797                 if (e->getType() != ServerException_TypeInputTimeout) {
798                         // Nothing was able to be sent to the server so just clear these data structures
799                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
800                         while (trit->hasNext()) {
801                                 Transaction *transaction = trit->next();
802                                 transaction->resetNextPartToSend();
803
804                                 // Set the transaction sequence number back to nothing
805                                 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
806                                         transaction->setSequenceNumber(-1);
807                                 }
808                         }
809                         delete trit;
810                 } else {
811                         // There was a partial send to the server
812                         hadPartialSendToServer = true;
813
814                         // Nothing was able to be sent to the server so just clear these data structures
815                         SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
816                         while (trit->hasNext()) {
817                                 Transaction *transaction = trit->next();
818                                 transaction->resetNextPartToSend();
819                                 transaction->setServerFailure();
820                         }
821                         delete trit;
822                 }
823
824                 pendingSendArbitrationEntriesToDelete->clear();
825                 transactionPartsSent->clear();
826
827                 throw e;
828         }
829
830         return newKey == NULL;
831 }
832
833 bool Table::updateFromLocal(int64_t machineId) {
834         if (!localCommunicationTable->contains(machineId))
835                 return false;
836
837         Pair<IoTString *, int32_t> * localCommunicationInformation = localCommunicationTable->get(machineId);
838
839         // Get the size of the send data
840         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
841
842         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
843         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(machineId)) {
844                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
845         }
846
847         Array<char> *sendData = new Array<char>(sendDataSize);
848         ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
849
850         // Encode the data
851         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
852         bbEncode->putInt(0);
853
854         // Send by local
855         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
856         localSequenceNumber++;
857
858         if (returnData == NULL) {
859                 // Could not contact server
860                 return false;
861         }
862
863         // Decode the data
864         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
865         int numberOfEntries = bbDecode->getInt();
866
867         for (int i = 0; i < numberOfEntries; i++) {
868                 char type = bbDecode->get();
869                 if (type == TypeAbort) {
870                         Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
871                         processEntry(abort);
872                 } else if (type == TypeCommitPart) {
873                         CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
874                         processEntry(commitPart);
875                 }
876         }
877
878         updateLiveStateFromLocal();
879
880         return true;
881 }
882
883 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
884
885         // Get the devices local communications
886         if (!localCommunicationTable->contains(transaction->getArbitrator()))
887                 return Pair<bool, bool>(true, false);
888         
889         Pair<IoTString *, int32_t> * localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
890
891         // Get the size of the send data
892         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
893         {
894                 Vector<TransactionPart *> * tParts = transaction->getParts();
895                 uint tPartsSize = tParts->size();
896                 for (uint i = 0; i < tPartsSize; i++) {
897                         TransactionPart * part = tParts->get(i);
898                         sendDataSize += part->getSize();
899                 }
900         }
901
902         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
903         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(transaction->getArbitrator())) {
904                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
905         }
906
907         // Make the send data size
908         Array<char> *sendData = new Array<char>(sendDataSize);
909         ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
910
911         // Encode the data
912         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
913         bbEncode->putInt(transaction->getParts()->size());
914         {
915                 Vector<TransactionPart *> * tParts = transaction->getParts();
916                 uint tPartsSize = tParts->size();
917                 for (uint i = 0; i < tPartsSize; i++) {
918                         TransactionPart * part = tParts->get(i);
919                         part->encode(bbEncode);
920                 }
921         }
922
923         // Send by local
924         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
925         localSequenceNumber++;
926
927         if (returnData == NULL) {
928                 // Could not contact server
929                 return Pair<bool, bool>(true, false);
930         }
931
932         // Decode the data
933         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
934         bool didCommit = bbDecode->get() == 1;
935         bool couldArbitrate = bbDecode->get() == 1;
936         int numberOfEntries = bbDecode->getInt();
937         bool foundAbort = false;
938
939         for (int i = 0; i < numberOfEntries; i++) {
940                 char type = bbDecode->get();
941                 if (type == TypeAbort) {
942                         Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
943
944                         if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
945                                 foundAbort = true;
946                         }
947
948                         processEntry(abort);
949                 } else if (type == TypeCommitPart) {
950                         CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
951                         processEntry(commitPart);
952                 }
953         }
954
955         updateLiveStateFromLocal();
956
957         if (couldArbitrate) {
958                 TransactionStatus * status =  transaction->getTransactionStatus();
959                 if (didCommit) {
960                         status->setStatus(TransactionStatus_StatusCommitted);
961                 } else {
962                         status->setStatus(TransactionStatus_StatusAborted);
963                 }
964         } else {
965                 TransactionStatus * status =  transaction->getTransactionStatus();
966                 if (foundAbort) {
967                         status->setStatus(TransactionStatus_StatusAborted);
968                 } else {
969                         status->setStatus(TransactionStatus_StatusCommitted);
970                 }
971         }
972
973         return Pair<bool, bool>(false, true);
974 }
975
976 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
977
978         // Decode the data
979         ByteBuffer *bbDecode = ByteBuffer_wrap(data);
980         int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
981         int numberOfParts = bbDecode->getInt();
982
983         // If we did commit a transaction or not
984         bool didCommit = false;
985         bool couldArbitrate = false;
986
987         if (numberOfParts != 0) {
988
989                 // decode the transaction
990                 Transaction *transaction = new Transaction();
991                 for (int i = 0; i < numberOfParts; i++) {
992                         bbDecode->get();
993                         TransactionPart *newPart = (TransactionPart *)TransactionPart_decode(NULL, bbDecode);
994                         transaction->addPartDecode(newPart);
995                 }
996
997                 // Arbitrate on transaction and pull relevant return data
998                 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
999                 couldArbitrate = localArbitrateReturn.getFirst();
1000                 didCommit = localArbitrateReturn.getSecond();
1001
1002                 updateLiveStateFromLocal();
1003
1004                 // Transaction was sent to the server so keep track of it to prevent double commit
1005                 if (transaction->getSequenceNumber() != -1) {
1006                         offlineTransactionsCommittedAndAtServer->add(new Pair<int64_t, int64_t>(transaction->getId()));
1007                 }
1008         }
1009
1010         // The data to send back
1011         int returnDataSize = 0;
1012         Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
1013
1014         // Get the aborts to send back
1015         Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>(liveAbortsGeneratedByLocal->keySet());
1016         Collections->sort(abortLocalSequenceNumbers);
1017         uint asize = abortLocalSequenceNumbers->size();
1018         for(uint i=0; i<asize; i++) {
1019                 int64_t localSequenceNumber = abortLocalSequenceNumbers->get(i);
1020                 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1021                         continue;
1022                 }
1023                 
1024                 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
1025                 unseenArbitrations->add(abort);
1026                 returnDataSize += abort->getSize();
1027         }
1028
1029         // Get the commits to send back
1030         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
1031         if (commitForClientTable != NULL) {
1032                 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
1033                 Collections->sort(commitLocalSequenceNumbers);
1034
1035                 uint clsSize = commitLocalSequenceNumbers->size();
1036                 for(uint clsi = 0; clsi < clsSize; clsi++) {
1037                         int64_t localSequenceNumber = commitLocalSequenceNumbers->get(clsi);
1038                         Commit *commit = commitForClientTable->get(localSequenceNumber);
1039
1040                         if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1041                                 continue;
1042                         }
1043
1044                         unseenArbitrations->addAll(commit->getParts()->values());
1045
1046                         for (CommitPart *commitPart : commit->getParts()->values()) {
1047                                 returnDataSize += commitPart->getSize();
1048                         }
1049                 }
1050         }
1051
1052         // Number of arbitration entries to decode
1053         returnDataSize += 2 * sizeof(int32_t);
1054
1055         // bool of did commit or not
1056         if (numberOfParts != 0) {
1057                 returnDataSize += sizeof(char);
1058         }
1059
1060         // Data to send Back
1061         Array<char> *returnData = new Array<char>(returnDataSize);
1062         ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
1063
1064         if (numberOfParts != 0) {
1065                 if (didCommit) {
1066                         bbEncode->put((char)1);
1067                 } else {
1068                         bbEncode->put((char)0);
1069                 }
1070                 if (couldArbitrate) {
1071                         bbEncode->put((char)1);
1072                 } else {
1073                         bbEncode->put((char)0);
1074                 }
1075         }
1076
1077         bbEncode->putInt(unseenArbitrations->size());
1078         uint size = unseenArbitrations->size();
1079         for (uint i = 0; i < size; i++) {
1080                 Entry *entry = unseenArbitrations->get(i);
1081                 entry->encode(bbEncode);
1082         }
1083
1084         localSequenceNumber++;
1085         return returnData;
1086 }
1087
1088 ThreeTuple<bool, bool, Array<Slot *> *> Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey) {
1089         bool attemptedToSendToServerTmp = attemptedToSendToServer;
1090         attemptedToSendToServer = true;
1091
1092         bool inserted = false;
1093         bool lastTryInserted = false;
1094
1095         Array<Slot *> *array = cloud->putSlot(slot, newSize);
1096         if (array == NULL) {
1097                 array = new Array<Slot *>();
1098                 array->set(0, slot);
1099                 rejectedSlotVector->clear();
1100                 inserted = true;
1101         } else {
1102                 if (array->length() == 0) {
1103                         throw new Error("Server Error: Did not send any slots");
1104                 }
1105
1106                 // if (attemptedToSendToServerTmp) {
1107                 if (hadPartialSendToServer) {
1108
1109                         bool isInserted = false;
1110                         uint size = array->length();
1111                         for (uint i = 0; i < size; i++) {
1112                                 Slot *s = array->get(i);
1113                                 if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1114                                         isInserted = true;
1115                                         break;
1116                                 }
1117                         }
1118
1119                         for (uint i = 0; i < size; i++) {
1120                                 Slot *s = array->get(i);
1121                                 if (isInserted) {
1122                                         break;
1123                                 }
1124
1125                                 // Process each entry in the slot
1126                                 Vector<Entry *> *entries = s->getEntries();
1127                                 uint eSize = entries->size();
1128                                 for(uint ei=0; ei < eSize; ei++) {
1129                                         Entry * entry = entries->get(ei);
1130
1131                                         if (entry->getType() == TypeLastMessage) {
1132                                                 LastMessage *lastMessage = (LastMessage *)entry;
1133
1134                                                 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) {
1135                                                         isInserted = true;
1136                                                         break;
1137                                                 }
1138                                         }
1139                                 }
1140                         }
1141
1142                         if (!isInserted) {
1143                                 rejectedSlotVector->add(slot->getSequenceNumber());
1144                                 lastTryInserted = false;
1145                         } else {
1146                                 lastTryInserted = true;
1147                         }
1148                 } else {
1149                         rejectedSlotVector->add(slot->getSequenceNumber());
1150                         lastTryInserted = false;
1151                 }
1152         }
1153
1154         return ThreeTuple<bool, bool, Array<Slot *> *>(inserted, lastTryInserted, array);
1155 }
1156
1157 /**
1158  * Returns false if a resize was needed
1159  */
1160 ThreeTuple<bool, int32_t, bool> Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry) {
1161         int newSize = 0;
1162         if (liveSlotCount > bufferResizeThreshold) {
1163                 resize = true;//Resize is forced
1164         }
1165
1166         if (resize) {
1167                 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1168                 TableStatus *status = new TableStatus(slot, newSize);
1169                 slot->addEntry(status);
1170         }
1171
1172         // Fill with rejected slots first before doing anything else
1173         doRejectedMessages(slot);
1174
1175         // Do mandatory rescue of entries
1176         ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1177
1178         // Extract working variables
1179         bool needsResize = mandatoryRescueReturn.getFirst();
1180         bool seenLiveSlot = mandatoryRescueReturn.getSecond();
1181         int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1182
1183         if (needsResize && !resize) {
1184                 // We need to resize but we are not resizing so return false
1185                 return ThreeTuple<bool, int32_t, bool>(true, NULL, NULL);
1186         }
1187
1188         bool inserted = false;
1189         if (newKeyEntry != NULL) {
1190                 newKeyEntry->setSlot(slot);
1191                 if (slot->hasSpace(newKeyEntry)) {
1192                         slot->addEntry(newKeyEntry);
1193                         inserted = true;
1194                 }
1195         }
1196
1197         // Clear the transactions, aborts and commits that were sent previously
1198         transactionPartsSent->clear();
1199         pendingSendArbitrationEntriesToDelete->clear();
1200         uint size = pendingSendArbitrationRounds->size();
1201         for (uint i = 0; i < size; i++) {
1202                 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
1203                 bool isFull = false;
1204                 round->generateParts();
1205                 Vector<Entry *> *parts = round->getParts();
1206
1207                 // Insert pending arbitration data
1208                 uint vsize = parts->size();
1209                 for (uint vi = 0; vi < vsize; vi++) {
1210                         Entry *arbitrationData = parts->get(vi);
1211
1212                         // If it is an abort then we need to set some information
1213                         if (arbitrationData->getType() == TypeAbort) {
1214                                 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1215                         }
1216
1217                         if (!slot->hasSpace(arbitrationData)) {
1218                                 // No space so cant do anything else with these data entries
1219                                 isFull = true;
1220                                 break;
1221                         }
1222
1223                         // Add to this current slot and add it to entries to delete
1224                         slot->addEntry(arbitrationData);
1225                         pendingSendArbitrationEntriesToDelete->add(arbitrationData);
1226                 }
1227
1228                 if (isFull) {
1229                         break;
1230                 }
1231         }
1232
1233         if (pendingTransactionQueue->size() > 0) {
1234                 Transaction *transaction = pendingTransactionQueue->get(0);
1235                 // Set the transaction sequence number if it has yet to be inserted into the block chain
1236                 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1237                         transaction->setSequenceNumber(slot->getSequenceNumber());
1238                 }
1239
1240                 while (true) {
1241                         TransactionPart *part = transaction->getNextPartToSend();
1242                         if (part == NULL) {
1243                                 // Ran out of parts to send for this transaction so move on
1244                                 break;
1245                         }
1246
1247                         if (slot->hasSpace(part)) {
1248                                 slot->addEntry(part);
1249                                 Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1250                                 if (partsSent == NULL) {
1251                                         partsSent = new Vector<int32_t>();
1252                                         transactionPartsSent->put(transaction, partsSent);
1253                                 }
1254                                 partsSent->add(part->getPartNumber());
1255                                 transactionPartsSent->put(transaction, partsSent);
1256                         } else {
1257                                 break;
1258                         }
1259                 }
1260         }
1261
1262         // Fill the remainder of the slot with rescue data
1263         doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1264
1265         return ThreeTuple<bool, int32_t, bool>(false, newSize, inserted);
1266 }
1267
1268 void Table::doRejectedMessages(Slot *s) {
1269         if (!rejectedSlotVector->isEmpty()) {
1270                 /* TODO: We should avoid generating a rejected message entry if
1271                  * there is already a sufficient entry in the queue (e->g->,
1272                  * equalsto value of true and same sequence number)->  */
1273
1274                 int64_t old_seqn = rejectedSlotVector->get(0);
1275                 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1276                         int64_t new_seqn = rejectedSlotVector->lastElement();
1277                         RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1278                         s->addEntry(rm);
1279                 } else {
1280                         int64_t prev_seqn = -1;
1281                         int i = 0;
1282                         /* Go through list of missing messages */
1283                         for (; i < rejectedSlotVector->size(); i++) {
1284                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1285                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1286                                 if (s_msg != NULL)
1287                                         break;
1288                                 prev_seqn = curr_seqn;
1289                         }
1290                         /* Generate rejected message entry for missing messages */
1291                         if (prev_seqn != -1) {
1292                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1293                                 s->addEntry(rm);
1294                         }
1295                         /* Generate rejected message entries for present messages */
1296                         for (; i < rejectedSlotVector->size(); i++) {
1297                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1298                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1299                                 int64_t machineid = s_msg->getMachineID();
1300                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1301                                 s->addEntry(rm);
1302                         }
1303                 }
1304         }
1305 }
1306
1307 ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize) {
1308         int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1309         int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
1310         if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1311                 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1312         }
1313
1314         int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1315         bool seenLiveSlot = false;
1316         int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots;         // smallest seq number in the buffer if it is full
1317         int64_t threshold = firstIfFull + Table_FREE_SLOTS;             // we want the buffer to be clear of live entries up to this point
1318
1319
1320         // Mandatory Rescue
1321         for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1322                 Slot * previousSlot = buffer->getSlot(currentSequenceNumber);
1323                 // Push slot number forward
1324                 if (!seenLiveSlot) {
1325                         oldestLiveSlotSequenceNumver = currentSequenceNumber;
1326                 }
1327
1328                 if (!previousSlot->isLive()) {
1329                         continue;
1330                 }
1331
1332                 // We have seen a live slot
1333                 seenLiveSlot = true;
1334
1335                 // Get all the live entries for a slot
1336                 Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1337
1338                 // Iterate over all the live entries and try to rescue them
1339                 for (Entry *liveEntry : liveEntries) {
1340                         if (slot->hasSpace(liveEntry)) {
1341                                 // Enough space to rescue the entry
1342                                 slot->addEntry(liveEntry);
1343                         } else if (currentSequenceNumber == firstIfFull) {
1344                                 //if there's no space but the entry is about to fall off the queue
1345                                 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
1346                         }
1347                 }
1348         }
1349
1350         // Did not resize
1351         return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
1352 }
1353
1354 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1355         /* now go through live entries from least to greatest sequence number until
1356          * either all live slots added, or the slot doesn't have enough room
1357          * for SKIP_THRESHOLD consecutive entries*/
1358         int skipcount = 0;
1359         int64_t newestseqnum = buffer->getNewestSeqNum();
1360 search:
1361         for (; seqn <= newestseqnum; seqn++) {
1362                 Slot *prevslot = buffer->getSlot(seqn);
1363                 //Push slot number forward
1364                 if (!seenliveslot)
1365                         oldestLiveSlotSequenceNumver = seqn;
1366
1367                 if (!prevslot->isLive())
1368                         continue;
1369                 seenliveslot = true;
1370                 Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
1371                 for (Entry *liveentry : liveentries) {
1372                         if (s->hasSpace(liveentry))
1373                                 s->addEntry(liveentry);
1374                         else {
1375                                 skipcount++;
1376                                 if (skipcount > Table_SKIP_THRESHOLD)
1377                                         goto donesearch;
1378                         }
1379                 }
1380         }
1381  donesearch:
1382         ;
1383 }
1384
1385 /**
1386  * Checks for malicious activity and updates the local copy of the block chain->
1387  */
1388 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1389         // The cloud communication layer has checked slot HMACs already
1390         // before decoding
1391         if (newSlots->length() == 0) {
1392                 return;
1393         }
1394
1395         // Make sure all slots are newer than the last largest slot this
1396         // client has seen
1397         int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
1398         if (firstSeqNum <= sequenceNumber) {
1399                 throw new Error("Server Error: Sent older slots!");
1400         }
1401
1402         // Create an object that can access both new slots and slots in our
1403         // local chain without committing slots to our local chain
1404         SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1405
1406         // Check that the HMAC chain is not broken
1407         checkHMACChain(indexer, newSlots);
1408
1409         // Set to keep track of messages from clients
1410         Hashset<int64_t> *machineSet = new Hashset<int64_t>(lastMessageTable->keySet());
1411
1412         // Process each slots data
1413         for (Slot *slot : newSlots) {
1414                 processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1415
1416                 updateExpectedSize();
1417         }
1418
1419         // If there is a gap, check to see if the server sent us
1420         // everything->
1421         if (firstSeqNum != (sequenceNumber + 1)) {
1422
1423                 // Check the size of the slots that were sent down by the server->
1424                 // Can only check the size if there was a gap
1425                 checkNumSlots(newSlots->length);
1426
1427                 // Since there was a gap every machine must have pushed a slot or
1428                 // must have a last message message-> If not then the server is
1429                 // hiding slots
1430                 if (!machineSet->isEmpty()) {
1431                         throw new Error("Missing record for machines: ");
1432                 }
1433         }
1434
1435         // Update the size of our local block chain->
1436         commitNewMaxSize();
1437
1438         // Commit new to slots to the local block chain->
1439         for (Slot *slot : newSlots) {
1440
1441                 // Insert this slot into our local block chain copy->
1442                 buffer->putSlot(slot);
1443
1444                 // Keep track of how many slots are currently live (have live data
1445                 // in them)->
1446                 liveSlotCount++;
1447         }
1448
1449         // Get the sequence number of the latest slot in the system
1450         sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
1451         updateLiveStateFromServer();
1452
1453         // No Need to remember after we pulled from the server
1454         offlineTransactionsCommittedAndAtServer->clear();
1455
1456         // This is invalidated now
1457         hadPartialSendToServer = false;
1458 }
1459
1460 void Table::updateLiveStateFromServer() {
1461         // Process the new transaction parts
1462         processNewTransactionParts();
1463
1464         // Do arbitration on new transactions that were received
1465         arbitrateFromServer();
1466
1467         // Update all the committed keys
1468         bool didCommitOrSpeculate = updateCommittedTable();
1469
1470         // Delete the transactions that are now dead
1471         updateLiveTransactionsAndStatus();
1472
1473         // Do speculations
1474         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1475         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1476 }
1477
1478 void Table::updateLiveStateFromLocal() {
1479         // Update all the committed keys
1480         bool didCommitOrSpeculate = updateCommittedTable();
1481
1482         // Delete the transactions that are now dead
1483         updateLiveTransactionsAndStatus();
1484
1485         // Do speculations
1486         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1487         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1488 }
1489
1490 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1491         int64_t prevslots = firstSequenceNumber;
1492
1493         if (didFindTableStatus) {
1494         } else {
1495                 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1496         }
1497
1498         didFindTableStatus = true;
1499         currMaxSize = numberOfSlots;
1500 }
1501
1502 void Table::updateExpectedSize() {
1503         expectedsize++;
1504
1505         if (expectedsize > currMaxSize) {
1506                 expectedsize = currMaxSize;
1507         }
1508 }
1509
1510
1511 /**
1512  * Check the size of the block chain to make sure there are enough
1513  * slots sent back by the server-> This is only called when we have a
1514  * gap between the slots that we have locally and the slots sent by
1515  * the server therefore in the slots sent by the server there will be
1516  * at least 1 Table status message
1517  */
1518 void Table::checkNumSlots(int numberOfSlots) {
1519         if (numberOfSlots != expectedsize) {
1520                 throw new Error("Server Error: Server did not send all slots->  Expected: ");
1521         }
1522 }
1523
1524 void Table::updateCurrMaxSize(int newmaxsize) {
1525         currMaxSize = newmaxsize;
1526 }
1527
1528
1529 /**
1530  * Update the size of of the local buffer if it is needed->
1531  */
1532 void Table::commitNewMaxSize() {
1533         didFindTableStatus = false;
1534
1535         // Resize the local slot buffer
1536         if (numberOfSlots != currMaxSize) {
1537                 buffer->resize((int32_t)currMaxSize);
1538         }
1539
1540         // Change the number of local slots to the new size
1541         numberOfSlots = (int32_t)currMaxSize;
1542
1543         // Recalculate the resize threshold since the size of the local
1544         // buffer has changed
1545         setResizeThreshold();
1546 }
1547
1548 /**
1549  * Process the new transaction parts from this latest round of slots
1550  * received from the server
1551  */
1552 void Table::processNewTransactionParts() {
1553
1554         if (newTransactionParts->size() == 0) {
1555                 // Nothing new to process
1556                 return;
1557         }
1558
1559         // Iterate through all the machine Ids that we received new parts
1560         // for
1561         for (int64_t machineId : newTransactionParts->keySet()) {
1562                 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
1563
1564                 // Iterate through all the parts for that machine Id
1565                 for (Pair<int64_t, int32_t> partId : parts->keySet()) {
1566                         TransactionPart *part = parts->get(partId);
1567
1568                         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1569                         if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= part->getSequenceNumber())) {
1570                                 // Set dead the transaction part
1571                                 part->setDead();
1572                                 continue;
1573                         }
1574
1575                         // Get the transaction object for that sequence number
1576                         Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1577
1578                         if (transaction == NULL) {
1579                                 // This is a new transaction that we dont have so make a new one
1580                                 transaction = new Transaction();
1581
1582                                 // Insert this new transaction into the live tables
1583                                 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1584                                 liveTransactionByTransactionIdTable->put(part->getTransactionId(), transaction);
1585                         }
1586
1587                         // Add that part to the transaction
1588                         transaction->addPartDecode(part);
1589                 }
1590         }
1591
1592         // Clear all the new transaction parts in preparation for the next
1593         // time the server sends slots
1594         newTransactionParts->clear();
1595 }
1596
1597 void Table::arbitrateFromServer() {
1598
1599         if (liveTransactionBySequenceNumberTable->size() == 0) {
1600                 // Nothing to arbitrate on so move on
1601                 return;
1602         }
1603
1604         // Get the transaction sequence numbers and sort from oldest to newest
1605         Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
1606         Collections->sort(transactionSequenceNumbers);
1607
1608         // Collection of key value pairs that are
1609         Hashtable<IoTString *, KeyValue *> * speculativeTableTmp = new Hashtable<IoTString *, KeyValue *>();
1610
1611         // The last transaction arbitrated on
1612         int64_t lastTransactionCommitted = -1;
1613         Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1614
1615         for (int64_t transactionSequenceNumber : transactionSequenceNumbers) {
1616                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1617
1618
1619
1620                 // Check if this machine arbitrates for this transaction if not
1621                 // then we cant arbitrate this transaction
1622                 if (transaction->getArbitrator() != localMachineId) {
1623                         continue;
1624                 }
1625
1626                 if (transactionSequenceNumber < lastSeqNumArbOn) {
1627                         continue;
1628                 }
1629
1630                 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1631                         // We have seen this already locally so dont commit again
1632                         continue;
1633                 }
1634
1635
1636                 if (!transaction->isComplete()) {
1637                         // Will arbitrate in incorrect order if we continue so just break
1638                         // Most likely this
1639                         break;
1640                 }
1641
1642
1643                 // update the largest transaction seen by arbitrator from server
1644                 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) == NULL) {
1645                         lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1646                 } else {
1647                         int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1648                         if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1649                                 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1650                         }
1651                 }
1652
1653                 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1654                         // Guard evaluated as true
1655
1656                         // Update the local changes so we can make the commit
1657                         SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1658                         while (kvit->hasNext()) {
1659                                 KeyValue *kv = kvit->next();
1660                                 speculativeTableTmp->put(kv->getKey(), kv);
1661                         }
1662                         delete kvit;
1663                         
1664                         // Update what the last transaction committed was for use in batch commit
1665                         lastTransactionCommitted = transactionSequenceNumber;
1666                 } else {
1667                         // Guard evaluated was false so create abort
1668                         // Create the abort
1669                         Abort *newAbort = new Abort(NULL,
1670                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1671                                                                                                                                         transaction->getSequenceNumber(),
1672                                                                                                                                         transaction->getMachineId(),
1673                                                                                                                                         transaction->getArbitrator(),
1674                                                                                                                                         localArbitrationSequenceNumber);
1675                         localArbitrationSequenceNumber++;
1676                         generatedAborts->add(newAbort);
1677
1678                         // Insert the abort so we can process
1679                         processEntry(newAbort);
1680                 }
1681
1682                 lastSeqNumArbOn = transactionSequenceNumber;
1683         }
1684
1685         Commit *newCommit = NULL;
1686
1687         // If there is something to commit
1688         if (speculativeTableTmp->size() != 0) {
1689                 // Create the commit and increment the commit sequence number
1690                 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1691                 localArbitrationSequenceNumber++;
1692
1693                 // Add all the new keys to the commit
1694                 for (KeyValue *kv : speculativeTableTmp->values()) {
1695                         newCommit->addKV(kv);
1696                 }
1697
1698                 // create the commit parts
1699                 newCommit->createCommitParts();
1700
1701                 // Append all the commit parts to the end of the pending queue
1702                 // waiting for sending to the server
1703                 // Insert the commit so we can process it
1704                 for (CommitPart *commitPart : newCommit->getParts()->values()) {
1705                         processEntry(commitPart);
1706                 }
1707         }
1708
1709         if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1710                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1711                 pendingSendArbitrationRounds->add(arbitrationRound);
1712
1713                 if (compactArbitrationData()) {
1714                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1715                         if (newArbitrationRound->getCommit() != NULL) {
1716                                 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1717                                         processEntry(commitPart);
1718                                 }
1719                         }
1720                 }
1721         }
1722 }
1723
1724 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1725
1726         // Check if this machine arbitrates for this transaction if not then
1727         // we cant arbitrate this transaction
1728         if (transaction->getArbitrator() != localMachineId) {
1729                 return Pair<bool, bool>(false, false);
1730         }
1731
1732         if (!transaction->isComplete()) {
1733                 // Will arbitrate in incorrect order if we continue so just break
1734                 // Most likely this
1735                 return Pair<bool, bool>(false, false);
1736         }
1737
1738         if (transaction->getMachineId() != localMachineId) {
1739                 // dont do this check for local transactions
1740                 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) != NULL) {
1741                         if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1742                                 // We've have already seen this from the server
1743                                 return Pair<bool, bool>(false, false);
1744                         }
1745                 }
1746         }
1747
1748         if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1749                 // Guard evaluated as true Create the commit and increment the
1750                 // commit sequence number
1751                 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1752                 localArbitrationSequenceNumber++;
1753
1754                 // Update the local changes so we can make the commit
1755                 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1756                 while (kvit->hasNext()) {
1757                         KeyValue *kv = kvit->next();
1758                         newCommit->addKV(kv);
1759                 }
1760                 delete kvit;
1761                 
1762                 // create the commit parts
1763                 newCommit->createCommitParts();
1764
1765                 // Append all the commit parts to the end of the pending queue
1766                 // waiting for sending to the server
1767                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1768                 pendingSendArbitrationRounds->add(arbitrationRound);
1769
1770                 if (compactArbitrationData()) {
1771                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1772                         for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1773                                 processEntry(commitPart);
1774                         }
1775                 } else {
1776                         // Insert the commit so we can process it
1777                         for (CommitPart *commitPart : newCommit->getParts()->values()) {
1778                                 processEntry(commitPart);
1779                         }
1780                 }
1781
1782                 if (transaction->getMachineId() == localMachineId) {
1783                         TransactionStatus *status = transaction->getTransactionStatus();
1784                         if (status != NULL) {
1785                                 status->setStatus(TransactionStatus_StatusCommitted);
1786                         }
1787                 }
1788
1789                 updateLiveStateFromLocal();
1790                 return Pair<bool, bool>(true, true);
1791         } else {
1792                 if (transaction->getMachineId() == localMachineId) {
1793                         // For locally created messages update the status
1794                         // Guard evaluated was false so create abort
1795                         TransactionStatus * status = transaction->getTransactionStatus();
1796                         if (status != NULL) {
1797                                 status->setStatus(TransactionStatus_StatusAborted);
1798                         }
1799                 } else {
1800                         Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
1801
1802                         // Create the abort
1803                         Abort *newAbort = new Abort(NULL,
1804                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1805                                                                                                                                         -1,
1806                                                                                                                                         transaction->getMachineId(),
1807                                                                                                                                         transaction->getArbitrator(),
1808                                                                                                                                         localArbitrationSequenceNumber);
1809                         localArbitrationSequenceNumber++;
1810                         addAbortSet->add(newAbort);
1811
1812                         // Append all the commit parts to the end of the pending queue
1813                         // waiting for sending to the server
1814                         ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1815                         pendingSendArbitrationRounds->add(arbitrationRound);
1816
1817                         if (compactArbitrationData()) {
1818                                 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1819                                 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1820                                         processEntry(commitPart);
1821                                 }
1822                         }
1823                 }
1824
1825                 updateLiveStateFromLocal();
1826                 return Pair<bool, bool>(true, false);
1827         }
1828 }
1829
1830 /**
1831  * Compacts the arbitration data my merging commits and aggregating
1832  * aborts so that a single large push of commits can be done instead
1833  * of many small updates
1834  */
1835 bool Table::compactArbitrationData() {
1836         if (pendingSendArbitrationRounds->size() < 2) {
1837                 // Nothing to compact so do nothing
1838                 return false;
1839         }
1840
1841         ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1842         if (lastRound->getDidSendPart()) {
1843                 return false;
1844         }
1845
1846         bool hadCommit = (lastRound->getCommit() == NULL);
1847         bool gotNewCommit = false;
1848
1849         int numberToDelete = 1;
1850         while (numberToDelete < pendingSendArbitrationRounds->size()) {
1851                 ArbitrationRound *xs round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1852
1853                 if (round->isFull() || round->getDidSendPart()) {
1854                         // Stop since there is a part that cannot be compacted and we
1855                         // need to compact in order
1856                         break;
1857                 }
1858
1859                 if (round->getCommit() == NULL) {
1860                         // Try compacting aborts only
1861                         int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
1862                         if (newSize > ArbitrationRound->MAX_PARTS) {
1863                                 // Cant compact since it would be too large
1864                                 break;
1865                         }
1866                         lastRound->addAborts(round->getAborts());
1867                 } else {
1868                         // Create a new larger commit
1869                         Commit newCommit = Commit->merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
1870                         localArbitrationSequenceNumber++;
1871
1872                         // Create the commit parts so that we can count them
1873                         newCommit->createCommitParts();
1874
1875                         // Calculate the new size of the parts
1876                         int newSize = newCommit->getNumberOfParts();
1877                         newSize += lastRound->getAbortsCount();
1878                         newSize += round->getAbortsCount();
1879
1880                         if (newSize > ArbitrationRound->MAX_PARTS) {
1881                                 // Cant compact since it would be too large
1882                                 break;
1883                         }
1884
1885                         // Set the new compacted part
1886                         lastRound->setCommit(newCommit);
1887                         lastRound->addAborts(round->getAborts());
1888                         gotNewCommit = true;
1889                 }
1890
1891                 numberToDelete++;
1892         }
1893
1894         if (numberToDelete != 1) {
1895                 // If there is a compaction
1896                 // Delete the previous pieces that are now in the new compacted piece
1897                 if (numberToDelete == pendingSendArbitrationRounds->size()) {
1898                         pendingSendArbitrationRounds->clear();
1899                 } else {
1900                         for (int i = 0; i < numberToDelete; i++) {
1901                                 pendingSendArbitrationRounds->remove(pendingSendArbitrationRounds->size() - 1);
1902                         }
1903                 }
1904
1905                 // Add the new compacted into the pending to send list
1906                 pendingSendArbitrationRounds->add(lastRound);
1907
1908                 // Should reinsert into the commit processor
1909                 if (hadCommit && gotNewCommit) {
1910                         return true;
1911                 }
1912         }
1913
1914         return false;
1915 }
1916
1917 /**
1918  * Update all the commits and the committed tables, sets dead the dead
1919  * transactions
1920  */
1921 bool Table::updateCommittedTable() {
1922
1923         if (newCommitParts->size() == 0) {
1924                 // Nothing new to process
1925                 return false;
1926         }
1927
1928         // Iterate through all the machine Ids that we received new parts for
1929         for (int64_t machineId : newCommitParts->keySet()) {
1930                 Hashtable<Pair<int64_t, int32_t> *, CommitPart *> *parts = newCommitParts->get(machineId);
1931
1932                 // Iterate through all the parts for that machine Id
1933                 for (Pair<int64_t, int32_t> partId : parts->keySet()) {
1934                         CommitPart *part = parts->get(partId);
1935
1936                         // Get the transaction object for that sequence number
1937                         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
1938
1939                         if (commitForClientTable == NULL) {
1940                                 // This is the first commit from this device
1941                                 commitForClientTable = new Hashtable<int64_t, Commit *>();
1942                                 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
1943                         }
1944
1945                         Commit *commit = commitForClientTable->get(part->getSequenceNumber());
1946
1947                         if (commit == NULL) {
1948                                 // This is a new commit that we dont have so make a new one
1949                                 commit = new Commit();
1950
1951                                 // Insert this new commit into the live tables
1952                                 commitForClientTable->put(part->getSequenceNumber(), commit);
1953                         }
1954
1955                         // Add that part to the commit
1956                         commit->addPartDecode(part);
1957                 }
1958         }
1959
1960         // Clear all the new commits parts in preparation for the next time
1961         // the server sends slots
1962         newCommitParts->clear();
1963
1964         // If we process a new commit keep track of it for future use
1965         bool didProcessANewCommit = false;
1966
1967         // Process the commits one by one
1968         for (int64_T arbitratorId : liveCommitsTable->keySet()) {
1969
1970                 // Get all the commits for a specific arbitrator
1971                 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
1972
1973                 // Sort the commits in order
1974                 Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
1975                 Collections->sort(commitSequenceNumbers);
1976
1977                 // Get the last commit seen from this arbitrator
1978                 int64_t lastCommitSeenSequenceNumber = -1;
1979                 if (lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId) != NULL) {
1980                         lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
1981                 }
1982
1983                 // Go through each new commit one by one
1984                 for (int i = 0; i < commitSequenceNumbers->size(); i++) {
1985                         int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
1986                         Commit *commit = commitForClientTable->get(commitSequenceNumber);
1987
1988                         // Special processing if a commit is not complete
1989                         if (!commit->isComplete()) {
1990                                 if (i == (commitSequenceNumbers->size() - 1)) {
1991                                         // If there is an incomplete commit and this commit is the
1992                                         // latest one seen then this commit cannot be processed and
1993                                         // there are no other commits
1994                                         break;
1995                                 } else {
1996                                         // This is a commit that was already dead but parts of it
1997                                         // are still in the block chain (not flushed out yet)->
1998                                         // Delete it and move on
1999                                         commit->setDead();
2000                                         commitForClientTable->remove(commit->getSequenceNumber());
2001                                         continue;
2002                                 }
2003                         }
2004
2005                         // Update the last transaction that was updated if we can
2006                         if (commit->getTransactionSequenceNumber() != -1) {
2007                                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
2008
2009                                 // Update the last transaction sequence number that the arbitrator arbitrated on
2010                                 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
2011                                         lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2012                                 }
2013                         }
2014
2015                         // Update the last arbitration data that we have seen so far
2016                         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(commit->getMachineId())) {
2017                                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
2018                                 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
2019                                         // Is larger
2020                                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2021                                 }
2022                         } else {
2023                                 // Never seen any data from this arbitrator so record the first one
2024                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2025                         }
2026
2027                         // We have already seen this commit before so need to do the
2028                         // full processing on this commit
2029                         if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2030
2031                                 // Update the last transaction that was updated if we can
2032                                 if (commit->getTransactionSequenceNumber() != -1) {
2033                                         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
2034
2035                                         // Update the last transaction sequence number that the arbitrator arbitrated on
2036                                         if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
2037                                                 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2038                                         }
2039                                 }
2040
2041                                 continue;
2042                         }
2043
2044                         // If we got here then this is a brand new commit and needs full
2045                         // processing
2046                         // Get what commits should be edited, these are the commits that
2047                         // have live values for their keys
2048                         Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
2049                         {
2050                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = commit->getKeyValueUpdateSet()->iterator();
2051                                 while (kvit->hasNext()) {
2052                                         KeyValue *kv = kvit->next();
2053                                         commitsToEdit->add(liveCommitsByKeyTable->get(kv->getKey()));
2054                                 }
2055                                 delete kvit;
2056                         }
2057                         commitsToEdit->remove(NULL);            // remove NULL since it could be in this set
2058
2059                         // Update each previous commit that needs to be updated
2060                         for (Commit *previousCommit : commitsToEdit) {
2061
2062                                 // Only bother with live commits (TODO: Maybe remove this check)
2063                                 if (previousCommit->isLive()) {
2064
2065                                         // Update which keys in the old commits are still live
2066                                         {
2067                                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = commit->getKeyValueUpdateSet()->iterator();
2068                                                 while (kvit->hasNext()) {
2069                                                         KeyValue *kv = kvit->next();
2070                                                         previousCommit->invalidateKey(kv->getKey());
2071                                                 }
2072                                                 delete kvit;
2073                                         }
2074                                         
2075                                         // if the commit is now dead then remove it
2076                                         if (!previousCommit->isLive()) {
2077                                                 commitForClientTable->remove(previousCommit);
2078                                         }
2079                                 }
2080                         }
2081
2082                         // Update the last seen sequence number from this arbitrator
2083                         if (lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId()) != NULL) {
2084                                 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2085                                         lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2086                                 }
2087                         } else {
2088                                 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2089                         }
2090
2091                         // We processed a new commit that we havent seen before
2092                         didProcessANewCommit = true;
2093
2094                         // Update the committed table of keys and which commit is using which key
2095                         {
2096                                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = commit->getKeyValueUpdateSet()->iterator();
2097                                 while (kvit->hasNext()) {
2098                                         KeyValue *kv = kvit->next();
2099                                         committedKeyValueTable->put(kv->getKey(), kv);
2100                                         liveCommitsByKeyTable->put(kv->getKey(), commit);
2101                                 }
2102                                 delete kvit;
2103                         }
2104                 }
2105         }
2106
2107         return didProcessANewCommit;
2108 }
2109
2110 /**
2111  * Create the speculative table from transactions that are still live
2112  * and have come from the cloud
2113  */
2114 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2115         if (liveTransactionBySequenceNumberTable->keySet()->size() == 0) {
2116                 // There is nothing to speculate on
2117                 return false;
2118         }
2119
2120         // Create a list of the transaction sequence numbers and sort them
2121         // from oldest to newest
2122         Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
2123         Collections->sort(transactionSequenceNumbersSorted);
2124
2125         bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2126
2127
2128         if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2129                 // If there is a gap in the transaction sequence numbers then
2130                 // there was a commit or an abort of a transaction OR there was a
2131                 // new commit (Could be from offline commit) so a redo the
2132                 // speculation from scratch
2133
2134                 // Start from scratch
2135                 speculatedKeyValueTable->clear();
2136                 lastTransactionSequenceNumberSpeculatedOn = -1;
2137                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2138
2139         }
2140
2141         // Remember the front of the transaction list
2142         oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2143
2144         // Find where to start arbitration from
2145         int startIndex = transactionSequenceNumbersSorted->indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1;
2146
2147         if (startIndex >= transactionSequenceNumbersSorted->size()) {
2148                 // Make sure we are not out of bounds
2149                 return false;           // did not speculate
2150         }
2151
2152         Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
2153         bool didSkip = true;
2154
2155         for (int i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2156                 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2157                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2158
2159                 if (!transaction->isComplete()) {
2160                         // If there is an incomplete transaction then there is nothing
2161                         // we can do add this transactions arbitrator to the list of
2162                         // arbitrators we should ignore
2163                         incompleteTransactionArbitrator->add(transaction->getArbitrator());
2164                         didSkip = true;
2165                         continue;
2166                 }
2167
2168                 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2169                         continue;
2170                 }
2171
2172                 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2173
2174                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2175                         // Guard evaluated to true so update the speculative table
2176                         {
2177                                 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2178                                 while (kvit->hasNext()) {
2179                                         KeyValue *kv = kvit->next();
2180                                         speculatedKeyValueTable->put(kv->getKey(), kv);
2181                                 }
2182                                 delete kvit;
2183                         }
2184                 }
2185         }
2186
2187         if (didSkip) {
2188                 // Since there was a skip we need to redo the speculation next time around
2189                 lastTransactionSequenceNumberSpeculatedOn = -1;
2190                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2191         }
2192
2193         // We did some speculation
2194         return true;
2195 }
2196
2197 /**
2198  * Create the pending transaction speculative table from transactions
2199  * that are still in the pending transaction buffer
2200  */
2201 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2202         if (pendingTransactionQueue->size() == 0) {
2203                 // There is nothing to speculate on
2204                 return;
2205         }
2206
2207         if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2208                 // need to reset on the pending speculation
2209                 lastPendingTransactionSpeculatedOn = NULL;
2210                 firstPendingTransaction = pendingTransactionQueue->get(0);
2211                 pendingTransactionSpeculatedKeyValueTable->clear();
2212         }
2213
2214         // Find where to start arbitration from
2215         int startIndex = pendingTransactionQueue->indexOf(firstPendingTransaction) + 1;
2216
2217         if (startIndex >= pendingTransactionQueue->size()) {
2218                 // Make sure we are not out of bounds
2219                 return;
2220         }
2221
2222         for (int i = startIndex; i < pendingTransactionQueue->size(); i++) {
2223                 Transaction *transaction = pendingTransactionQueue->get(i);
2224
2225                 lastPendingTransactionSpeculatedOn = transaction;
2226
2227                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2228                         // Guard evaluated to true so update the speculative table
2229                         SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2230                         while (kvit->hasNext()) {
2231                                 KeyValue *kv = kvit->next();
2232                                 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2233                         }
2234                         delete kvit;
2235                 }
2236         }
2237 }
2238
2239 /**
2240  * Set dead and remove from the live transaction tables the
2241  * transactions that are dead
2242  */
2243 void Table::updateLiveTransactionsAndStatus() {
2244
2245         // Go through each of the transactions
2246         for (Iterator<Map->Entry<int64_t, Transaction> > *iter = liveTransactionBySequenceNumberTable->entrySet()->iterator(); iter->hasNext();) {
2247                 Transaction *transaction = iter->next()->getValue();
2248
2249                 // Check if the transaction is dead
2250                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator());
2251                 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= transaction->getSequenceNumber())) {
2252
2253                         // Set dead the transaction
2254                         transaction->setDead();
2255
2256                         // Remove the transaction from the live table
2257                         iter->remove();
2258                         liveTransactionByTransactionIdTable->remove(transaction->getId());
2259                 }
2260         }
2261
2262         // Go through each of the transactions
2263         for (Iterator<Map->Entry<int64_t, TransactionStatus *> > *iter = outstandingTransactionStatus->entrySet()->iterator(); iter->hasNext();) {
2264                 TransactionStatus *status = iter->next()->getValue();
2265
2266                 // Check if the transaction is dead
2267                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator());
2268                 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= status->getTransactionSequenceNumber())) {
2269
2270                         // Set committed
2271                         status->setStatus(TransactionStatus_StatusCommitted);
2272
2273                         // Remove
2274                         iter->remove();
2275                 }
2276         }
2277 }
2278
2279 /**
2280  * Process this slot, entry by entry->  Also update the latest message sent by slot
2281  */
2282 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2283
2284         // Update the last message seen
2285         updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2286
2287         // Process each entry in the slot
2288         Vector<Entry *> *entries = slot->getEntries();
2289         uint eSize = entries->size();
2290         for(uint ei=0; ei < eSize; ei++) {
2291                 Entry * entry = entries->get(ei);
2292                 switch (entry->getType()) {
2293                 case TypeCommitPart:
2294                         processEntry((CommitPart *)entry);
2295                         break;
2296                 case TypeAbort:
2297                         processEntry((Abort *)entry);
2298                         break;
2299                 case TypeTransactionPart:
2300                         processEntry((TransactionPart *)entry);
2301                         break;
2302                 case TypeNewKey:
2303                         processEntry((NewKey *)entry);
2304                         break;
2305                 case TypeLastMessage:
2306                         processEntry((LastMessage *)entry, machineSet);
2307                         break;
2308                 case TypeRejectedMessage:
2309                         processEntry((RejectedMessage *)entry, indexer);
2310                         break;
2311                 case TypeTableStatus:
2312                         processEntry((TableStatus *)entry, slot->getSequenceNumber());
2313                         break;
2314                 default:
2315                         throw new Error("Unrecognized type: ");
2316                 }
2317         }
2318 }
2319
2320 /**
2321  * Update the last message that was sent for a machine Id
2322  */
2323 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2324         // Update what the last message received by a machine was
2325         updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2326 }
2327
2328 /**
2329  * Add the new key to the arbitrators table and update the set of live
2330  * new keys (in case of a rescued new key message)
2331  */
2332 void Table::processEntry(NewKey *entry) {
2333         // Update the arbitrator table with the new key information
2334         arbitratorTable->put(entry->getKey(), entry->getMachineID());
2335
2336         // Update what the latest live new key is
2337         NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2338         if (oldNewKey != NULL) {
2339                 // Delete the old new key messages
2340                 oldNewKey->setDead();
2341         }
2342 }
2343
2344 /**
2345  * Process new table status entries and set dead the old ones as new
2346  * ones come in-> keeps track of the largest and smallest table status
2347  * seen in this current round of updating the local copy of the block
2348  * chain
2349  */
2350 void Table::processEntry(TableStatus entry, int64_t seq) {
2351         int newNumSlots = entry->getMaxSlots();
2352         updateCurrMaxSize(newNumSlots);
2353         initExpectedSize(seq, newNumSlots);
2354
2355         if (liveTableStatus != NULL) {
2356                 // We have a larger table status so the old table status is no
2357                 // int64_ter alive
2358                 liveTableStatus->setDead();
2359         }
2360
2361         // Make this new table status the latest alive table status
2362         liveTableStatus = entry;
2363 }
2364
2365 /**
2366  * Check old messages to see if there is a block chain violation->
2367  * Also
2368  */
2369 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
2370         int64_t oldSeqNum = entry->getOldSeqNum();
2371         int64_t newSeqNum = entry->getNewSeqNum();
2372         bool isequal = entry->getEqual();
2373         int64_t machineId = entry->getMachineID();
2374         int64_t seq = entry->getSequenceNumber();
2375
2376         // Check if we have messages that were supposed to be rejected in
2377         // our local block chain
2378         for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2379                 // Get the slot
2380                 Slot *slot = indexer->getSlot(seqNum);
2381
2382                 if (slot != NULL) {
2383                         // If we have this slot make sure that it was not supposed to be
2384                         // a rejected slot
2385                         int64_t slotMachineId = slot->getMachineID();
2386                         if (isequal != (slotMachineId == machineId)) {
2387                                 throw new Error("Server Error: Trying to insert rejected message for slot ");
2388                         }
2389                 }
2390         }
2391
2392         // Create a list of clients to watch until they see this rejected
2393         // message entry->
2394         Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2395         for (Map->Entry<int64_t, Pair<int64_t, Liveness *> > *lastMessageEntry : lastMessageTable->entrySet()) {
2396                 // Machine ID for the last message entry
2397                 int64_t lastMessageEntryMachineId = lastMessageEntry->getKey();
2398
2399                 // We've seen it, don't need to continue to watch->  Our next
2400                 // message will implicitly acknowledge it->
2401                 if (lastMessageEntryMachineId == localMachineId) {
2402                         continue;
2403                 }
2404
2405                 Pair<int64_t, Liveness *> lastMessageValue = lastMessageEntry->getValue();
2406                 int64_t entrySequenceNumber = lastMessageValue.getFirst();
2407
2408                 if (entrySequenceNumber < seq) {
2409                         // Add this rejected message to the set of messages that this
2410                         // machine ID did not see yet
2411                         addWatchVector(lastMessageEntryMachineId, entry);
2412                         // This client did not see this rejected message yet so add it
2413                         // to the watch set to monitor
2414                         deviceWatchSet->add(lastMessageEntryMachineId);
2415                 }
2416         }
2417         if (deviceWatchSet->isEmpty()) {
2418                 // This rejected message has been seen by all the clients so
2419                 entry->setDead();
2420         } else {
2421                 // We need to watch this rejected message
2422                 entry->setWatchSet(deviceWatchSet);
2423         }
2424 }
2425
2426 /**
2427  * Check if this abort is live, if not then save it so we can kill it
2428  * later-> update the last transaction number that was arbitrated on->
2429  */
2430 void Table::processEntry(Abort *entry) {
2431         if (entry->getTransactionSequenceNumber() != -1) {
2432                 // update the transaction status if it was sent to the server
2433                 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2434                 if (status != NULL) {
2435                         status->setStatus(TransactionStatus_StatusAborted);
2436                 }
2437         }
2438
2439         // Abort has not been seen by the client it is for yet so we need to
2440         // keep track of it
2441         Abort *previouslySeenAbort = liveAbortTable->put(entry->getAbortId(), entry);
2442         if (previouslySeenAbort != NULL) {
2443                 previouslySeenAbort->setDead();         // Delete old version of the abort since we got a rescued newer version
2444         }
2445
2446         if (entry->getTransactionArbitrator() == localMachineId) {
2447                 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
2448         }
2449
2450         if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId()).getFirst() >= entry->getSequenceNumber())) {
2451                 // The machine already saw this so it is dead
2452                 entry->setDead();
2453                 liveAbortTable->remove(&entry->getAbortId());
2454
2455                 if (entry->getTransactionArbitrator() == localMachineId) {
2456                         liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2457                 }
2458                 return;
2459         }
2460
2461         // Update the last arbitration data that we have seen so far
2462         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(entry->getTransactionArbitrator())) {
2463                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2464                 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2465                         // Is larger
2466                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2467                 }
2468         } else {
2469                 // Never seen any data from this arbitrator so record the first one
2470                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2471         }
2472
2473         // Set dead a transaction if we can
2474         Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber()));
2475         if (transactionToSetDead != NULL) {
2476                 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2477         }
2478
2479         // Update the last transaction sequence number that the arbitrator
2480         // arbitrated on
2481         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator());
2482         if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2483                 // Is a valid one
2484                 if (entry->getTransactionSequenceNumber() != -1) {
2485                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2486                 }
2487         }
2488 }
2489
2490 /**
2491  * Set dead the transaction part if that transaction is dead and keep
2492  * track of all new parts
2493  */
2494 void Table::processEntry(TransactionPart *entry) {
2495         // Check if we have already seen this transaction and set it dead OR
2496         // if it is not alive
2497         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId());
2498         if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= entry->getSequenceNumber())) {
2499                 // This transaction is dead, it was already committed or aborted
2500                 entry->setDead();
2501                 return;
2502         }
2503
2504         // This part is still alive
2505         Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId());
2506
2507         if (transactionPart == NULL) {
2508                 // Dont have a table for this machine Id yet so make one
2509                 transactionPart = new Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2510                 newTransactionParts->put(entry->getMachineId(), transactionPart);
2511         }
2512
2513         // Update the part and set dead ones we have already seen (got a
2514         // rescued version)
2515         TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry);
2516         if (previouslySeenPart != NULL) {
2517                 previouslySeenPart->setDead();
2518         }
2519 }
2520
2521 /**
2522  * Process new commit entries and save them for future use->  Delete duplicates
2523  */
2524 void Table::processEntry(CommitPart *entry) {
2525         // Update the last transaction that was updated if we can
2526         if (entry->getTransactionSequenceNumber() != -1) {
2527                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId());
2528                 // Update the last transaction sequence number that the arbitrator
2529                 // arbitrated on
2530                 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2531                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2532                 }
2533         }
2534
2535         Hashtable<Pair<int64_t, int32_t> *, CommitPart *> *commitPart = newCommitParts->get(entry->getMachineId());
2536         if (commitPart == NULL) {
2537                 // Don't have a table for this machine Id yet so make one
2538                 commitPart = new Hashtable<Pair<int64_t, int32_t> *, CommitPart *>();
2539                 newCommitParts->put(entry->getMachineId(), commitPart);
2540         }
2541         // Update the part and set dead ones we have already seen (got a
2542         // rescued version)
2543         CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry);
2544         if (previouslySeenPart != NULL) {
2545                 previouslySeenPart->setDead();
2546         }
2547 }
2548
2549 /**
2550  * Update the last message seen table-> Update and set dead the
2551  * appropriate RejectedMessages as clients see them-> Updates the live
2552  * aborts, removes those that are dead and sets them dead-> Check that
2553  * the last message seen is correct and that there is no mismatch of
2554  * our own last message or that other clients have not had a rollback
2555  * on the last message->
2556  */
2557 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2558         // We have seen this machine ID
2559         machineSet->remove(machineId);
2560
2561         // Get the set of rejected messages that this machine Id is has not seen yet
2562         Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2563         // If there is a rejected message that this machine Id has not seen yet
2564         if (watchset != NULL) {
2565                 // Go through each rejected message that this machine Id has not
2566                 // seen yet
2567
2568                 SetIterator<RejectedMessage *, RejectedMessage *> *rmit = watchset->iterator();
2569                 while(rmit->hasNext()) {
2570                         RejectedMessage *rm = rmit->next();
2571                         // If this machine Id has seen this rejected message->->->
2572                         if (rm->getSequenceNumber() <= seqNum) {
2573                                 // Remove it from our watchlist
2574                                 rmit->remove();
2575                                 // Decrement machines that need to see this notification
2576                                 rm->removeWatcher(machineId);
2577                         }
2578                 }
2579                 delete rmit;
2580         }
2581
2582         // Set dead the abort
2583         for (Iterator<Map->Entry<Pair<int64_t, int64_t>, Abort *> > i = liveAbortTable->entrySet()->iterator(); i->hasNext();) {
2584                 Abort *abort = i->next()->getValue();
2585                 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2586                         abort->setDead();
2587                         i->remove();
2588                         if (abort->getTransactionArbitrator() == localMachineId) {
2589                                 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2590                         }
2591                 }
2592         }
2593         if (machineId == localMachineId) {
2594                 // Our own messages are immediately dead->
2595                 char livenessType = liveness->getType();
2596                 if (livenessType==TypeLastMessage) {
2597                         ((LastMessage *)liveness)->setDead();
2598                 } else if (livenessType == TypeSlot) {
2599                         ((Slot *)liveness)->setDead();
2600                 } else {
2601                         throw new Error("Unrecognized type");
2602                 }
2603         }
2604         // Get the old last message for this device
2605         Pair<int64_t, Liveness *> * lastMessageEntry = lastMessageTable->put(machineId, new Pair<int64_t, Liveness *>(seqNum, liveness));
2606         if (lastMessageEntry == NULL) {
2607                 // If no last message then there is nothing else to process
2608                 return;
2609         }
2610
2611         int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
2612         Liveness *lastEntry = lastMessageEntry->getSecond();
2613         delete lastMessageEntry;
2614         
2615         // If it is not our machine Id since we already set ours to dead
2616         if (machineId != localMachineId) {
2617                 char lastEntryType = lastEntry->getType();
2618                 
2619                 if (lastEntryType == TypeLastMessage) {
2620                         ((LastMessage *)lastEntry)->setDead();
2621                 } else if (lastEntryType == TypeSlot) {
2622                         ((Slot *)lastEntry)->setDead();
2623                 } else {
2624                         throw new Error("Unrecognized type");
2625                 }
2626         }
2627         // Make sure the server is not playing any games
2628         if (machineId == localMachineId) {
2629                 if (hadPartialSendToServer) {
2630                         // We were not making any updates and we had a machine mismatch
2631                         if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2632                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: ");
2633                         }
2634                 } else {
2635                         // We were not making any updates and we had a machine mismatch
2636                         if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2637                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed: ");
2638                         }
2639                 }
2640         } else {
2641                 if (lastMessageSeqNum > seqNum) {
2642                         throw new Error("Server Error: Rollback on remote machine sequence number");
2643                 }
2644         }
2645 }
2646
2647 /**
2648  * Add a rejected message entry to the watch set to keep track of
2649  * which clients have seen that rejected message entry and which have
2650  * not.
2651  */
2652 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
2653         Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2654         if (entries == NULL) {
2655                 // There is no set for this machine ID yet so create one
2656                 entries = new Hashset<RejectedMessage *>();
2657                 rejectedMessageWatchVectorTable->put(machineId, entries);
2658         }
2659         entries->add(entry);
2660 }
2661
2662 /**
2663  * Check if the HMAC chain is not violated
2664  */
2665 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
2666         for (int i = 0; i < newSlots->length(); i++) {
2667                 Slot *currSlot = newSlots->get(i);
2668                 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
2669                 if (prevSlot != NULL &&
2670                                 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2671                         throw new Error("Server Error: Invalid HMAC Chain");
2672         }
2673 }