ee3b8bb0438b1f63f5343b4236ce8b2efda9ef1e
[iotcloud.git] / version2 / src / C / Table.cc
1 #include "Table.h"
2 #include "CloudComm.h"
3 #include "SlotBuffer.h"
4 #include "NewKey.h"
5 #include "Slot.h"
6 #include "KeyValue.h"
7 #include "Error.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
13 #include "Random.h"
14 #include "ByteBuffer.h"
15 #include "Abort.h"
16 #include "CommitPart.h"
17 #include "ArbitrationRound.h"
18 #include "TransactionPart.h"
19 #include "Commit.h"
20 #include "RejectedMessage.h"
21 #include "SlotIndexer.h"
22
23 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
24         buffer(NULL),
25         cloud(new CloudComm(this, baseurl, password, listeningPort)),
26         random(NULL),
27         liveTableStatus(NULL),
28         pendingTransactionBuilder(NULL),
29         lastPendingTransactionSpeculatedOn(NULL),
30         firstPendingTransaction(NULL),
31         numberOfSlots(0),
32         bufferResizeThreshold(0),
33         liveSlotCount(0),
34         oldestLiveSlotSequenceNumver(1),
35         localMachineId(_localMachineId),
36         sequenceNumber(0),
37         localTransactionSequenceNumber(0),
38         lastTransactionSequenceNumberSpeculatedOn(0),
39         oldestTransactionSequenceNumberSpeculatedOn(0),
40         localArbitrationSequenceNumber(0),
41         hadPartialSendToServer(false),
42         attemptedToSendToServer(false),
43         expectedsize(0),
44         didFindTableStatus(false),
45         currMaxSize(0),
46         lastSlotAttemptedToSend(NULL),
47         lastIsNewKey(false),
48         lastNewSize(0),
49         lastTransactionPartsSent(NULL),
50         lastPendingSendArbitrationEntriesToDelete(NULL),
51         lastNewKey(NULL),
52         committedKeyValueTable(NULL),
53         speculatedKeyValueTable(NULL),
54         pendingTransactionSpeculatedKeyValueTable(NULL),
55         liveNewKeyTable(NULL),
56         lastMessageTable(NULL),
57         rejectedMessageWatchVectorTable(NULL),
58         arbitratorTable(NULL),
59         liveAbortTable(NULL),
60         newTransactionParts(NULL),
61         newCommitParts(NULL),
62         lastArbitratedTransactionNumberByArbitratorTable(NULL),
63         liveTransactionBySequenceNumberTable(NULL),
64         liveTransactionByTransactionIdTable(NULL),
65         liveCommitsTable(NULL),
66         liveCommitsByKeyTable(NULL),
67         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
68         rejectedSlotVector(NULL),
69         pendingTransactionQueue(NULL),
70         pendingSendArbitrationRounds(NULL),
71         pendingSendArbitrationEntriesToDelete(NULL),
72         transactionPartsSent(NULL),
73         outstandingTransactionStatus(NULL),
74         liveAbortsGeneratedByLocal(NULL),
75         offlineTransactionsCommittedAndAtServer(NULL),
76         localCommunicationTable(NULL),
77         lastTransactionSeenFromMachineFromServer(NULL),
78         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
79         lastInsertedNewKey(false),
80         lastSeqNumArbOn(0)
81 {
82         init();
83 }
84
85 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
86         buffer(NULL),
87         cloud(_cloud),
88         random(NULL),
89         liveTableStatus(NULL),
90         pendingTransactionBuilder(NULL),
91         lastPendingTransactionSpeculatedOn(NULL),
92         firstPendingTransaction(NULL),
93         numberOfSlots(0),
94         bufferResizeThreshold(0),
95         liveSlotCount(0),
96         oldestLiveSlotSequenceNumver(1),
97         localMachineId(_localMachineId),
98         sequenceNumber(0),
99         localTransactionSequenceNumber(0),
100         lastTransactionSequenceNumberSpeculatedOn(0),
101         oldestTransactionSequenceNumberSpeculatedOn(0),
102         localArbitrationSequenceNumber(0),
103         hadPartialSendToServer(false),
104         attemptedToSendToServer(false),
105         expectedsize(0),
106         didFindTableStatus(false),
107         currMaxSize(0),
108         lastSlotAttemptedToSend(NULL),
109         lastIsNewKey(false),
110         lastNewSize(0),
111         lastTransactionPartsSent(NULL),
112         lastPendingSendArbitrationEntriesToDelete(NULL),
113         lastNewKey(NULL),
114         committedKeyValueTable(NULL),
115         speculatedKeyValueTable(NULL),
116         pendingTransactionSpeculatedKeyValueTable(NULL),
117         liveNewKeyTable(NULL),
118         lastMessageTable(NULL),
119         rejectedMessageWatchVectorTable(NULL),
120         arbitratorTable(NULL),
121         liveAbortTable(NULL),
122         newTransactionParts(NULL),
123         newCommitParts(NULL),
124         lastArbitratedTransactionNumberByArbitratorTable(NULL),
125         liveTransactionBySequenceNumberTable(NULL),
126         liveTransactionByTransactionIdTable(NULL),
127         liveCommitsTable(NULL),
128         liveCommitsByKeyTable(NULL),
129         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
130         rejectedSlotVector(NULL),
131         pendingTransactionQueue(NULL),
132         pendingSendArbitrationRounds(NULL),
133         pendingSendArbitrationEntriesToDelete(NULL),
134         transactionPartsSent(NULL),
135         outstandingTransactionStatus(NULL),
136         liveAbortsGeneratedByLocal(NULL),
137         offlineTransactionsCommittedAndAtServer(NULL),
138         localCommunicationTable(NULL),
139         lastTransactionSeenFromMachineFromServer(NULL),
140         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
141         lastInsertedNewKey(false),
142         lastSeqNumArbOn(0)
143 {
144         init();
145 }
146
147 /**
148  * Init all the stuff needed for for table usage
149  */
150 void Table::init() {
151         // Init helper objects
152         random = new Random();
153         buffer = new SlotBuffer();
154
155         // init data structs
156         committedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
157         speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
158         pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
159         liveNewKeyTable = new Hashtable<IoTString *, NewKey *>();
160         lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> * >();
161         rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
162         arbitratorTable = new Hashtable<IoTString *, int64_t>();
163         liveAbortTable = new Hashtable<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>();
164         newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
165         newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
166         lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
167         liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
168         liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t> *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>();
169         liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> >();
170         liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *>();
171         lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
172         rejectedSlotVector = new Vector<int64_t>();
173         pendingTransactionQueue = new Vector<Transaction *>();
174         pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
175         transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
176         outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
177         liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
178         offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> *, uintptr_t, 0, pairHashFunction, pairEquals>();
179         localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> *>();
180         lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
181         pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
182         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
183
184         // Other init stuff
185         numberOfSlots = buffer->capacity();
186         setResizeThreshold();
187 }
188
189 /**
190  * Initialize the table by inserting a table status as the first entry
191  * into the table status also initialize the crypto stuff.
192  */
193 void Table::initTable() {
194         cloud->initSecurity();
195
196         // Create the first insertion into the block chain which is the table status
197         Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
198         localSequenceNumber++;
199         TableStatus *status = new TableStatus(s, numberOfSlots);
200         s->addEntry(status);
201         Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
202
203         if (array == NULL) {
204                 array = new Array<Slot *>(1);
205                 array->set(0, s);
206                 // update local block chain
207                 validateAndUpdate(array, true);
208         } else if (array->length() == 1) {
209                 // in case we did push the slot BUT we failed to init it
210                 validateAndUpdate(array, true);
211         } else {
212                 throw new Error("Error on initialization");
213         }
214 }
215
216 /**
217  * Rebuild the table from scratch by pulling the latest block chain
218  * from the server.
219  */
220 void Table::rebuild() {
221         // Just pull the latest slots from the server
222         Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
223         validateAndUpdate(newslots, true);
224         sendToServer(NULL);
225         updateLiveTransactionsAndStatus();
226 }
227
228 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
229         localCommunicationTable->put(arbitrator, new Pair<IoTString *, int32_t>(hostName, portNumber));
230 }
231
232 int64_t Table::getArbitrator(IoTString *key) {
233         return arbitratorTable->get(key);
234 }
235
236 void Table::close() {
237         cloud->close();
238 }
239
240 IoTString *Table::getCommitted(IoTString *key)  {
241         KeyValue *kv = committedKeyValueTable->get(key);
242
243         if (kv != NULL) {
244                 return kv->getValue();
245         } else {
246                 return NULL;
247         }
248 }
249
250 IoTString *Table::getSpeculative(IoTString *key) {
251         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
252
253         if (kv == NULL) {
254                 kv = speculatedKeyValueTable->get(key);
255         }
256
257         if (kv == NULL) {
258                 kv = committedKeyValueTable->get(key);
259         }
260
261         if (kv != NULL) {
262                 return kv->getValue();
263         } else {
264                 return NULL;
265         }
266 }
267
268 IoTString *Table::getCommittedAtomic(IoTString *key) {
269         KeyValue *kv = committedKeyValueTable->get(key);
270
271         if (!arbitratorTable->contains(key)) {
272                 throw new Error("Key not Found.");
273         }
274
275         // Make sure new key value pair matches the current arbitrator
276         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
277                 // TODO: Maybe not throw en error
278                 throw new Error("Not all Key Values Match Arbitrator.");
279         }
280
281         if (kv != NULL) {
282                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
283                 return kv->getValue();
284         } else {
285                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
286                 return NULL;
287         }
288 }
289
290 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
291         if (!arbitratorTable->contains(key)) {
292                 throw new Error("Key not Found.");
293         }
294
295         // Make sure new key value pair matches the current arbitrator
296         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
297                 // TODO: Maybe not throw en error
298                 throw new Error("Not all Key Values Match Arbitrator.");
299         }
300
301         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
302
303         if (kv == NULL) {
304                 kv = speculatedKeyValueTable->get(key);
305         }
306
307         if (kv == NULL) {
308                 kv = committedKeyValueTable->get(key);
309         }
310
311         if (kv != NULL) {
312                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
313                 return kv->getValue();
314         } else {
315                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
316                 return NULL;
317         }
318 }
319
320 bool Table::update()  {
321         try {
322                 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
323                 validateAndUpdate(newSlots, false);
324                 sendToServer(NULL);
325                 updateLiveTransactionsAndStatus();
326                 return true;
327         } catch (Exception *e) {
328                 SetIterator<int64_t> *kit = getKeyIterator(localCommunicationTable);
329                 while (kit->hasNext()) {
330                         int64_t m = kit->next();
331                         updateFromLocal(m);
332                 }
333                 delete kit;
334         }
335
336         return false;
337 }
338
339 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
340         while (true) {
341                 if (!arbitratorTable->contains(keyName)) {
342                         // There is already an arbitrator
343                         return false;
344                 }
345                 NewKey *newKey = new NewKey(NULL, keyName, machineId);
346
347                 if (sendToServer(newKey)) {
348                         // If successfully inserted
349                         return true;
350                 }
351         }
352 }
353
354 void Table::startTransaction() {
355         // Create a new transaction, invalidates any old pending transactions.
356         pendingTransactionBuilder = new PendingTransaction(localMachineId);
357 }
358
359 void Table::addKV(IoTString *key, IoTString *value) {
360
361         // Make sure it is a valid key
362         if (!arbitratorTable->contains(key)) {
363                 throw new Error("Key not Found.");
364         }
365
366         // Make sure new key value pair matches the current arbitrator
367         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
368                 // TODO: Maybe not throw en error
369                 throw new Error("Not all Key Values Match Arbitrator.");
370         }
371
372         // Add the key value to this transaction
373         KeyValue *kv = new KeyValue(key, value);
374         pendingTransactionBuilder->addKV(kv);
375 }
376
377 TransactionStatus *Table::commitTransaction() {
378         if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
379                 // transaction with no updates will have no effect on the system
380                 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
381         }
382
383         // Set the local transaction sequence number and increment
384         pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
385         localTransactionSequenceNumber++;
386
387         // Create the transaction status
388         TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
389
390         // Create the new transaction
391         Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
392         newTransaction->setTransactionStatus(transactionStatus);
393
394         if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
395                 // Add it to the queue and invalidate the builder for safety
396                 pendingTransactionQueue->add(newTransaction);
397         } else {
398                 arbitrateOnLocalTransaction(newTransaction);
399                 updateLiveStateFromLocal();
400         }
401
402         pendingTransactionBuilder = new PendingTransaction(localMachineId);
403
404         try {
405                 sendToServer(NULL);
406         } catch (ServerException *e) {
407
408                 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
409                 uint size = pendingTransactionQueue->size();
410                 uint oldindex = 0;
411                 for (int iter = 0; iter < size; iter++) {
412                         Transaction *transaction = pendingTransactionQueue->get(iter);
413                         pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter));
414
415                         if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
416                                 // Already contacted this client so ignore all attempts to contact this client
417                                 // to preserve ordering for arbitrator
418                                 continue;
419                         }
420
421                         Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
422
423                         if (sendReturn.getFirst()) {
424                                 // Failed to contact over local
425                                 arbitratorTriedAndFailed->add(transaction->getArbitrator());
426                         } else {
427                                 // Successful contact or should not contact
428
429                                 if (sendReturn.getSecond()) {
430                                         // did arbitrate
431                                         oldindex--;
432                                 }
433                         }
434                 }
435                 pendingTransactionQueue->setSize(oldindex);
436         }
437
438         updateLiveStateFromLocal();
439
440         return transactionStatus;
441 }
442
443 /**
444  * Recalculate the new resize threshold
445  */
446 void Table::setResizeThreshold() {
447         int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
448         bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
449 }
450
451 int64_t Table::getLocalSequenceNumber() {
452         return localSequenceNumber;
453 }
454
455 bool Table::sendToServer(NewKey *newKey) {
456         bool fromRetry = false;
457         try {
458                 if (hadPartialSendToServer) {
459                         Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
460                         if (newSlots->length() == 0) {
461                                 fromRetry = true;
462                                 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
463
464                                 if (sendSlotsReturn.getFirst()) {
465                                         if (newKey != NULL) {
466                                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
467                                                         newKey = NULL;
468                                                 }
469                                         }
470
471                                         SetIterator<Transaction *> *trit = getKeyIterator(lastTransactionPartsSent);
472                                         while (trit->hasNext()) {
473                                                 Transaction *transaction = trit->next();
474                                                 transaction->resetServerFailure();
475                                                 // Update which transactions parts still need to be sent
476                                                 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
477                                                 // Add the transaction status to the outstanding list
478                                                 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
479
480                                                 // Update the transaction status
481                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
482
483                                                 // Check if all the transaction parts were successfully
484                                                 // sent and if so then remove it from pending
485                                                 if (transaction->didSendAllParts()) {
486                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
487                                                         pendingTransactionQueue->remove(transaction);
488                                                 }
489                                         }
490                                         delete trit;
491                                 } else {
492                                         newSlots = sendSlotsReturn.getThird();
493                                         bool isInserted = false;
494                                         for (uint si = 0; si < newSlots->length(); si++) {
495                                                 Slot *s = newSlots->get(si);
496                                                 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
497                                                         isInserted = true;
498                                                         break;
499                                                 }
500                                         }
501
502                                         for (uint si = 0; si < newSlots->length(); si++) {
503                                                 Slot *s = newSlots->get(si);
504                                                 if (isInserted) {
505                                                         break;
506                                                 }
507
508                                                 // Process each entry in the slot
509                                                 Vector<Entry *> *ventries = s->getEntries();
510                                                 uint vesize = ventries->size();
511                                                 for (uint vei = 0; vei < vesize; vei++) {
512                                                         Entry *entry = ventries->get(vei);
513                                                         if (entry->getType() == TypeLastMessage) {
514                                                                 LastMessage *lastMessage = (LastMessage *)entry;
515                                                                 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
516                                                                         isInserted = true;
517                                                                         break;
518                                                                 }
519                                                         }
520                                                 }
521                                         }
522
523                                         if (isInserted) {
524                                                 if (newKey != NULL) {
525                                                         if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
526                                                                 newKey = NULL;
527                                                         }
528                                                 }
529
530                                                 SetIterator<Transaction *> *trit = getKeyIterator(lastTransactionPartsSent);
531                                                 while (trit->hasNext()) {
532                                                         Transaction *transaction = trit->next();
533                                                         transaction->resetServerFailure();
534
535                                                         // Update which transactions parts still need to be sent
536                                                         transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
537
538                                                         // Add the transaction status to the outstanding list
539                                                         outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
540
541                                                         // Update the transaction status
542                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
543
544                                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
545                                                         if (transaction->didSendAllParts()) {
546                                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
547                                                                 pendingTransactionQueue->remove(transaction);
548                                                         } else {
549                                                                 transaction->resetServerFailure();
550                                                                 // Set the transaction sequence number back to nothing
551                                                                 if (!transaction->didSendAPartToServer()) {
552                                                                         transaction->setSequenceNumber(-1);
553                                                                 }
554                                                         }
555                                                 }
556                                                 delete trit;
557                                         }
558                                 }
559
560                                 SetIterator<Transaction *> *trit = getKeyIterator(lastTransactionPartsSent);
561                                 while (trit->hasNext()) {
562                                         Transaction *transaction = trit->next();
563                                         transaction->resetServerFailure();
564                                         // Set the transaction sequence number back to nothing
565                                         if (!transaction->didSendAPartToServer()) {
566                                                 transaction->setSequenceNumber(-1);
567                                         }
568                                 }
569                                 delete trit;
570
571                                 if (sendSlotsReturn.getThird()->length() != 0) {
572                                         // insert into the local block chain
573                                         validateAndUpdate(sendSlotsReturn.getThird(), true);
574                                 }
575                                 // continue;
576                         } else {
577                                 bool isInserted = false;
578                                 for (uint si = 0; si < newSlots->length(); si++) {
579                                         Slot *s = newSlots->get(si);
580                                         if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
581                                                 isInserted = true;
582                                                 break;
583                                         }
584                                 }
585
586                                 for (uint si = 0; si < newSlots->length(); si++) {
587                                         Slot *s = newSlots->get(si);
588                                         if (isInserted) {
589                                                 break;
590                                         }
591
592                                         // Process each entry in the slot
593                                         Vector<Entry *> *entries = s->getEntries();
594                                         uint eSize = entries->size();
595                                         for(uint ei=0; ei < eSize; ei++) {
596                                                 Entry * entry = entries->get(ei);
597                                                 
598                                                 if (entry->getType() == TypeLastMessage) {
599                                                         LastMessage *lastMessage = (LastMessage *)entry;
600                                                         if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
601                                                                 isInserted = true;
602                                                                 break;
603                                                         }
604                                                 }
605                                         }
606                                 }
607
608                                 if (isInserted) {
609                                         if (newKey != NULL) {
610                                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
611                                                         newKey = NULL;
612                                                 }
613                                         }
614
615                                         SetIterator<Transaction *> *trit = getKeyIterator(lastTransactionPartsSent);
616                                         while (trit->hasNext()) {
617                                                 Transaction *transaction = trit->next();
618                                                 transaction->resetServerFailure();
619
620                                                 // Update which transactions parts still need to be sent
621                                                 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
622
623                                                 // Add the transaction status to the outstanding list
624                                                 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
625
626                                                 // Update the transaction status
627                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
628
629                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
630                                                 if (transaction->didSendAllParts()) {
631                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
632                                                         pendingTransactionQueue->remove(transaction);
633                                                 } else {
634                                                         transaction->resetServerFailure();
635                                                         // Set the transaction sequence number back to nothing
636                                                         if (!transaction->didSendAPartToServer()) {
637                                                                 transaction->setSequenceNumber(-1);
638                                                         }
639                                                 }
640                                         }
641                                         delete trit;
642                                 } else {
643                                         SetIterator<Transaction *> *trit = getKeyIterator(lastTransactionPartsSent);
644                                         while (trit->hasNext()) {
645                                                 Transaction *transaction = trit->next();
646                                                 transaction->resetServerFailure();
647                                                 // Set the transaction sequence number back to nothing
648                                                 if (!transaction->didSendAPartToServer()) {
649                                                         transaction->setSequenceNumber(-1);
650                                                 }
651                                         }
652                                         delete trit;
653                                 }
654
655                                 // insert into the local block chain
656                                 validateAndUpdate(newSlots, true);
657                         }
658                 }
659         } catch (ServerException *e) {
660                 throw e;
661         }
662
663
664
665         try {
666                 // While we have stuff that needs inserting into the block chain
667                 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
668
669                         fromRetry = false;
670
671                         if (hadPartialSendToServer) {
672                                 throw new Error("Should Be error free");
673                         }
674
675
676
677                         // If there is a new key with same name then end
678                         if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
679                                 return false;
680                         }
681
682                         // Create the slot
683                         Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber);
684                         localSequenceNumber++;
685
686                         // Try to fill the slot with data
687                         ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
688                         bool needsResize = fillSlotsReturn.getFirst();
689                         int newSize = fillSlotsReturn.getSecond();
690                         bool insertedNewKey = fillSlotsReturn.getThird();
691
692                         if (needsResize) {
693                                 // Reset which transaction to send
694                                 SetIterator<Transaction *> *trit = getKeyIterator(transactionPartsSent);
695                                 while (trit->hasNext()) {
696                                         Transaction *transaction = trit->next();
697                                         transaction->resetNextPartToSend();
698
699                                         // Set the transaction sequence number back to nothing
700                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
701                                                 transaction->setSequenceNumber(-1);
702                                         }
703                                 }
704                                 delete trit;
705
706                                 // Clear the sent data since we are trying again
707                                 pendingSendArbitrationEntriesToDelete->clear();
708                                 transactionPartsSent->clear();
709
710                                 // We needed a resize so try again
711                                 fillSlot(slot, true, newKey);
712                         }
713
714                         lastSlotAttemptedToSend = slot;
715                         lastIsNewKey = (newKey != NULL);
716                         lastInsertedNewKey = insertedNewKey;
717                         lastNewSize = newSize;
718                         lastNewKey = newKey;
719                         lastTransactionPartsSent = transactionPartsSent->clone();
720                         lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
721
722                         ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
723
724                         if (sendSlotsReturn.getFirst()) {
725
726                                 // Did insert into the block chain
727
728                                 if (insertedNewKey) {
729                                         // This slot was what was inserted not a previous slot
730
731                                         // New Key was successfully inserted into the block chain so dont want to insert it again
732                                         newKey = NULL;
733                                 }
734
735                                 // Remove the aborts and commit parts that were sent from the pending to send queue
736                                 uint size = pendingSendArbitrationRounds->size();
737                                 uint oldcount = 0;
738                                 for (uint i = 0; i < size; i++) {
739                                         ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
740                                         round->removeParts(pendingSendArbitrationEntriesToDelete);
741
742                                         if (!round->isDoneSending()) {
743                                                 // Sent all the parts
744                                                 pendingSendArbitrationRounds->set(oldcount++,
745                                                                                                                                                                                         pendingSendArbitrationRounds->get(i));
746                                         }
747                                 }
748                                 pendingSendArbitrationRounds->setSize(oldcount);
749
750                                 SetIterator<Transaction *> *trit = getKeyIterator(transactionPartsSent);
751                                 while (trit->hasNext()) {
752                                         Transaction *transaction = trit->next();
753                                         transaction->resetServerFailure();
754
755                                         // Update which transactions parts still need to be sent
756                                         transaction->removeSentParts(transactionPartsSent->get(transaction));
757
758                                         // Add the transaction status to the outstanding list
759                                         outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
760
761                                         // Update the transaction status
762                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
763
764                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
765                                         if (transaction->didSendAllParts()) {
766                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
767                                                 pendingTransactionQueue->remove(transaction);
768                                         }
769                                 }
770                                 delete trit;
771                         } else {
772                                 // Reset which transaction to send
773                                 SetIterator<Transaction *> *trit = getKeyIterator(transactionPartsSent);
774                                 while (trit->hasNext()) {
775                                         Transaction *transaction = trit->next();
776                                         transaction->resetNextPartToSend();
777
778                                         // Set the transaction sequence number back to nothing
779                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
780                                                 transaction->setSequenceNumber(-1);
781                                         }
782                                 }
783                                 delete trit;
784                         }
785
786                         // Clear the sent data in preparation for next send
787                         pendingSendArbitrationEntriesToDelete->clear();
788                         transactionPartsSent->clear();
789
790                         if (sendSlotsReturn.getThird()->length() != 0) {
791                                 // insert into the local block chain
792                                 validateAndUpdate(sendSlotsReturn.getThird(), true);
793                         }
794                 }
795
796         } catch (ServerException *e) {
797                 if (e->getType() != ServerException->TypeInputTimeout) {
798                         // Nothing was able to be sent to the server so just clear these data structures
799                         SetIterator<Transaction *> *trit = getKeyIterator(transactionPartsSent);
800                         while (trit->hasNext()) {
801                                 Transaction *transaction = trit->next();
802                                 transaction->resetNextPartToSend();
803
804                                 // Set the transaction sequence number back to nothing
805                                 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
806                                         transaction->setSequenceNumber(-1);
807                                 }
808                         }
809                         delete trit;
810                 } else {
811                         // There was a partial send to the server
812                         hadPartialSendToServer = true;
813
814                         // Nothing was able to be sent to the server so just clear these data structures
815                         SetIterator<Transaction *> *trit = getKeyIterator(transactionPartsSent);
816                         while (trit->hasNext()) {
817                                 Transaction *transaction = trit->next();
818                                 transaction->resetNextPartToSend();
819                                 transaction->setServerFailure();
820                         }
821                         delete trit;
822                 }
823
824                 pendingSendArbitrationEntriesToDelete->clear();
825                 transactionPartsSent->clear();
826
827                 throw e;
828         }
829
830         return newKey == NULL;
831 }
832
833 bool Table::updateFromLocal(int64_t machineId) {
834         if (!localCommunicationTable->contains(machineId))
835                 return false;
836
837         Pair<IoTString *, int32_t> * localCommunicationInformation = localCommunicationTable->get(machineId);
838
839         // Get the size of the send data
840         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
841
842         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
843         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId) != NULL) {
844                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
845         }
846
847         Array<char> *sendData = new Array<char>(sendDataSize);
848         ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
849
850         // Encode the data
851         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
852         bbEncode->putInt(0);
853
854         // Send by local
855         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
856         localSequenceNumber++;
857
858         if (returnData == NULL) {
859                 // Could not contact server
860                 return false;
861         }
862
863         // Decode the data
864         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
865         int numberOfEntries = bbDecode->getInt();
866
867         for (int i = 0; i < numberOfEntries; i++) {
868                 char type = bbDecode->get();
869                 if (type == TypeAbort) {
870                         Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
871                         processEntry(abort);
872                 } else if (type == TypeCommitPart) {
873                         CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
874                         processEntry(commitPart);
875                 }
876         }
877
878         updateLiveStateFromLocal();
879
880         return true;
881 }
882
883 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
884
885         // Get the devices local communications
886         if (!localCommunicationTable->contains(machineId))
887                 return Pair<bool, bool>(true, false);
888         
889         Pair<IoTString *, int32_t> localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
890
891         // Get the size of the send data
892         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
893         for (TransactionPart *part : transaction->getParts()->values()) {
894                 sendDataSize += part->getSize();
895         }
896
897         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
898         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator()) != NULL) {
899                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
900         }
901
902         // Make the send data size
903         Array<char> *sendData = new Array<char>(sendDataSize);
904         ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
905
906         // Encode the data
907         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
908         bbEncode->putInt(transaction->getParts()->size());
909         for (TransactionPart *part : transaction->getParts()->values()) {
910                 part->encode(bbEncode);
911         }
912
913
914         // Send by local
915         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
916         localSequenceNumber++;
917
918         if (returnData == NULL) {
919                 // Could not contact server
920                 return Pair<bool, bool>(true, false);
921         }
922
923         // Decode the data
924         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
925         bool didCommit = bbDecode->get() == 1;
926         bool couldArbitrate = bbDecode->get() == 1;
927         int numberOfEntries = bbDecode->getInt();
928         bool foundAbort = false;
929
930         for (int i = 0; i < numberOfEntries; i++) {
931                 char type = bbDecode->get();
932                 if (type == TypeAbort) {
933                         Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
934
935                         if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
936                                 foundAbort = true;
937                         }
938
939                         processEntry(abort);
940                 } else if (type == TypeCommitPart) {
941                         CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
942                         processEntry(commitPart);
943                 }
944         }
945
946         updateLiveStateFromLocal();
947
948         if (couldArbitrate) {
949                 TransactionStatus * status =  transaction->getTransactionStatus();
950                 if (didCommit) {
951                         status->setStatus(TransactionStatus_StatusCommitted);
952                 } else {
953                         status->setStatus(TransactionStatus_StatusAborted);
954                 }
955         } else {
956                 TransactionStatus * status =  transaction->getTransactionStatus();
957                 if (foundAbort) {
958                         status->setStatus(TransactionStatus_StatusAborted);
959                 } else {
960                         status->setStatus(TransactionStatus_StatusCommitted);
961                 }
962         }
963
964         return Pair<bool, bool>(false, true);
965 }
966
967 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
968
969         // Decode the data
970         ByteBuffer *bbDecode = ByteBuffer_wrap(data);
971         int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
972         int numberOfParts = bbDecode->getInt();
973
974         // If we did commit a transaction or not
975         bool didCommit = false;
976         bool couldArbitrate = false;
977
978         if (numberOfParts != 0) {
979
980                 // decode the transaction
981                 Transaction *transaction = new Transaction();
982                 for (int i = 0; i < numberOfParts; i++) {
983                         bbDecode->get();
984                         TransactionPart *newPart = (TransactionPart *)TransactionPart_decode(NULL, bbDecode);
985                         transaction->addPartDecode(newPart);
986                 }
987
988                 // Arbitrate on transaction and pull relevant return data
989                 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
990                 couldArbitrate = localArbitrateReturn.getFirst();
991                 didCommit = localArbitrateReturn.getSecond();
992
993                 updateLiveStateFromLocal();
994
995                 // Transaction was sent to the server so keep track of it to prevent double commit
996                 if (transaction->getSequenceNumber() != -1) {
997                         offlineTransactionsCommittedAndAtServer->add(transaction->getId());
998                 }
999         }
1000
1001         // The data to send back
1002         int returnDataSize = 0;
1003         Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
1004
1005         // Get the aborts to send back
1006         Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>(liveAbortsGeneratedByLocal->keySet());
1007         Collections->sort(abortLocalSequenceNumbers);
1008         uint asize = abortLocalSequenceNumbers->size();
1009         for(uint i=0; i<asize; i++) {
1010                 int64_t localSequenceNumber = abortLocalSequenceNumbers->get(i);
1011                 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1012                         continue;
1013                 }
1014                 
1015                 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
1016                 unseenArbitrations->add(abort);
1017                 returnDataSize += abort->getSize();
1018         }
1019
1020         // Get the commits to send back
1021         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
1022         if (commitForClientTable != NULL) {
1023                 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
1024                 Collections->sort(commitLocalSequenceNumbers);
1025
1026                 uint clsSize = commitLocalSequenceNumbers->size();
1027                 for(uint clsi = 0; clsi < clsSize; clsi++) {
1028                         int64_t localSequenceNumber = commitLocalSequenceNumbers->get(clsi);
1029                         Commit *commit = commitForClientTable->get(localSequenceNumber);
1030
1031                         if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1032                                 continue;
1033                         }
1034
1035                         unseenArbitrations->addAll(commit->getParts()->values());
1036
1037                         for (CommitPart *commitPart : commit->getParts()->values()) {
1038                                 returnDataSize += commitPart->getSize();
1039                         }
1040                 }
1041         }
1042
1043         // Number of arbitration entries to decode
1044         returnDataSize += 2 * sizeof(int32_t);
1045
1046         // bool of did commit or not
1047         if (numberOfParts != 0) {
1048                 returnDataSize += sizeof(char);
1049         }
1050
1051         // Data to send Back
1052         Array<char> *returnData = new Array<char>(returnDataSize);
1053         ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
1054
1055         if (numberOfParts != 0) {
1056                 if (didCommit) {
1057                         bbEncode->put((char)1);
1058                 } else {
1059                         bbEncode->put((char)0);
1060                 }
1061                 if (couldArbitrate) {
1062                         bbEncode->put((char)1);
1063                 } else {
1064                         bbEncode->put((char)0);
1065                 }
1066         }
1067
1068         bbEncode->putInt(unseenArbitrations->size());
1069         uint size = unseenArbitrations->size();
1070         for (uint i = 0; i < size; i++) {
1071                 Entry *entry = unseenArbitrations->get(i);
1072                 entry->encode(bbEncode);
1073         }
1074
1075         localSequenceNumber++;
1076         return returnData;
1077 }
1078
1079 ThreeTuple<bool, bool, Array<Slot *> *> Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey) {
1080         bool attemptedToSendToServerTmp = attemptedToSendToServer;
1081         attemptedToSendToServer = true;
1082
1083         bool inserted = false;
1084         bool lastTryInserted = false;
1085
1086         Array<Slot *> *array = cloud->putSlot(slot, newSize);
1087         if (array == NULL) {
1088                 array = new Array<Slot *>();
1089                 array->set(0, slot);
1090                 rejectedSlotVector->clear();
1091                 inserted = true;
1092         } else {
1093                 if (array->length() == 0) {
1094                         throw new Error("Server Error: Did not send any slots");
1095                 }
1096
1097                 // if (attemptedToSendToServerTmp) {
1098                 if (hadPartialSendToServer) {
1099
1100                         bool isInserted = false;
1101                         uint size = array->length();
1102                         for (uint i = 0; i < size; i++) {
1103                                 Slot *s = array->get(i);
1104                                 if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1105                                         isInserted = true;
1106                                         break;
1107                                 }
1108                         }
1109
1110                         for (uint i = 0; i < size; i++) {
1111                                 Slot *s = array->get(i);
1112                                 if (isInserted) {
1113                                         break;
1114                                 }
1115
1116                                 // Process each entry in the slot
1117                                 Vector<Entry *> *entries = s->getEntries();
1118                                 uint eSize = entries->size();
1119                                 for(uint ei=0; ei < eSize; ei++) {
1120                                         Entry * entry = entries->get(ei);
1121
1122                                         if (entry->getType() == TypeLastMessage) {
1123                                                 LastMessage *lastMessage = (LastMessage *)entry;
1124
1125                                                 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) {
1126                                                         isInserted = true;
1127                                                         break;
1128                                                 }
1129                                         }
1130                                 }
1131                         }
1132
1133                         if (!isInserted) {
1134                                 rejectedSlotVector->add(slot->getSequenceNumber());
1135                                 lastTryInserted = false;
1136                         } else {
1137                                 lastTryInserted = true;
1138                         }
1139                 } else {
1140                         rejectedSlotVector->add(slot->getSequenceNumber());
1141                         lastTryInserted = false;
1142                 }
1143         }
1144
1145         return ThreeTuple<bool, bool, Array<Slot *> *>(inserted, lastTryInserted, array);
1146 }
1147
1148 /**
1149  * Returns false if a resize was needed
1150  */
1151 ThreeTuple<bool, int32_t, bool> Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry) {
1152         int newSize = 0;
1153         if (liveSlotCount > bufferResizeThreshold) {
1154                 resize = true;//Resize is forced
1155         }
1156
1157         if (resize) {
1158                 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1159                 TableStatus *status = new TableStatus(slot, newSize);
1160                 slot->addEntry(status);
1161         }
1162
1163         // Fill with rejected slots first before doing anything else
1164         doRejectedMessages(slot);
1165
1166         // Do mandatory rescue of entries
1167         ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1168
1169         // Extract working variables
1170         bool needsResize = mandatoryRescueReturn.getFirst();
1171         bool seenLiveSlot = mandatoryRescueReturn.getSecond();
1172         int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1173
1174         if (needsResize && !resize) {
1175                 // We need to resize but we are not resizing so return false
1176                 return ThreeTuple<bool, int32_t, bool>(true, NULL, NULL);
1177         }
1178
1179         bool inserted = false;
1180         if (newKeyEntry != NULL) {
1181                 newKeyEntry->setSlot(slot);
1182                 if (slot->hasSpace(newKeyEntry)) {
1183                         slot->addEntry(newKeyEntry);
1184                         inserted = true;
1185                 }
1186         }
1187
1188         // Clear the transactions, aborts and commits that were sent previously
1189         transactionPartsSent->clear();
1190         pendingSendArbitrationEntriesToDelete->clear();
1191         uint size = pendingSendArbitrationRounds->size();
1192         for (uint i = 0; i < size; i++) {
1193                 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
1194                 bool isFull = false;
1195                 round->generateParts();
1196                 Vector<Entry *> *parts = round->getParts();
1197
1198                 // Insert pending arbitration data
1199                 uint vsize = parts->size();
1200                 for (uint vi = 0; vi < vsize; vi++) {
1201                         Entry *arbitrationData = parts->get(vi);
1202
1203                         // If it is an abort then we need to set some information
1204                         if (arbitrationData->getType() == TypeAbort) {
1205                                 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1206                         }
1207
1208                         if (!slot->hasSpace(arbitrationData)) {
1209                                 // No space so cant do anything else with these data entries
1210                                 isFull = true;
1211                                 break;
1212                         }
1213
1214                         // Add to this current slot and add it to entries to delete
1215                         slot->addEntry(arbitrationData);
1216                         pendingSendArbitrationEntriesToDelete->add(arbitrationData);
1217                 }
1218
1219                 if (isFull) {
1220                         break;
1221                 }
1222         }
1223
1224         if (pendingTransactionQueue->size() > 0) {
1225                 Transaction *transaction = pendingTransactionQueue->get(0);
1226                 // Set the transaction sequence number if it has yet to be inserted into the block chain
1227                 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1228                         transaction->setSequenceNumber(slot->getSequenceNumber());
1229                 }
1230
1231                 while (true) {
1232                         TransactionPart *part = transaction->getNextPartToSend();
1233                         if (part == NULL) {
1234                                 // Ran out of parts to send for this transaction so move on
1235                                 break;
1236                         }
1237
1238                         if (slot->hasSpace(part)) {
1239                                 slot->addEntry(part);
1240                                 Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1241                                 if (partsSent == NULL) {
1242                                         partsSent = new Vector<int32_t>();
1243                                         transactionPartsSent->put(transaction, partsSent);
1244                                 }
1245                                 partsSent->add(part->getPartNumber());
1246                                 transactionPartsSent->put(transaction, partsSent);
1247                         } else {
1248                                 break;
1249                         }
1250                 }
1251         }
1252
1253         // Fill the remainder of the slot with rescue data
1254         doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1255
1256         return ThreeTuple<bool, int32_t, bool>(false, newSize, inserted);
1257 }
1258
1259 void Table::doRejectedMessages(Slot *s) {
1260         if (!rejectedSlotVector->isEmpty()) {
1261                 /* TODO: We should avoid generating a rejected message entry if
1262                  * there is already a sufficient entry in the queue (e->g->,
1263                  * equalsto value of true and same sequence number)->  */
1264
1265                 int64_t old_seqn = rejectedSlotVector->firstElement();
1266                 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1267                         int64_t new_seqn = rejectedSlotVector->lastElement();
1268                         RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1269                         s->addEntry(rm);
1270                 } else {
1271                         int64_t prev_seqn = -1;
1272                         int i = 0;
1273                         /* Go through list of missing messages */
1274                         for (; i < rejectedSlotVector->size(); i++) {
1275                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1276                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1277                                 if (s_msg != NULL)
1278                                         break;
1279                                 prev_seqn = curr_seqn;
1280                         }
1281                         /* Generate rejected message entry for missing messages */
1282                         if (prev_seqn != -1) {
1283                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1284                                 s->addEntry(rm);
1285                         }
1286                         /* Generate rejected message entries for present messages */
1287                         for (; i < rejectedSlotVector->size(); i++) {
1288                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1289                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1290                                 int64_t machineid = s_msg->getMachineID();
1291                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1292                                 s->addEntry(rm);
1293                         }
1294                 }
1295         }
1296 }
1297
1298 ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize) {
1299         int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1300         int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
1301         if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1302                 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1303         }
1304
1305         int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1306         bool seenLiveSlot = false;
1307         int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots;         // smallest seq number in the buffer if it is full
1308         int64_t threshold = firstIfFull + Table_FREE_SLOTS;             // we want the buffer to be clear of live entries up to this point
1309
1310
1311         // Mandatory Rescue
1312         for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1313                 Slot * previousSlot = buffer->getSlot(currentSequenceNumber);
1314                 // Push slot number forward
1315                 if (!seenLiveSlot) {
1316                         oldestLiveSlotSequenceNumver = currentSequenceNumber;
1317                 }
1318
1319                 if (!previousSlot->isLive()) {
1320                         continue;
1321                 }
1322
1323                 // We have seen a live slot
1324                 seenLiveSlot = true;
1325
1326                 // Get all the live entries for a slot
1327                 Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1328
1329                 // Iterate over all the live entries and try to rescue them
1330                 for (Entry *liveEntry : liveEntries) {
1331                         if (slot->hasSpace(liveEntry)) {
1332                                 // Enough space to rescue the entry
1333                                 slot->addEntry(liveEntry);
1334                         } else if (currentSequenceNumber == firstIfFull) {
1335                                 //if there's no space but the entry is about to fall off the queue
1336                                 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
1337                         }
1338                 }
1339         }
1340
1341         // Did not resize
1342         return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
1343 }
1344
1345 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1346         /* now go through live entries from least to greatest sequence number until
1347          * either all live slots added, or the slot doesn't have enough room
1348          * for SKIP_THRESHOLD consecutive entries*/
1349         int skipcount = 0;
1350         int64_t newestseqnum = buffer->getNewestSeqNum();
1351 search:
1352         for (; seqn <= newestseqnum; seqn++) {
1353                 Slot *prevslot = buffer->getSlot(seqn);
1354                 //Push slot number forward
1355                 if (!seenliveslot)
1356                         oldestLiveSlotSequenceNumver = seqn;
1357
1358                 if (!prevslot->isLive())
1359                         continue;
1360                 seenliveslot = true;
1361                 Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
1362                 for (Entry *liveentry : liveentries) {
1363                         if (s->hasSpace(liveentry))
1364                                 s->addEntry(liveentry);
1365                         else {
1366                                 skipcount++;
1367                                 if (skipcount > Table_SKIP_THRESHOLD)
1368                                         goto donesearch;
1369                         }
1370                 }
1371         }
1372  donesearch:
1373         ;
1374 }
1375
1376 /**
1377  * Checks for malicious activity and updates the local copy of the block chain->
1378  */
1379 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1380         // The cloud communication layer has checked slot HMACs already
1381         // before decoding
1382         if (newSlots->length() == 0) {
1383                 return;
1384         }
1385
1386         // Make sure all slots are newer than the last largest slot this
1387         // client has seen
1388         int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
1389         if (firstSeqNum <= sequenceNumber) {
1390                 throw new Error("Server Error: Sent older slots!");
1391         }
1392
1393         // Create an object that can access both new slots and slots in our
1394         // local chain without committing slots to our local chain
1395         SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1396
1397         // Check that the HMAC chain is not broken
1398         checkHMACChain(indexer, newSlots);
1399
1400         // Set to keep track of messages from clients
1401         Hashset<int64_t> *machineSet = new Hashset<int64_t>(lastMessageTable->keySet());
1402
1403         // Process each slots data
1404         for (Slot *slot : newSlots) {
1405                 processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1406
1407                 updateExpectedSize();
1408         }
1409
1410         // If there is a gap, check to see if the server sent us
1411         // everything->
1412         if (firstSeqNum != (sequenceNumber + 1)) {
1413
1414                 // Check the size of the slots that were sent down by the server->
1415                 // Can only check the size if there was a gap
1416                 checkNumSlots(newSlots->length);
1417
1418                 // Since there was a gap every machine must have pushed a slot or
1419                 // must have a last message message-> If not then the server is
1420                 // hiding slots
1421                 if (!machineSet->isEmpty()) {
1422                         throw new Error("Missing record for machines: ");
1423                 }
1424         }
1425
1426         // Update the size of our local block chain->
1427         commitNewMaxSize();
1428
1429         // Commit new to slots to the local block chain->
1430         for (Slot *slot : newSlots) {
1431
1432                 // Insert this slot into our local block chain copy->
1433                 buffer->putSlot(slot);
1434
1435                 // Keep track of how many slots are currently live (have live data
1436                 // in them)->
1437                 liveSlotCount++;
1438         }
1439
1440         // Get the sequence number of the latest slot in the system
1441         sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
1442         updateLiveStateFromServer();
1443
1444         // No Need to remember after we pulled from the server
1445         offlineTransactionsCommittedAndAtServer->clear();
1446
1447         // This is invalidated now
1448         hadPartialSendToServer = false;
1449 }
1450
1451 void Table::updateLiveStateFromServer() {
1452         // Process the new transaction parts
1453         processNewTransactionParts();
1454
1455         // Do arbitration on new transactions that were received
1456         arbitrateFromServer();
1457
1458         // Update all the committed keys
1459         bool didCommitOrSpeculate = updateCommittedTable();
1460
1461         // Delete the transactions that are now dead
1462         updateLiveTransactionsAndStatus();
1463
1464         // Do speculations
1465         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1466         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1467 }
1468
1469 void Table::updateLiveStateFromLocal() {
1470         // Update all the committed keys
1471         bool didCommitOrSpeculate = updateCommittedTable();
1472
1473         // Delete the transactions that are now dead
1474         updateLiveTransactionsAndStatus();
1475
1476         // Do speculations
1477         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1478         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1479 }
1480
1481 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1482         int64_t prevslots = firstSequenceNumber;
1483
1484         if (didFindTableStatus) {
1485         } else {
1486                 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1487         }
1488
1489         didFindTableStatus = true;
1490         currMaxSize = numberOfSlots;
1491 }
1492
1493 void Table::updateExpectedSize() {
1494         expectedsize++;
1495
1496         if (expectedsize > currMaxSize) {
1497                 expectedsize = currMaxSize;
1498         }
1499 }
1500
1501
1502 /**
1503  * Check the size of the block chain to make sure there are enough
1504  * slots sent back by the server-> This is only called when we have a
1505  * gap between the slots that we have locally and the slots sent by
1506  * the server therefore in the slots sent by the server there will be
1507  * at least 1 Table status message
1508  */
1509 void Table::checkNumSlots(int numberOfSlots) {
1510         if (numberOfSlots != expectedsize) {
1511                 throw new Error("Server Error: Server did not send all slots->  Expected: ");
1512         }
1513 }
1514
1515 void Table::updateCurrMaxSize(int newmaxsize) {
1516         currMaxSize = newmaxsize;
1517 }
1518
1519
1520 /**
1521  * Update the size of of the local buffer if it is needed->
1522  */
1523 void Table::commitNewMaxSize() {
1524         didFindTableStatus = false;
1525
1526         // Resize the local slot buffer
1527         if (numberOfSlots != currMaxSize) {
1528                 buffer->resize((int32_t)currMaxSize);
1529         }
1530
1531         // Change the number of local slots to the new size
1532         numberOfSlots = (int32_t)currMaxSize;
1533
1534         // Recalculate the resize threshold since the size of the local
1535         // buffer has changed
1536         setResizeThreshold();
1537 }
1538
1539 /**
1540  * Process the new transaction parts from this latest round of slots
1541  * received from the server
1542  */
1543 void Table::processNewTransactionParts() {
1544
1545         if (newTransactionParts->size() == 0) {
1546                 // Nothing new to process
1547                 return;
1548         }
1549
1550         // Iterate through all the machine Ids that we received new parts
1551         // for
1552         for (int64_t machineId : newTransactionParts->keySet()) {
1553                 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
1554
1555                 // Iterate through all the parts for that machine Id
1556                 for (Pair<int64_t, int32_t> partId : parts->keySet()) {
1557                         TransactionPart *part = parts->get(partId);
1558
1559                         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1560                         if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= part->getSequenceNumber())) {
1561                                 // Set dead the transaction part
1562                                 part->setDead();
1563                                 continue;
1564                         }
1565
1566                         // Get the transaction object for that sequence number
1567                         Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1568
1569                         if (transaction == NULL) {
1570                                 // This is a new transaction that we dont have so make a new one
1571                                 transaction = new Transaction();
1572
1573                                 // Insert this new transaction into the live tables
1574                                 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1575                                 liveTransactionByTransactionIdTable->put(part->getTransactionId(), transaction);
1576                         }
1577
1578                         // Add that part to the transaction
1579                         transaction->addPartDecode(part);
1580                 }
1581         }
1582
1583         // Clear all the new transaction parts in preparation for the next
1584         // time the server sends slots
1585         newTransactionParts->clear();
1586 }
1587
1588 void Table::arbitrateFromServer() {
1589
1590         if (liveTransactionBySequenceNumberTable->size() == 0) {
1591                 // Nothing to arbitrate on so move on
1592                 return;
1593         }
1594
1595         // Get the transaction sequence numbers and sort from oldest to newest
1596         Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
1597         Collections->sort(transactionSequenceNumbers);
1598
1599         // Collection of key value pairs that are
1600         Hashtable<IoTString *, KeyValue *> * speculativeTableTmp = new Hashtable<IoTString *, KeyValue *>();
1601
1602         // The last transaction arbitrated on
1603         int64_t lastTransactionCommitted = -1;
1604         Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1605
1606         for (int64_t transactionSequenceNumber : transactionSequenceNumbers) {
1607                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1608
1609
1610
1611                 // Check if this machine arbitrates for this transaction if not
1612                 // then we cant arbitrate this transaction
1613                 if (transaction->getArbitrator() != localMachineId) {
1614                         continue;
1615                 }
1616
1617                 if (transactionSequenceNumber < lastSeqNumArbOn) {
1618                         continue;
1619                 }
1620
1621                 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1622                         // We have seen this already locally so dont commit again
1623                         continue;
1624                 }
1625
1626
1627                 if (!transaction->isComplete()) {
1628                         // Will arbitrate in incorrect order if we continue so just break
1629                         // Most likely this
1630                         break;
1631                 }
1632
1633
1634                 // update the largest transaction seen by arbitrator from server
1635                 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) == NULL) {
1636                         lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1637                 } else {
1638                         int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1639                         if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1640                                 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1641                         }
1642                 }
1643
1644                 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1645                         // Guard evaluated as true
1646
1647                         // Update the local changes so we can make the commit
1648                         SetIterator<KeyValue *> *kvit = getKeyIterator(transaction->getKeyValueUpdateSet());
1649                         while (kvit->hasNext()) {
1650                                 KeyValue *kv = kvit->next();
1651                                 speculativeTableTmp->put(kv->getKey(), kv);
1652                         }
1653                         delete kvit;
1654                         
1655                         // Update what the last transaction committed was for use in batch commit
1656                         lastTransactionCommitted = transactionSequenceNumber;
1657                 } else {
1658                         // Guard evaluated was false so create abort
1659                         // Create the abort
1660                         Abort *newAbort = new Abort(NULL,
1661                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1662                                                                                                                                         transaction->getSequenceNumber(),
1663                                                                                                                                         transaction->getMachineId(),
1664                                                                                                                                         transaction->getArbitrator(),
1665                                                                                                                                         localArbitrationSequenceNumber);
1666                         localArbitrationSequenceNumber++;
1667                         generatedAborts->add(newAbort);
1668
1669                         // Insert the abort so we can process
1670                         processEntry(newAbort);
1671                 }
1672
1673                 lastSeqNumArbOn = transactionSequenceNumber;
1674         }
1675
1676         Commit *newCommit = NULL;
1677
1678         // If there is something to commit
1679         if (speculativeTableTmp->size() != 0) {
1680                 // Create the commit and increment the commit sequence number
1681                 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1682                 localArbitrationSequenceNumber++;
1683
1684                 // Add all the new keys to the commit
1685                 for (KeyValue *kv : speculativeTableTmp->values()) {
1686                         newCommit->addKV(kv);
1687                 }
1688
1689                 // create the commit parts
1690                 newCommit->createCommitParts();
1691
1692                 // Append all the commit parts to the end of the pending queue
1693                 // waiting for sending to the server
1694                 // Insert the commit so we can process it
1695                 for (CommitPart *commitPart : newCommit->getParts()->values()) {
1696                         processEntry(commitPart);
1697                 }
1698         }
1699
1700         if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1701                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1702                 pendingSendArbitrationRounds->add(arbitrationRound);
1703
1704                 if (compactArbitrationData()) {
1705                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1706                         if (newArbitrationRound->getCommit() != NULL) {
1707                                 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1708                                         processEntry(commitPart);
1709                                 }
1710                         }
1711                 }
1712         }
1713 }
1714
1715 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1716
1717         // Check if this machine arbitrates for this transaction if not then
1718         // we cant arbitrate this transaction
1719         if (transaction->getArbitrator() != localMachineId) {
1720                 return Pair<bool, bool>(false, false);
1721         }
1722
1723         if (!transaction->isComplete()) {
1724                 // Will arbitrate in incorrect order if we continue so just break
1725                 // Most likely this
1726                 return Pair<bool, bool>(false, false);
1727         }
1728
1729         if (transaction->getMachineId() != localMachineId) {
1730                 // dont do this check for local transactions
1731                 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) != NULL) {
1732                         if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1733                                 // We've have already seen this from the server
1734                                 return Pair<bool, bool>(false, false);
1735                         }
1736                 }
1737         }
1738
1739         if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1740                 // Guard evaluated as true Create the commit and increment the
1741                 // commit sequence number
1742                 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1743                 localArbitrationSequenceNumber++;
1744
1745                 // Update the local changes so we can make the commit
1746                 SetIterator<KeyValue *> *kvit = getKeyIterator(transaction->getKeyValueUpdateSet());
1747                 while (kvit->hasNext()) {
1748                         KeyValue *kv = kvit->next();
1749                         newCommit->addKV(kv);
1750                 }
1751                 delete kvit;
1752                 
1753                 // create the commit parts
1754                 newCommit->createCommitParts();
1755
1756                 // Append all the commit parts to the end of the pending queue
1757                 // waiting for sending to the server
1758                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1759                 pendingSendArbitrationRounds->add(arbitrationRound);
1760
1761                 if (compactArbitrationData()) {
1762                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1763                         for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1764                                 processEntry(commitPart);
1765                         }
1766                 } else {
1767                         // Insert the commit so we can process it
1768                         for (CommitPart *commitPart : newCommit->getParts()->values()) {
1769                                 processEntry(commitPart);
1770                         }
1771                 }
1772
1773                 if (transaction->getMachineId() == localMachineId) {
1774                         TransactionStatus *status = transaction->getTransactionStatus();
1775                         if (status != NULL) {
1776                                 status->setStatus(TransactionStatus_StatusCommitted);
1777                         }
1778                 }
1779
1780                 updateLiveStateFromLocal();
1781                 return Pair<bool, bool>(true, true);
1782         } else {
1783                 if (transaction->getMachineId() == localMachineId) {
1784                         // For locally created messages update the status
1785                         // Guard evaluated was false so create abort
1786                         TransactionStatus * status = transaction->getTransactionStatus();
1787                         if (status != NULL) {
1788                                 status->setStatus(TransactionStatus_StatusAborted);
1789                         }
1790                 } else {
1791                         Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
1792
1793                         // Create the abort
1794                         Abort *newAbort = new Abort(NULL,
1795                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1796                                                                                                                                         -1,
1797                                                                                                                                         transaction->getMachineId(),
1798                                                                                                                                         transaction->getArbitrator(),
1799                                                                                                                                         localArbitrationSequenceNumber);
1800                         localArbitrationSequenceNumber++;
1801                         addAbortSet->add(newAbort);
1802
1803                         // Append all the commit parts to the end of the pending queue
1804                         // waiting for sending to the server
1805                         ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1806                         pendingSendArbitrationRounds->add(arbitrationRound);
1807
1808                         if (compactArbitrationData()) {
1809                                 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1810                                 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1811                                         processEntry(commitPart);
1812                                 }
1813                         }
1814                 }
1815
1816                 updateLiveStateFromLocal();
1817                 return Pair<bool, bool>(true, false);
1818         }
1819 }
1820
1821 /**
1822  * Compacts the arbitration data my merging commits and aggregating
1823  * aborts so that a single large push of commits can be done instead
1824  * of many small updates
1825  */
1826 bool Table::compactArbitrationData() {
1827         if (pendingSendArbitrationRounds->size() < 2) {
1828                 // Nothing to compact so do nothing
1829                 return false;
1830         }
1831
1832         ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1833         if (lastRound->getDidSendPart()) {
1834                 return false;
1835         }
1836
1837         bool hadCommit = (lastRound->getCommit() == NULL);
1838         bool gotNewCommit = false;
1839
1840         int numberToDelete = 1;
1841         while (numberToDelete < pendingSendArbitrationRounds->size()) {
1842                 ArbitrationRound *xs round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1843
1844                 if (round->isFull() || round->getDidSendPart()) {
1845                         // Stop since there is a part that cannot be compacted and we
1846                         // need to compact in order
1847                         break;
1848                 }
1849
1850                 if (round->getCommit() == NULL) {
1851                         // Try compacting aborts only
1852                         int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
1853                         if (newSize > ArbitrationRound->MAX_PARTS) {
1854                                 // Cant compact since it would be too large
1855                                 break;
1856                         }
1857                         lastRound->addAborts(round->getAborts());
1858                 } else {
1859                         // Create a new larger commit
1860                         Commit newCommit = Commit->merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
1861                         localArbitrationSequenceNumber++;
1862
1863                         // Create the commit parts so that we can count them
1864                         newCommit->createCommitParts();
1865
1866                         // Calculate the new size of the parts
1867                         int newSize = newCommit->getNumberOfParts();
1868                         newSize += lastRound->getAbortsCount();
1869                         newSize += round->getAbortsCount();
1870
1871                         if (newSize > ArbitrationRound->MAX_PARTS) {
1872                                 // Cant compact since it would be too large
1873                                 break;
1874                         }
1875
1876                         // Set the new compacted part
1877                         lastRound->setCommit(newCommit);
1878                         lastRound->addAborts(round->getAborts());
1879                         gotNewCommit = true;
1880                 }
1881
1882                 numberToDelete++;
1883         }
1884
1885         if (numberToDelete != 1) {
1886                 // If there is a compaction
1887                 // Delete the previous pieces that are now in the new compacted piece
1888                 if (numberToDelete == pendingSendArbitrationRounds->size()) {
1889                         pendingSendArbitrationRounds->clear();
1890                 } else {
1891                         for (int i = 0; i < numberToDelete; i++) {
1892                                 pendingSendArbitrationRounds->remove(pendingSendArbitrationRounds->size() - 1);
1893                         }
1894                 }
1895
1896                 // Add the new compacted into the pending to send list
1897                 pendingSendArbitrationRounds->add(lastRound);
1898
1899                 // Should reinsert into the commit processor
1900                 if (hadCommit && gotNewCommit) {
1901                         return true;
1902                 }
1903         }
1904
1905         return false;
1906 }
1907
1908 /**
1909  * Update all the commits and the committed tables, sets dead the dead
1910  * transactions
1911  */
1912 bool Table::updateCommittedTable() {
1913
1914         if (newCommitParts->size() == 0) {
1915                 // Nothing new to process
1916                 return false;
1917         }
1918
1919         // Iterate through all the machine Ids that we received new parts for
1920         for (int64_t machineId : newCommitParts->keySet()) {
1921                 Hashtable<Pair<int64_t, int32_t> *, CommitPart *> *parts = newCommitParts->get(machineId);
1922
1923                 // Iterate through all the parts for that machine Id
1924                 for (Pair<int64_t, int32_t> partId : parts->keySet()) {
1925                         CommitPart *part = parts->get(partId);
1926
1927                         // Get the transaction object for that sequence number
1928                         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
1929
1930                         if (commitForClientTable == NULL) {
1931                                 // This is the first commit from this device
1932                                 commitForClientTable = new Hashtable<int64_t, Commit *>();
1933                                 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
1934                         }
1935
1936                         Commit *commit = commitForClientTable->get(part->getSequenceNumber());
1937
1938                         if (commit == NULL) {
1939                                 // This is a new commit that we dont have so make a new one
1940                                 commit = new Commit();
1941
1942                                 // Insert this new commit into the live tables
1943                                 commitForClientTable->put(part->getSequenceNumber(), commit);
1944                         }
1945
1946                         // Add that part to the commit
1947                         commit->addPartDecode(part);
1948                 }
1949         }
1950
1951         // Clear all the new commits parts in preparation for the next time
1952         // the server sends slots
1953         newCommitParts->clear();
1954
1955         // If we process a new commit keep track of it for future use
1956         bool didProcessANewCommit = false;
1957
1958         // Process the commits one by one
1959         for (int64_T arbitratorId : liveCommitsTable->keySet()) {
1960
1961                 // Get all the commits for a specific arbitrator
1962                 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
1963
1964                 // Sort the commits in order
1965                 Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
1966                 Collections->sort(commitSequenceNumbers);
1967
1968                 // Get the last commit seen from this arbitrator
1969                 int64_t lastCommitSeenSequenceNumber = -1;
1970                 if (lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId) != NULL) {
1971                         lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
1972                 }
1973
1974                 // Go through each new commit one by one
1975                 for (int i = 0; i < commitSequenceNumbers->size(); i++) {
1976                         int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
1977                         Commit *commit = commitForClientTable->get(commitSequenceNumber);
1978
1979                         // Special processing if a commit is not complete
1980                         if (!commit->isComplete()) {
1981                                 if (i == (commitSequenceNumbers->size() - 1)) {
1982                                         // If there is an incomplete commit and this commit is the
1983                                         // latest one seen then this commit cannot be processed and
1984                                         // there are no other commits
1985                                         break;
1986                                 } else {
1987                                         // This is a commit that was already dead but parts of it
1988                                         // are still in the block chain (not flushed out yet)->
1989                                         // Delete it and move on
1990                                         commit->setDead();
1991                                         commitForClientTable->remove(commit->getSequenceNumber());
1992                                         continue;
1993                                 }
1994                         }
1995
1996                         // Update the last transaction that was updated if we can
1997                         if (commit->getTransactionSequenceNumber() != -1) {
1998                                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
1999
2000                                 // Update the last transaction sequence number that the arbitrator arbitrated on
2001                                 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
2002                                         lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2003                                 }
2004                         }
2005
2006                         // Update the last arbitration data that we have seen so far
2007                         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(commit->getMachineId())) {
2008                                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
2009                                 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
2010                                         // Is larger
2011                                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2012                                 }
2013                         } else {
2014                                 // Never seen any data from this arbitrator so record the first one
2015                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2016                         }
2017
2018                         // We have already seen this commit before so need to do the
2019                         // full processing on this commit
2020                         if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2021
2022                                 // Update the last transaction that was updated if we can
2023                                 if (commit->getTransactionSequenceNumber() != -1) {
2024                                         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
2025
2026                                         // Update the last transaction sequence number that the arbitrator arbitrated on
2027                                         if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
2028                                                 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2029                                         }
2030                                 }
2031
2032                                 continue;
2033                         }
2034
2035                         // If we got here then this is a brand new commit and needs full
2036                         // processing
2037                         // Get what commits should be edited, these are the commits that
2038                         // have live values for their keys
2039                         Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
2040                         SetIterator<KeyValue *> *kvit = getKeyIterator(commit->getKeyValueUpdateSet());
2041                         while (kvit->hasNext()) {
2042                                 KeyValue *kv = kvit->next();
2043                                 commitsToEdit->add(liveCommitsByKeyTable->get(kv->getKey()));
2044                         }
2045                         delete kvit;
2046                         commitsToEdit->remove(NULL);            // remove NULL since it could be in this set
2047
2048                         // Update each previous commit that needs to be updated
2049                         for (Commit *previousCommit : commitsToEdit) {
2050
2051                                 // Only bother with live commits (TODO: Maybe remove this check)
2052                                 if (previousCommit->isLive()) {
2053
2054                                         // Update which keys in the old commits are still live
2055                                         SetIterator<KeyValue *> *kvit = getKeyIterator(commit->getKeyValueUpdateSet());
2056                                         while (kvit->hasNext()) {
2057                                                 KeyValue *kv = kvit->next();
2058                                                 previousCommit->invalidateKey(kv->getKey());
2059                                         }
2060                                         delete kvit;
2061                                         
2062                                         // if the commit is now dead then remove it
2063                                         if (!previousCommit->isLive()) {
2064                                                 commitForClientTable->remove(previousCommit);
2065                                         }
2066                                 }
2067                         }
2068
2069                         // Update the last seen sequence number from this arbitrator
2070                         if (lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId()) != NULL) {
2071                                 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2072                                         lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2073                                 }
2074                         } else {
2075                                 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2076                         }
2077
2078                         // We processed a new commit that we havent seen before
2079                         didProcessANewCommit = true;
2080
2081                         // Update the committed table of keys and which commit is using which key
2082                         SetIterator<KeyValue *> *kvit = getKeyIterator(commit->getKeyValueUpdateSet());
2083                         while (kvit->hasNext()) {
2084                                 KeyValue *kv = kvit->next();
2085                                 committedKeyValueTable->put(kv->getKey(), kv);
2086                                 liveCommitsByKeyTable->put(kv->getKey(), commit);
2087                         }
2088                         delete kvit;
2089                 }
2090         }
2091
2092         return didProcessANewCommit;
2093 }
2094
2095 /**
2096  * Create the speculative table from transactions that are still live
2097  * and have come from the cloud
2098  */
2099 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2100         if (liveTransactionBySequenceNumberTable->keySet()->size() == 0) {
2101                 // There is nothing to speculate on
2102                 return false;
2103         }
2104
2105         // Create a list of the transaction sequence numbers and sort them
2106         // from oldest to newest
2107         Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
2108         Collections->sort(transactionSequenceNumbersSorted);
2109
2110         bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2111
2112
2113         if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2114                 // If there is a gap in the transaction sequence numbers then
2115                 // there was a commit or an abort of a transaction OR there was a
2116                 // new commit (Could be from offline commit) so a redo the
2117                 // speculation from scratch
2118
2119                 // Start from scratch
2120                 speculatedKeyValueTable->clear();
2121                 lastTransactionSequenceNumberSpeculatedOn = -1;
2122                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2123
2124         }
2125
2126         // Remember the front of the transaction list
2127         oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2128
2129         // Find where to start arbitration from
2130         int startIndex = transactionSequenceNumbersSorted->indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1;
2131
2132         if (startIndex >= transactionSequenceNumbersSorted->size()) {
2133                 // Make sure we are not out of bounds
2134                 return false;           // did not speculate
2135         }
2136
2137         Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
2138         bool didSkip = true;
2139
2140         for (int i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2141                 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2142                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2143
2144                 if (!transaction->isComplete()) {
2145                         // If there is an incomplete transaction then there is nothing
2146                         // we can do add this transactions arbitrator to the list of
2147                         // arbitrators we should ignore
2148                         incompleteTransactionArbitrator->add(transaction->getArbitrator());
2149                         didSkip = true;
2150                         continue;
2151                 }
2152
2153                 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2154                         continue;
2155                 }
2156
2157                 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2158
2159                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2160                         // Guard evaluated to true so update the speculative table
2161                         SetIterator<KeyValue *> *kvit = getKeyIterator(commit->getKeyValueUpdateSet());
2162                         while (kvit->hasNext()) {
2163                                 KeyValue *kv = kvit->next();
2164                                 speculatedKeyValueTable->put(kv->getKey(), kv);
2165                         }
2166                         delete kvit;
2167                 }
2168         }
2169
2170         if (didSkip) {
2171                 // Since there was a skip we need to redo the speculation next time around
2172                 lastTransactionSequenceNumberSpeculatedOn = -1;
2173                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2174         }
2175
2176         // We did some speculation
2177         return true;
2178 }
2179
2180 /**
2181  * Create the pending transaction speculative table from transactions
2182  * that are still in the pending transaction buffer
2183  */
2184 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2185         if (pendingTransactionQueue->size() == 0) {
2186                 // There is nothing to speculate on
2187                 return;
2188         }
2189
2190         if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2191                 // need to reset on the pending speculation
2192                 lastPendingTransactionSpeculatedOn = NULL;
2193                 firstPendingTransaction = pendingTransactionQueue->get(0);
2194                 pendingTransactionSpeculatedKeyValueTable->clear();
2195         }
2196
2197         // Find where to start arbitration from
2198         int startIndex = pendingTransactionQueue->indexOf(firstPendingTransaction) + 1;
2199
2200         if (startIndex >= pendingTransactionQueue->size()) {
2201                 // Make sure we are not out of bounds
2202                 return;
2203         }
2204
2205         for (int i = startIndex; i < pendingTransactionQueue->size(); i++) {
2206                 Transaction *transaction = pendingTransactionQueue->get(i);
2207
2208                 lastPendingTransactionSpeculatedOn = transaction;
2209
2210                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2211                         // Guard evaluated to true so update the speculative table
2212                         SetIterator<KeyValue *> *kvit = getKeyIterator(commit->getKeyValueUpdateSet());
2213                         while (kvit->hasNext()) {
2214                                 KeyValue *kv = kvit->next();
2215                                 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2216                         }
2217                         delete kvit;
2218                 }
2219         }
2220 }
2221
2222 /**
2223  * Set dead and remove from the live transaction tables the
2224  * transactions that are dead
2225  */
2226 void Table::updateLiveTransactionsAndStatus() {
2227
2228         // Go through each of the transactions
2229         for (Iterator<Map->Entry<int64_t, Transaction> > *iter = liveTransactionBySequenceNumberTable->entrySet()->iterator(); iter->hasNext();) {
2230                 Transaction *transaction = iter->next()->getValue();
2231
2232                 // Check if the transaction is dead
2233                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator());
2234                 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= transaction->getSequenceNumber())) {
2235
2236                         // Set dead the transaction
2237                         transaction->setDead();
2238
2239                         // Remove the transaction from the live table
2240                         iter->remove();
2241                         liveTransactionByTransactionIdTable->remove(transaction->getId());
2242                 }
2243         }
2244
2245         // Go through each of the transactions
2246         for (Iterator<Map->Entry<int64_t, TransactionStatus *> > *iter = outstandingTransactionStatus->entrySet()->iterator(); iter->hasNext();) {
2247                 TransactionStatus *status = iter->next()->getValue();
2248
2249                 // Check if the transaction is dead
2250                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator());
2251                 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= status->getTransactionSequenceNumber())) {
2252
2253                         // Set committed
2254                         status->setStatus(TransactionStatus_StatusCommitted);
2255
2256                         // Remove
2257                         iter->remove();
2258                 }
2259         }
2260 }
2261
2262 /**
2263  * Process this slot, entry by entry->  Also update the latest message sent by slot
2264  */
2265 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2266
2267         // Update the last message seen
2268         updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2269
2270         // Process each entry in the slot
2271         Vector<Entry *> *entries = slot->getEntries();
2272         uint eSize = entries->size();
2273         for(uint ei=0; ei < eSize; ei++) {
2274                 Entry * entry = entries->get(ei);
2275                 switch (entry->getType()) {
2276                 case TypeCommitPart:
2277                         processEntry((CommitPart *)entry);
2278                         break;
2279                 case TypeAbort:
2280                         processEntry((Abort *)entry);
2281                         break;
2282                 case TypeTransactionPart:
2283                         processEntry((TransactionPart *)entry);
2284                         break;
2285                 case TypeNewKey:
2286                         processEntry((NewKey *)entry);
2287                         break;
2288                 case TypeLastMessage:
2289                         processEntry((LastMessage *)entry, machineSet);
2290                         break;
2291                 case TypeRejectedMessage:
2292                         processEntry((RejectedMessage *)entry, indexer);
2293                         break;
2294                 case TypeTableStatus:
2295                         processEntry((TableStatus *)entry, slot->getSequenceNumber());
2296                         break;
2297                 default:
2298                         throw new Error("Unrecognized type: ");
2299                 }
2300         }
2301 }
2302
2303 /**
2304  * Update the last message that was sent for a machine Id
2305  */
2306 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2307         // Update what the last message received by a machine was
2308         updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2309 }
2310
2311 /**
2312  * Add the new key to the arbitrators table and update the set of live
2313  * new keys (in case of a rescued new key message)
2314  */
2315 void Table::processEntry(NewKey *entry) {
2316         // Update the arbitrator table with the new key information
2317         arbitratorTable->put(entry->getKey(), entry->getMachineID());
2318
2319         // Update what the latest live new key is
2320         NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2321         if (oldNewKey != NULL) {
2322                 // Delete the old new key messages
2323                 oldNewKey->setDead();
2324         }
2325 }
2326
2327 /**
2328  * Process new table status entries and set dead the old ones as new
2329  * ones come in-> keeps track of the largest and smallest table status
2330  * seen in this current round of updating the local copy of the block
2331  * chain
2332  */
2333 void Table::processEntry(TableStatus entry, int64_t seq) {
2334         int newNumSlots = entry->getMaxSlots();
2335         updateCurrMaxSize(newNumSlots);
2336         initExpectedSize(seq, newNumSlots);
2337
2338         if (liveTableStatus != NULL) {
2339                 // We have a larger table status so the old table status is no
2340                 // int64_ter alive
2341                 liveTableStatus->setDead();
2342         }
2343
2344         // Make this new table status the latest alive table status
2345         liveTableStatus = entry;
2346 }
2347
2348 /**
2349  * Check old messages to see if there is a block chain violation->
2350  * Also
2351  */
2352 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
2353         int64_t oldSeqNum = entry->getOldSeqNum();
2354         int64_t newSeqNum = entry->getNewSeqNum();
2355         bool isequal = entry->getEqual();
2356         int64_t machineId = entry->getMachineID();
2357         int64_t seq = entry->getSequenceNumber();
2358
2359         // Check if we have messages that were supposed to be rejected in
2360         // our local block chain
2361         for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2362                 // Get the slot
2363                 Slot *slot = indexer->getSlot(seqNum);
2364
2365                 if (slot != NULL) {
2366                         // If we have this slot make sure that it was not supposed to be
2367                         // a rejected slot
2368                         int64_t slotMachineId = slot->getMachineID();
2369                         if (isequal != (slotMachineId == machineId)) {
2370                                 throw new Error("Server Error: Trying to insert rejected message for slot ");
2371                         }
2372                 }
2373         }
2374
2375         // Create a list of clients to watch until they see this rejected
2376         // message entry->
2377         Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2378         for (Map->Entry<int64_t, Pair<int64_t, Liveness *> > *lastMessageEntry : lastMessageTable->entrySet()) {
2379                 // Machine ID for the last message entry
2380                 int64_t lastMessageEntryMachineId = lastMessageEntry->getKey();
2381
2382                 // We've seen it, don't need to continue to watch->  Our next
2383                 // message will implicitly acknowledge it->
2384                 if (lastMessageEntryMachineId == localMachineId) {
2385                         continue;
2386                 }
2387
2388                 Pair<int64_t, Liveness *> lastMessageValue = lastMessageEntry->getValue();
2389                 int64_t entrySequenceNumber = lastMessageValue.getFirst();
2390
2391                 if (entrySequenceNumber < seq) {
2392                         // Add this rejected message to the set of messages that this
2393                         // machine ID did not see yet
2394                         addWatchVector(lastMessageEntryMachineId, entry);
2395                         // This client did not see this rejected message yet so add it
2396                         // to the watch set to monitor
2397                         deviceWatchSet->add(lastMessageEntryMachineId);
2398                 }
2399         }
2400         if (deviceWatchSet->isEmpty()) {
2401                 // This rejected message has been seen by all the clients so
2402                 entry->setDead();
2403         } else {
2404                 // We need to watch this rejected message
2405                 entry->setWatchSet(deviceWatchSet);
2406         }
2407 }
2408
2409 /**
2410  * Check if this abort is live, if not then save it so we can kill it
2411  * later-> update the last transaction number that was arbitrated on->
2412  */
2413 void Table::processEntry(Abort *entry) {
2414         if (entry->getTransactionSequenceNumber() != -1) {
2415                 // update the transaction status if it was sent to the server
2416                 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2417                 if (status != NULL) {
2418                         status->setStatus(TransactionStatus_StatusAborted);
2419                 }
2420         }
2421
2422         // Abort has not been seen by the client it is for yet so we need to
2423         // keep track of it
2424         Abort *previouslySeenAbort = liveAbortTable->put(entry->getAbortId(), entry);
2425         if (previouslySeenAbort != NULL) {
2426                 previouslySeenAbort->setDead();         // Delete old version of the abort since we got a rescued newer version
2427         }
2428
2429         if (entry->getTransactionArbitrator() == localMachineId) {
2430                 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
2431         }
2432
2433         if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId()).getFirst() >= entry->getSequenceNumber())) {
2434                 // The machine already saw this so it is dead
2435                 entry->setDead();
2436                 liveAbortTable->remove(entry->getAbortId());
2437
2438                 if (entry->getTransactionArbitrator() == localMachineId) {
2439                         liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2440                 }
2441                 return;
2442         }
2443
2444         // Update the last arbitration data that we have seen so far
2445         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator()) != NULL) {
2446                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2447                 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2448                         // Is larger
2449                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2450                 }
2451         } else {
2452                 // Never seen any data from this arbitrator so record the first one
2453                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2454         }
2455
2456         // Set dead a transaction if we can
2457         Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber()));
2458         if (transactionToSetDead != NULL) {
2459                 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2460         }
2461
2462         // Update the last transaction sequence number that the arbitrator
2463         // arbitrated on
2464         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator());
2465         if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2466                 // Is a valid one
2467                 if (entry->getTransactionSequenceNumber() != -1) {
2468                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2469                 }
2470         }
2471 }
2472
2473 /**
2474  * Set dead the transaction part if that transaction is dead and keep
2475  * track of all new parts
2476  */
2477 void Table::processEntry(TransactionPart *entry) {
2478         // Check if we have already seen this transaction and set it dead OR
2479         // if it is not alive
2480         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId());
2481         if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= entry->getSequenceNumber())) {
2482                 // This transaction is dead, it was already committed or aborted
2483                 entry->setDead();
2484                 return;
2485         }
2486
2487         // This part is still alive
2488         Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId());
2489
2490         if (transactionPart == NULL) {
2491                 // Dont have a table for this machine Id yet so make one
2492                 transactionPart = new Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2493                 newTransactionParts->put(entry->getMachineId(), transactionPart);
2494         }
2495
2496         // Update the part and set dead ones we have already seen (got a
2497         // rescued version)
2498         TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry);
2499         if (previouslySeenPart != NULL) {
2500                 previouslySeenPart->setDead();
2501         }
2502 }
2503
2504 /**
2505  * Process new commit entries and save them for future use->  Delete duplicates
2506  */
2507 void Table::processEntry(CommitPart *entry) {
2508         // Update the last transaction that was updated if we can
2509         if (entry->getTransactionSequenceNumber() != -1) {
2510                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId());
2511                 // Update the last transaction sequence number that the arbitrator
2512                 // arbitrated on
2513                 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2514                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2515                 }
2516         }
2517
2518         Hashtable<Pair<int64_t, int32_t> *, CommitPart *> *commitPart = newCommitParts->get(entry->getMachineId());
2519         if (commitPart == NULL) {
2520                 // Don't have a table for this machine Id yet so make one
2521                 commitPart = new Hashtable<Pair<int64_t, int32_t> *, CommitPart *>();
2522                 newCommitParts->put(entry->getMachineId(), commitPart);
2523         }
2524         // Update the part and set dead ones we have already seen (got a
2525         // rescued version)
2526         CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry);
2527         if (previouslySeenPart != NULL) {
2528                 previouslySeenPart->setDead();
2529         }
2530 }
2531
2532 /**
2533  * Update the last message seen table-> Update and set dead the
2534  * appropriate RejectedMessages as clients see them-> Updates the live
2535  * aborts, removes those that are dead and sets them dead-> Check that
2536  * the last message seen is correct and that there is no mismatch of
2537  * our own last message or that other clients have not had a rollback
2538  * on the last message->
2539  */
2540 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<intr64_t> *machineSet) {
2541         // We have seen this machine ID
2542         machineSet->remove(machineId);
2543
2544         // Get the set of rejected messages that this machine Id is has not seen yet
2545         Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2546         // If there is a rejected message that this machine Id has not seen yet
2547         if (watchset != NULL) {
2548                 // Go through each rejected message that this machine Id has not
2549                 // seen yet
2550                 for (Iterator<RejectedMessage *> *rmit = watchset->iterator(); rmit->hasNext(); ) {
2551                         RejectedMessage *rm = rmit->next();
2552                         // If this machine Id has seen this rejected message->->->
2553                         if (rm->getSequenceNumber() <= seqNum) {
2554                                 // Remove it from our watchlist
2555                                 rmit->remove();
2556                                 // Decrement machines that need to see this notification
2557                                 rm->removeWatcher(machineId);
2558                         }
2559                 }
2560         }
2561
2562         // Set dead the abort
2563         for (Iterator<Map->Entry<Pair<int64_t, int64_t>, Abort *> > i = liveAbortTable->entrySet()->iterator(); i->hasNext();) {
2564                 Abort *abort = i->next()->getValue();
2565                 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2566                         abort->setDead();
2567                         i->remove();
2568                         if (abort->getTransactionArbitrator() == localMachineId) {
2569                                 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2570                         }
2571                 }
2572         }
2573         if (machineId == localMachineId) {
2574                 // Our own messages are immediately dead->
2575                 if (liveness instanceof LastMessage) {
2576                         ((LastMessage *)liveness)->setDead();
2577                 } else if (liveness instanceof Slot) {
2578                         ((Slot *)liveness)->setDead();
2579                 } else {
2580                         throw new Error("Unrecognized type");
2581                 }
2582         }
2583         // Get the old last message for this device
2584         Pair<int64_t, Liveness *> lastMessageEntry = lastMessageTable->put(machineId, Pair<int64_t, Liveness *>(seqNum, liveness));
2585         if (lastMessageEntry == NULL) {
2586                 // If no last message then there is nothing else to process
2587                 return;
2588         }
2589
2590         int64_t lastMessageSeqNum = lastMessageEntry.getFirst();
2591         Liveness *lastEntry = lastMessageEntry.getSecond();
2592
2593         // If it is not our machine Id since we already set ours to dead
2594         if (machineId != localMachineId) {
2595                 if (lastEntry instanceof LastMessage) {
2596                         ((LastMessage *)lastEntry)->setDead();
2597                 } else if (lastEntry instanceof Slot) {
2598                         ((Slot *)lastEntry)->setDead();
2599                 } else {
2600                         throw new Error("Unrecognized type");
2601                 }
2602         }
2603         // Make sure the server is not playing any games
2604         if (machineId == localMachineId) {
2605                 if (hadPartialSendToServer) {
2606                         // We were not making any updates and we had a machine mismatch
2607                         if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2608                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: ");
2609                         }
2610                 } else {
2611                         // We were not making any updates and we had a machine mismatch
2612                         if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2613                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed: ");
2614                         }
2615                 }
2616         } else {
2617                 if (lastMessageSeqNum > seqNum) {
2618                         throw new Error("Server Error: Rollback on remote machine sequence number");
2619                 }
2620         }
2621 }
2622
2623 /**
2624  * Add a rejected message entry to the watch set to keep track of
2625  * which clients have seen that rejected message entry and which have
2626  * not.
2627  */
2628 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
2629         Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2630         if (entries == NULL) {
2631                 // There is no set for this machine ID yet so create one
2632                 entries = new Hashset<RejectedMessage *>();
2633                 rejectedMessageWatchVectorTable->put(machineId, entries);
2634         }
2635         entries->add(entry);
2636 }
2637
2638 /**
2639  * Check if the HMAC chain is not violated
2640  */
2641 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
2642         for (int i = 0; i < newSlots->length(); i++) {
2643                 Slot *currSlot = newSlots->get(i);
2644                 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
2645                 if (prevSlot != NULL &&
2646                                 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2647                         throw new Error("Server Error: Invalid HMAC Chain");
2648         }
2649 }