Another file
[iotcloud.git] / version2 / src / C / Table.cc
1 #include "Table.h"
2 #include "CloudComm.h"
3 #include "SlotBuffer.h"
4 #include "NewKey.h"
5 #include "Slot.h"
6 #include "KeyValue.h"
7 #include "Error.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
13 #include "Random.h"
14
15
16 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
17         buffer(NULL),
18         cloud(new CloudComm(this, baseurl, password, listeningPort)),
19         random(NULL),
20         liveTableStatus(NULL),
21         pendingTransactionBuilder(NULL),
22         lastPendingTransactionSpeculatedOn(NULL),
23         firstPendingTransaction(NULL),
24         numberOfSlots(0),
25         bufferResizeThreshold(0),
26         liveSlotCount(0),
27         oldestLiveSlotSequenceNumver(1),
28         localMachineId(_localMachineId),
29         sequenceNumber(0),
30         localTransactionSequenceNumber(0),
31         lastTransactionSequenceNumberSpeculatedOn(0),
32         oldestTransactionSequenceNumberSpeculatedOn(0),
33         localArbitrationSequenceNumber(0),
34         hadPartialSendToServer(false),
35         attemptedToSendToServer(false),
36         expectedsize(0),
37         didFindTableStatus(false),
38         currMaxSize(0),
39         lastSlotAttemptedToSend(NULL),
40         lastIsNewKey(false),
41         lastNewSize(0),
42         lastTransactionPartsSent(NULL),
43         lastPendingSendArbitrationEntriesToDelete(NULL),
44         lastNewKey(NULL),
45         committedKeyValueTable(NULL),
46         speculatedKeyValueTable(NULL),
47         pendingTransactionSpeculatedKeyValueTable(NULL),
48         liveNewKeyTable(NULL),
49         lastMessageTable(NULL),
50         rejectedMessageWatchVectorTable(NULL),
51         arbitratorTable(NULL),
52         liveAbortTable(NULL),
53         newTransactionParts(NULL),
54         newCommitParts(NULL),
55         lastArbitratedTransactionNumberByArbitratorTable(NULL),
56         liveTransactionBySequenceNumberTable(NULL),
57         liveTransactionByTransactionIdTable(NULL),
58         liveCommitsTable(NULL),
59         liveCommitsByKeyTable(NULL),
60         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
61         rejectedSlotVector(NULL),
62         pendingTransactionQueue(NULL),
63         pendingSendArbitrationRounds(NULL),
64         pendingSendArbitrationEntriesToDelete(NULL),
65         transactionPartsSent(NULL),
66         outstandingTransactionStatus(NULL),
67         liveAbortsGeneratedByLocal(NULL),
68         offlineTransactionsCommittedAndAtServer(NULL),
69         localCommunicationTable(NULL),
70         lastTransactionSeenFromMachineFromServer(NULL),
71         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
72         lastInsertedNewKey(false),
73         lastSeqNumArbOn(0)
74 {
75         init();
76 }
77
78 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
79         buffer(NULL),
80         cloud(_cloud),
81         random(NULL),
82         liveTableStatus(NULL),
83         pendingTransactionBuilder(NULL),
84         lastPendingTransactionSpeculatedOn(NULL),
85         firstPendingTransaction(NULL),
86         numberOfSlots(0),
87         bufferResizeThreshold(0),
88         liveSlotCount(0),
89         oldestLiveSlotSequenceNumver(1),
90         localMachineId(_localMachineId),
91         sequenceNumber(0),
92         localTransactionSequenceNumber(0),
93         lastTransactionSequenceNumberSpeculatedOn(0),
94         oldestTransactionSequenceNumberSpeculatedOn(0),
95         localArbitrationSequenceNumber(0),
96         hadPartialSendToServer(false),
97         attemptedToSendToServer(false),
98         expectedsize(0),
99         didFindTableStatus(false),
100         currMaxSize(0),
101         lastSlotAttemptedToSend(NULL),
102         lastIsNewKey(false),
103         lastNewSize(0),
104         lastTransactionPartsSent(NULL),
105         lastPendingSendArbitrationEntriesToDelete(NULL),
106         lastNewKey(NULL),
107         committedKeyValueTable(NULL),
108         speculatedKeyValueTable(NULL),
109         pendingTransactionSpeculatedKeyValueTable(NULL),
110         liveNewKeyTable(NULL),
111         lastMessageTable(NULL),
112         rejectedMessageWatchVectorTable(NULL),
113         arbitratorTable(NULL),
114         liveAbortTable(NULL),
115         newTransactionParts(NULL),
116         newCommitParts(NULL),
117         lastArbitratedTransactionNumberByArbitratorTable(NULL),
118         liveTransactionBySequenceNumberTable(NULL),
119         liveTransactionByTransactionIdTable(NULL),
120         liveCommitsTable(NULL),
121         liveCommitsByKeyTable(NULL),
122         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
123         rejectedSlotVector(NULL),
124         pendingTransactionQueue(NULL),
125         pendingSendArbitrationRounds(NULL),
126         pendingSendArbitrationEntriesToDelete(NULL),
127         transactionPartsSent(NULL),
128         outstandingTransactionStatus(NULL),
129         liveAbortsGeneratedByLocal(NULL),
130         offlineTransactionsCommittedAndAtServer(NULL),
131         localCommunicationTable(NULL),
132         lastTransactionSeenFromMachineFromServer(NULL),
133         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
134         lastInsertedNewKey(false),
135         lastSeqNumArbOn(0)
136 {
137         init();
138 }
139
140 /**
141  * Init all the stuff needed for for table usage
142  */
143 void Table::init() {
144         // Init helper objects
145         random = new Random();
146         buffer = new SlotBuffer();
147
148         // init data structs
149         committedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
150         speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
151         pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
152         liveNewKeyTable = new Hashtable<IoTString *, NewKey *>();
153         lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> >();
154         rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
155         arbitratorTable = new Hashtable<IoTString *, int64_t>();
156         liveAbortTable = new Hashtable<Pair<int64_t, int64_t>, Abort *>();
157         newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>, TransactionPart *> *>();
158         newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>, CommitPart *> *>();
159         lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
160         liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
161         liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t>, Transaction *>();
162         liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> >();
163         liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *>();
164         lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
165         rejectedSlotVector = new Vector<int64_t>();
166         pendingTransactionQueue = new Vector<Transaction *>();
167         pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
168         transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
169         outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
170         liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
171         offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> >();
172         localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> >();
173         lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
174         pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
175         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
176
177         // Other init stuff
178         numberOfSlots = buffer->capacity();
179         setResizeThreshold();
180 }
181
182 /**
183  * Initialize the table by inserting a table status as the first entry
184  * into the table status also initialize the crypto stuff.
185  */
186 void Table::initTable() {
187         cloud->initSecurity();
188
189         // Create the first insertion into the block chain which is the table status
190         Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
191         localSequenceNumber++;
192         TableStatus *status = new TableStatus(s, numberOfSlots);
193         s->addEntry(status);
194         Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
195
196         if (array == NULL) {
197                 array = new Array<Slot *>(1);
198                 array->set(0, s);
199                 // update local block chain
200                 validateAndUpdate(array, true);
201         } else if (array->length() == 1) {
202                 // in case we did push the slot BUT we failed to init it
203                 validateAndUpdate(array, true);
204         } else {
205                 throw new Error("Error on initialization");
206         }
207 }
208
209 /**
210  * Rebuild the table from scratch by pulling the latest block chain
211  * from the server.
212  */
213 void Table::rebuild() {
214         // Just pull the latest slots from the server
215         Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
216         validateAndUpdate(newslots, true);
217         sendToServer(NULL);
218         updateLiveTransactionsAndStatus();
219 }
220
221 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
222         localCommunicationTable->put(arbitrator, Pair<IoTString *, int32_t>(hostName, portNumber));
223 }
224
225 int64_t Table::getArbitrator(IoTString *key) {
226         return arbitratorTable->get(key);
227 }
228
229 void Table::close() {
230         cloud->close();
231 }
232
233 IoTString *Table::getCommitted(IoTString *key)  {
234         KeyValue *kv = committedKeyValueTable->get(key);
235
236         if (kv != NULL) {
237                 return kv->getValue();
238         } else {
239                 return NULL;
240         }
241 }
242
243 IoTString *Table::getSpeculative(IoTString *key) {
244         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
245
246         if (kv == NULL) {
247                 kv = speculatedKeyValueTable->get(key);
248         }
249
250         if (kv == NULL) {
251                 kv = committedKeyValueTable->get(key);
252         }
253
254         if (kv != NULL) {
255                 return kv->getValue();
256         } else {
257                 return NULL;
258         }
259 }
260
261 IoTString *Table::getCommittedAtomic(IoTString *key) {
262         KeyValue *kv = committedKeyValueTable->get(key);
263
264         if (!arbitratorTable->contains(key)) {
265                 throw new Error("Key not Found.");
266         }
267
268         // Make sure new key value pair matches the current arbitrator
269         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
270                 // TODO: Maybe not throw en error
271                 throw new Error("Not all Key Values Match Arbitrator.");
272         }
273
274         if (kv != NULL) {
275                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
276                 return kv->getValue();
277         } else {
278                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
279                 return NULL;
280         }
281 }
282
283 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
284         if (!arbitratorTable->contains(key)) {
285                 throw new Error("Key not Found.");
286         }
287
288         // Make sure new key value pair matches the current arbitrator
289         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
290                 // TODO: Maybe not throw en error
291                 throw new Error("Not all Key Values Match Arbitrator.");
292         }
293
294         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
295
296         if (kv == NULL) {
297                 kv = speculatedKeyValueTable->get(key);
298         }
299
300         if (kv == NULL) {
301                 kv = committedKeyValueTable->get(key);
302         }
303
304         if (kv != NULL) {
305                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
306                 return kv->getValue();
307         } else {
308                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
309                 return NULL;
310         }
311 }
312
313 bool Table::update()  {
314         try {
315                 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
316                 validateAndUpdate(newSlots, false);
317                 sendToServer(NULL);
318                 updateLiveTransactionsAndStatus();
319                 return true;
320         } catch (Exception *e) {
321                 for (int64_t m : localCommunicationTable->keySet()) {
322                         updateFromLocal(m);
323                 }
324         }
325
326         return false;
327 }
328
329 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
330         while (true) {
331                 if (!arbitratorTable->contains(keyName)) {
332                         // There is already an arbitrator
333                         return false;
334                 }
335                 NewKey *newKey = new NewKey(NULL, keyName, machineId);
336
337                 if (sendToServer(newKey)) {
338                         // If successfully inserted
339                         return true;
340                 }
341         }
342 }
343
344 void Table::startTransaction() {
345         // Create a new transaction, invalidates any old pending transactions.
346         pendingTransactionBuilder = new PendingTransaction(localMachineId);
347 }
348
349 void Table::addKV(IoTString *key, IoTString *value) {
350
351         // Make sure it is a valid key
352         if (!arbitratorTable->contains(key)) {
353                 throw new Error("Key not Found.");
354         }
355
356         // Make sure new key value pair matches the current arbitrator
357         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
358                 // TODO: Maybe not throw en error
359                 throw new Error("Not all Key Values Match Arbitrator.");
360         }
361
362         // Add the key value to this transaction
363         KeyValue *kv = new KeyValue(key, value);
364         pendingTransactionBuilder->addKV(kv);
365 }
366
367 TransactionStatus *Table::commitTransaction() {
368         if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
369                 // transaction with no updates will have no effect on the system
370                 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
371         }
372
373         // Set the local transaction sequence number and increment
374         pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
375         localTransactionSequenceNumber++;
376
377         // Create the transaction status
378         TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
379
380         // Create the new transaction
381         Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
382         newTransaction->setTransactionStatus(transactionStatus);
383
384         if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
385                 // Add it to the queue and invalidate the builder for safety
386                 pendingTransactionQueue->add(newTransaction);
387         } else {
388                 arbitrateOnLocalTransaction(newTransaction);
389                 updateLiveStateFromLocal();
390         }
391
392         pendingTransactionBuilder = new PendingTransaction(localMachineId);
393
394         try {
395                 sendToServer(NULL);
396         } catch (ServerException *e) {
397
398                 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
399                 for (Iterator<Transaction *> *iter = pendingTransactionQueue->iterator(); iter->hasNext(); ) {
400                         Transaction *transaction = iter->next();
401
402                         if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
403                                 // Already contacted this client so ignore all attempts to contact this client
404                                 // to preserve ordering for arbitrator
405                                 continue;
406                         }
407
408                         Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
409
410                         if (sendReturn->getFirst()) {
411                                 // Failed to contact over local
412                                 arbitratorTriedAndFailed->add(transaction->getArbitrator());
413                         } else {
414                                 // Successful contact or should not contact
415
416                                 if (sendReturn->getSecond()) {
417                                         // did arbitrate
418                                         iter->remove();
419                                 }
420                         }
421                 }
422         }
423
424         updateLiveStateFromLocal();
425
426         return transactionStatus;
427 }
428
429 /**
430  * Recalculate the new resize threshold
431  */
432 void Table::setResizeThreshold() {
433         int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
434         bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
435 }
436
437 int64_t Table::getLocalSequenceNumber() {
438         return localSequenceNumber;
439 }
440
441 bool Table::sendToServer(NewKey *newKey) {
442         bool fromRetry = false;
443         try {
444                 if (hadPartialSendToServer) {
445                         Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
446                         if (newSlots->length() == 0) {
447                                 fromRetry = true;
448                                 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
449
450                                 if (sendSlotsReturn->getFirst()) {
451                                         if (newKey != NULL) {
452                                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
453                                                         newKey = NULL;
454                                                 }
455                                         }
456
457                                         for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
458                                                 transaction->resetServerFailure();
459                                                 // Update which transactions parts still need to be sent
460                                                 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
461                                                 // Add the transaction status to the outstanding list
462                                                 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
463
464                                                 // Update the transaction status
465                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
466
467                                                 // Check if all the transaction parts were successfully
468                                                 // sent and if so then remove it from pending
469                                                 if (transaction->didSendAllParts()) {
470                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
471                                                         pendingTransactionQueue->remove(transaction);
472                                                 }
473                                         }
474                                 } else {
475                                         newSlots = sendSlotsReturn->getThird();
476                                         bool isInserted = false;
477                                         for (uint si = 0; si < newSlots->length(); si++) {
478                                                 Slot *s = newSlots->get(si);
479                                                 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
480                                                         isInserted = true;
481                                                         break;
482                                                 }
483                                         }
484
485                                         for (uint si = 0; si < newSlots->length(); si++) {
486                                                 Slot *s = newSlots->get(si);
487                                                 if (isInserted) {
488                                                         break;
489                                                 }
490
491                                                 // Process each entry in the slot
492                                                 for (Entry *entry : s->getEntries()) {
493                                                         if (entry->getType() == TypeLastMessage) {
494                                                                 LastMessage *lastMessage = (LastMessage *)entry;
495                                                                 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
496                                                                         isInserted = true;
497                                                                         break;
498                                                                 }
499                                                         }
500                                                 }
501                                         }
502
503                                         if (isInserted) {
504                                                 if (newKey != NULL) {
505                                                         if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
506                                                                 newKey = NULL;
507                                                         }
508                                                 }
509
510                                                 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
511                                                         transaction->resetServerFailure();
512
513                                                         // Update which transactions parts still need to be sent
514                                                         transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
515
516                                                         // Add the transaction status to the outstanding list
517                                                         outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
518
519                                                         // Update the transaction status
520                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
521
522                                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
523                                                         if (transaction->didSendAllParts()) {
524                                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
525                                                                 pendingTransactionQueue->remove(transaction);
526                                                         } else {
527                                                                 transaction->resetServerFailure();
528                                                                 // Set the transaction sequence number back to nothing
529                                                                 if (!transaction->didSendAPartToServer()) {
530                                                                         transaction->setSequenceNumber(-1);
531                                                                 }
532                                                         }
533                                                 }
534                                         }
535                                 }
536
537                                 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
538                                         transaction->resetServerFailure();
539                                         // Set the transaction sequence number back to nothing
540                                         if (!transaction->didSendAPartToServer()) {
541                                                 transaction->setSequenceNumber(-1);
542                                         }
543                                 }
544
545                                 if (sendSlotsReturn->getThird()->length() != 0) {
546                                         // insert into the local block chain
547                                         validateAndUpdate(sendSlotsReturn->getThird(), true);
548                                 }
549                                 // continue;
550                         } else {
551                                 bool isInserted = false;
552                                 for (uint si = 0; si < newSlots->length(); si++) {
553                                         Slot *s = newSlots->get(si);
554                                         if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
555                                                 isInserted = true;
556                                                 break;
557                                         }
558                                 }
559
560                                 for (uint si = 0; si < newSlots->length(); si++) {
561                                         Slot *s = newSlots->get(si);
562                                         if (isInserted) {
563                                                 break;
564                                         }
565
566                                         // Process each entry in the slot
567                                         for (Entry *entry : s->getEntries()) {
568
569                                                 if (entry->getType() == TypeLastMessage) {
570                                                         LastMessage *lastMessage = (LastMessage *)entry;
571                                                         if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
572                                                                 isInserted = true;
573                                                                 break;
574                                                         }
575                                                 }
576                                         }
577                                 }
578
579                                 if (isInserted) {
580                                         if (newKey != NULL) {
581                                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
582                                                         newKey = NULL;
583                                                 }
584                                         }
585
586                                         for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
587                                                 transaction->resetServerFailure();
588
589                                                 // Update which transactions parts still need to be sent
590                                                 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
591
592                                                 // Add the transaction status to the outstanding list
593                                                 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
594
595                                                 // Update the transaction status
596                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
597
598                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
599                                                 if (transaction->didSendAllParts()) {
600                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
601                                                         pendingTransactionQueue->remove(transaction);
602                                                 } else {
603                                                         transaction->resetServerFailure();
604                                                         // Set the transaction sequence number back to nothing
605                                                         if (!transaction->didSendAPartToServer()) {
606                                                                 transaction->setSequenceNumber(-1);
607                                                         }
608                                                 }
609                                         }
610                                 } else {
611                                         for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
612                                                 transaction->resetServerFailure();
613                                                 // Set the transaction sequence number back to nothing
614                                                 if (!transaction->didSendAPartToServer()) {
615                                                         transaction->setSequenceNumber(-1);
616                                                 }
617                                         }
618                                 }
619
620                                 // insert into the local block chain
621                                 validateAndUpdate(newSlots, true);
622                         }
623                 }
624         } catch (ServerException *e) {
625                 throw e;
626         }
627
628
629
630         try {
631                 // While we have stuff that needs inserting into the block chain
632                 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
633
634                         fromRetry = false;
635
636                         if (hadPartialSendToServer) {
637                                 throw new Error("Should Be error free");
638                         }
639
640
641
642                         // If there is a new key with same name then end
643                         if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
644                                 return false;
645                         }
646
647                         // Create the slot
648                         Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber);
649                         localSequenceNumber++;
650
651                         // Try to fill the slot with data
652                         ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
653                         bool needsResize = fillSlotsReturn->getFirst();
654                         int newSize = fillSlotsReturn->getSecond();
655                         bool insertedNewKey = fillSlotsReturn->getThird();
656
657                         if (needsResize) {
658                                 // Reset which transaction to send
659                                 for (Transaction *transaction : transactionPartsSent->keySet()) {
660                                         transaction->resetNextPartToSend();
661
662                                         // Set the transaction sequence number back to nothing
663                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
664                                                 transaction->setSequenceNumber(-1);
665                                         }
666                                 }
667
668                                 // Clear the sent data since we are trying again
669                                 pendingSendArbitrationEntriesToDelete->clear();
670                                 transactionPartsSent->clear();
671
672                                 // We needed a resize so try again
673                                 fillSlot(slot, true, newKey);
674                         }
675
676                         lastSlotAttemptedToSend = slot;
677                         lastIsNewKey = (newKey != NULL);
678                         lastInsertedNewKey = insertedNewKey;
679                         lastNewSize = newSize;
680                         lastNewKey = newKey;
681                         lastTransactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> * >(transactionPartsSent);
682                         lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
683
684
685                         ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
686
687                         if (sendSlotsReturn->getFirst()) {
688
689                                 // Did insert into the block chain
690
691                                 if (insertedNewKey) {
692                                         // This slot was what was inserted not a previous slot
693
694                                         // New Key was successfully inserted into the block chain so dont want to insert it again
695                                         newKey = NULL;
696                                 }
697
698                                 // Remove the aborts and commit parts that were sent from the pending to send queue
699                                 for (Iterator<ArbitrationRound *> *iter = pendingSendArbitrationRounds->iterator(); iter->hasNext(); ) {
700                                         ArbitrationRound *round = iter->next();
701                                         round->removeParts(pendingSendArbitrationEntriesToDelete);
702
703                                         if (round->isDoneSending()) {
704                                                 // Sent all the parts
705                                                 iter->remove();
706                                         }
707                                 }
708
709                                 for (Transaction *transaction : transactionPartsSent->keySet()) {
710                                         transaction->resetServerFailure();
711
712                                         // Update which transactions parts still need to be sent
713                                         transaction->removeSentParts(transactionPartsSent->get(transaction));
714
715                                         // Add the transaction status to the outstanding list
716                                         outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
717
718                                         // Update the transaction status
719                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
720
721                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
722                                         if (transaction->didSendAllParts()) {
723                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
724                                                 pendingTransactionQueue->remove(transaction);
725                                         }
726                                 }
727                         } else {
728                                 // Reset which transaction to send
729                                 for (Transaction *transaction : transactionPartsSent->keySet()) {
730                                         transaction->resetNextPartToSend();
731
732                                         // Set the transaction sequence number back to nothing
733                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
734                                                 transaction->setSequenceNumber(-1);
735                                         }
736                                 }
737                         }
738
739                         // Clear the sent data in preparation for next send
740                         pendingSendArbitrationEntriesToDelete->clear();
741                         transactionPartsSent->clear();
742
743                         if (sendSlotsReturn->getThird()->length() != 0) {
744                                 // insert into the local block chain
745                                 validateAndUpdate(sendSlotsReturn->getThird(), true);
746                         }
747                 }
748
749         } catch (ServerException *e) {
750
751                 if (e->getType() != ServerException->TypeInputTimeout) {
752                         // Nothing was able to be sent to the server so just clear these data structures
753                         for (Transaction *transaction : transactionPartsSent->keySet()) {
754                                 transaction->resetNextPartToSend();
755
756                                 // Set the transaction sequence number back to nothing
757                                 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
758                                         transaction->setSequenceNumber(-1);
759                                 }
760                         }
761                 } else {
762                         // There was a partial send to the server
763                         hadPartialSendToServer = true;
764
765                         // Nothing was able to be sent to the server so just clear these data structures
766                         for (Transaction *transaction : transactionPartsSent->keySet()) {
767                                 transaction->resetNextPartToSend();
768                                 transaction->setServerFailure();
769                         }
770                 }
771
772                 pendingSendArbitrationEntriesToDelete->clear();
773                 transactionPartsSent->clear();
774
775                 throw e;
776         }
777
778         return newKey == NULL;
779 }
780
781 bool Table::updateFromLocal(int64_t machineId) {
782         Pair<IoTString *, int32_t> localCommunicationInformation = localCommunicationTable->get(machineId);
783         if (localCommunicationInformation == NULL) {
784                 // Cant talk to that device locally so do nothing
785                 return false;
786         }
787
788         // Get the size of the send data
789         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
790
791         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
792         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId) != NULL) {
793                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
794         }
795
796         Array<char> *sendData = new Array<char>(sendDataSize);
797         ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
798
799         // Encode the data
800         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
801         bbEncode->putInt(0);
802
803         // Send by local
804         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
805         localSequenceNumber++;
806
807         if (returnData == NULL) {
808                 // Could not contact server
809                 return false;
810         }
811
812         // Decode the data
813         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
814         int numberOfEntries = bbDecode->getInt();
815
816         for (int i = 0; i < numberOfEntries; i++) {
817                 char type = bbDecode->get();
818                 if (type == TypeAbort) {
819                         Abort *abort = (Abort)Abort_decode(NULL, bbDecode);
820                         processEntry(abort);
821                 } else if (type == TypeCommitPart) {
822                         CommitPart *commitPart = (CommitPart)CommitPart_decode(NULL, bbDecode);
823                         processEntry(commitPart);
824                 }
825         }
826
827         updateLiveStateFromLocal();
828
829         return true;
830 }
831
832 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
833
834         // Get the devices local communications
835         Pair<IoTString *, int32_t> localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
836
837         if (localCommunicationInformation == NULL) {
838                 // Cant talk to that device locally so do nothing
839                 return Pair<bool, bool>(true, false);
840         }
841
842         // Get the size of the send data
843         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
844         for (TransactionPart *part : transaction->getParts()->values()) {
845                 sendDataSize += part->getSize();
846         }
847
848         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
849         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator()) != NULL) {
850                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
851         }
852
853         // Make the send data size
854         Array<char> *sendData = new Array<char>(sendDataSize);
855         ByteBuffer *bbEncode = ByteBuffer.wrap(sendData);
856
857         // Encode the data
858         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
859         bbEncode->putInt(transaction->getParts()->size());
860         for (TransactionPart *part : transaction->getParts()->values()) {
861                 part->encode(bbEncode);
862         }
863
864
865         // Send by local
866         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
867         localSequenceNumber++;
868
869         if (returnData == NULL) {
870                 // Could not contact server
871                 return Pair<bool, bool>(true, false);
872         }
873
874         // Decode the data
875         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
876         bool didCommit = bbDecode->get() == 1;
877         bool couldArbitrate = bbDecode->get() == 1;
878         int numberOfEntries = bbDecode->getInt();
879         bool foundAbort = false;
880
881         for (int i = 0; i < numberOfEntries; i++) {
882                 char type = bbDecode->get();
883                 if (type == TypeAbort) {
884                         Abort *abort = (Abort)Abort_decode(NULL, bbDecode);
885
886                         if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
887                                 foundAbort = true;
888                         }
889
890                         processEntry(abort);
891                 } else if (type == TypeCommitPart) {
892                         CommitPart *commitPart = (CommitPart)CommitPart_decode(NULL, bbDecode);
893                         processEntry(commitPart);
894                 }
895         }
896
897         updateLiveStateFromLocal();
898
899         if (couldArbitrate) {
900                 TransactionStatus status =  transaction->getTransactionStatus();
901                 if (didCommit) {
902                         status->setStatus(TransactionStatus_StatusCommitted);
903                 } else {
904                         status->setStatus(TransactionStatus_StatusAborted);
905                 }
906         } else {
907                 TransactionStatus status =  transaction->getTransactionStatus();
908                 if (foundAbort) {
909                         status->setStatus(TransactionStatus_StatusAborted);
910                 } else {
911                         status->setStatus(TransactionStatus_StatusCommitted);
912                 }
913         }
914
915         return Pair<bool, bool>(false, true);
916 }
917
918 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
919
920         // Decode the data
921         ByteBuffer *bbDecode = ByteBuffer_wrap(data);
922         int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
923         int numberOfParts = bbDecode->getInt();
924
925         // If we did commit a transaction or not
926         bool didCommit = false;
927         bool couldArbitrate = false;
928
929         if (numberOfParts != 0) {
930
931                 // decode the transaction
932                 Transaction *transaction = new Transaction();
933                 for (int i = 0; i < numberOfParts; i++) {
934                         bbDecode->get();
935                         TransactionPart *newPart = (TransactionPart)TransactionPart.decode(NULL, bbDecode);
936                         transaction->addPartDecode(newPart);
937                 }
938
939                 // Arbitrate on transaction and pull relevant return data
940                 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
941                 couldArbitrate = localArbitrateReturn->getFirst();
942                 didCommit = localArbitrateReturn->getSecond();
943
944                 updateLiveStateFromLocal();
945
946                 // Transaction was sent to the server so keep track of it to prevent double commit
947                 if (transaction->getSequenceNumber() != -1) {
948                         offlineTransactionsCommittedAndAtServer->add(transaction->getId());
949                 }
950         }
951
952         // The data to send back
953         int returnDataSize = 0;
954         Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
955
956         // Get the aborts to send back
957         Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>(liveAbortsGeneratedByLocal->keySet());
958         Collections->sort(abortLocalSequenceNumbers);
959         for (int64_t localSequenceNumber : abortLocalSequenceNumbers) {
960                 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
961                         continue;
962                 }
963
964                 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
965                 unseenArbitrations->add(abort);
966                 returnDataSize += abort->getSize();
967         }
968
969         // Get the commits to send back
970         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
971         if (commitForClientTable != NULL) {
972                 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
973                 Collections->sort(commitLocalSequenceNumbers);
974
975                 for (int64_t localSequenceNumber : commitLocalSequenceNumbers) {
976                         Commit *commit = commitForClientTable->get(localSequenceNumber);
977
978                         if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
979                                 continue;
980                         }
981
982                         unseenArbitrations->addAll(commit->getParts()->values());
983
984                         for (CommitPart commitPart : commit->getParts()->values()) {
985                                 returnDataSize += commitPart->getSize();
986                         }
987                 }
988         }
989
990         // Number of arbitration entries to decode
991         returnDataSize += 2 * sizeof(int32_t);
992
993         // bool of did commit or not
994         if (numberOfParts != 0) {
995                 returnDataSize += sizeof(char);
996         }
997
998         // Data to send Back
999         Array<char> *returnData = new Array<char>(returnDataSize);
1000         ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
1001
1002         if (numberOfParts != 0) {
1003                 if (didCommit) {
1004                         bbEncode->put((char)1);
1005                 } else {
1006                         bbEncode->put((char)0);
1007                 }
1008                 if (couldArbitrate) {
1009                         bbEncode->put((char)1);
1010                 } else {
1011                         bbEncode->put((char)0);
1012                 }
1013         }
1014
1015         bbEncode->putInt(unseenArbitrations->size());
1016         for (Entry *entry : unseenArbitrations) {
1017                 entry->encode(bbEncode);
1018         }
1019
1020
1021         localSequenceNumber++;
1022         return returnData;
1023 }
1024
1025 ThreeTuple<bool, bool, Array<Slot *> *> Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey) {
1026         bool attemptedToSendToServerTmp = attemptedToSendToServer;
1027         attemptedToSendToServer = true;
1028
1029         bool inserted = false;
1030         bool lastTryInserted = false;
1031
1032         Array<Slot *> *array = cloud->putSlot(slot, newSize);
1033         if (array == NULL) {
1034                 array = new Array<Slot *>();
1035                 array->set(0, slot);
1036                 rejectedSlotVector->clear();
1037                 inserted = true;
1038         } else {
1039                 if (array->length() == 0) {
1040                         throw new Error("Server Error: Did not send any slots");
1041                 }
1042
1043                 // if (attemptedToSendToServerTmp) {
1044                 if (hadPartialSendToServer) {
1045
1046                         bool isInserted = false;
1047                         for (Slot *s : array) {
1048                                 if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1049                                         isInserted = true;
1050                                         break;
1051                                 }
1052                         }
1053
1054                         for (Slot *s : array) {
1055                                 if (isInserted) {
1056                                         break;
1057                                 }
1058
1059                                 // Process each entry in the slot
1060                                 for (Entry *entry : s->getEntries()) {
1061
1062                                         if (entry->getType() == TypeLastMessage) {
1063                                                 LastMessage *lastMessage = (LastMessage *)entry;
1064
1065                                                 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) {
1066                                                         isInserted = true;
1067                                                         break;
1068                                                 }
1069                                         }
1070                                 }
1071                         }
1072
1073                         if (!isInserted) {
1074                                 rejectedSlotVector->add(slot->getSequenceNumber());
1075                                 lastTryInserted = false;
1076                         } else {
1077                                 lastTryInserted = true;
1078                         }
1079                 } else {
1080                         rejectedSlotVector->add(slot->getSequenceNumber());
1081                         lastTryInserted = false;
1082                 }
1083         }
1084
1085         return ThreeTuple<bool, bool, Array<Slot *> *>(inserted, lastTryInserted, array);
1086 }
1087
1088 /**
1089  * Returns false if a resize was needed
1090  */
1091 ThreeTuple<bool, int32_t, bool> Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry) {
1092         int newSize = 0;
1093         if (liveSlotCount > bufferResizeThreshold) {
1094                 resize = true;  //Resize is forced
1095         }
1096
1097         if (resize) {
1098                 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1099                 TableStatus *status = new TableStatus(slot, newSize);
1100                 slot->addEntry(status);
1101         }
1102
1103         // Fill with rejected slots first before doing anything else
1104         doRejectedMessages(slot);
1105
1106         // Do mandatory rescue of entries
1107         ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1108
1109         // Extract working variables
1110         bool needsResize = mandatoryRescueReturn->getFirst();
1111         bool seenLiveSlot = mandatoryRescueReturn->getSecond();
1112         int64_t currentRescueSequenceNumber = mandatoryRescueReturn->getThird();
1113
1114         if (needsResize && !resize) {
1115                 // We need to resize but we are not resizing so return false
1116                 return ThreeTuple<bool, int32_t, bool>(true, NULL, NULL);
1117         }
1118
1119         bool inserted = false;
1120         if (newKeyEntry != NULL) {
1121                 newKeyEntry->setSlot(slot);
1122                 if (slot->hasSpace(newKeyEntry)) {
1123                         slot->addEntry(newKeyEntry);
1124                         inserted = true;
1125                 }
1126         }
1127
1128         // Clear the transactions, aborts and commits that were sent previously
1129         transactionPartsSent->clear();
1130         pendingSendArbitrationEntriesToDelete->clear();
1131
1132         for (ArbitrationRound *round : pendingSendArbitrationRounds) {
1133                 bool isFull = false;
1134                 round->generateParts();
1135                 Vector<Entry *> *parts = round->getParts();
1136
1137                 // Insert pending arbitration data
1138                 for (Entry *arbitrationData : parts) {
1139
1140                         // If it is an abort then we need to set some information
1141                         if (arbitrationData instanceof Abort) {
1142                                 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1143                         }
1144
1145                         if (!slot->hasSpace(arbitrationData)) {
1146                                 // No space so cant do anything else with these data entries
1147                                 isFull = true;
1148                                 break;
1149                         }
1150
1151                         // Add to this current slot and add it to entries to delete
1152                         slot->addEntry(arbitrationData);
1153                         pendingSendArbitrationEntriesToDelete->add(arbitrationData);
1154                 }
1155
1156                 if (isFull) {
1157                         break;
1158                 }
1159         }
1160
1161         if (pendingTransactionQueue->size() > 0) {
1162                 Transaction *transaction = pendingTransactionQueue->get(0);
1163                 // Set the transaction sequence number if it has yet to be inserted into the block chain
1164                 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1165                         transaction->setSequenceNumber(slot->getSequenceNumber());
1166                 }
1167
1168                 while (true) {
1169                         TransactionPart *part = transaction->getNextPartToSend();
1170                         if (part == NULL) {
1171                                 // Ran out of parts to send for this transaction so move on
1172                                 break;
1173                         }
1174
1175                         if (slot->hasSpace(part)) {
1176                                 slot->addEntry(part);
1177                                 Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1178                                 if (partsSent == NULL) {
1179                                         partsSent = new Vector<int32_t>();
1180                                         transactionPartsSent->put(transaction, partsSent);
1181                                 }
1182                                 partsSent->add(part->getPartNumber());
1183                                 transactionPartsSent->put(transaction, partsSent);
1184                         } else {
1185                                 break;
1186                         }
1187                 }
1188         }
1189
1190         // Fill the remainder of the slot with rescue data
1191         doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1192
1193         return ThreeTuple<bool, int32_t, bool>(false, newSize, inserted);
1194 }
1195
1196 void Table::doRejectedMessages(Slot *s) {
1197         if (!rejectedSlotVector->isEmpty()) {
1198                 /* TODO: We should avoid generating a rejected message entry if
1199                  * there is already a sufficient entry in the queue (e->g->,
1200                  * equalsto value of true and same sequence number)->  */
1201
1202                 int64_t old_seqn = rejectedSlotVector->firstElement();
1203                 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1204                         int64_t new_seqn = rejectedSlotVector->lastElement();
1205                         RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1206                         s->addEntry(rm);
1207                 } else {
1208                         int64_t prev_seqn = -1;
1209                         int i = 0;
1210                         /* Go through list of missing messages */
1211                         for (; i < rejectedSlotVector->size(); i++) {
1212                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1213                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1214                                 if (s_msg != NULL)
1215                                         break;
1216                                 prev_seqn = curr_seqn;
1217                         }
1218                         /* Generate rejected message entry for missing messages */
1219                         if (prev_seqn != -1) {
1220                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1221                                 s->addEntry(rm);
1222                         }
1223                         /* Generate rejected message entries for present messages */
1224                         for (; i < rejectedSlotVector->size(); i++) {
1225                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1226                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1227                                 int64_t machineid = s_msg->getMachineID();
1228                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1229                                 s->addEntry(rm);
1230                         }
1231                 }
1232         }
1233 }
1234
1235 ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize) {
1236         int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1237         int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
1238         if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1239                 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1240         }
1241
1242         int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1243         bool seenLiveSlot = false;
1244         int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots;         // smallest seq number in the buffer if it is full
1245         int64_t threshold = firstIfFull + Table_FREE_SLOTS;             // we want the buffer to be clear of live entries up to this point
1246
1247
1248         // Mandatory Rescue
1249         for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1250                 Slot previousSlot = buffer->getSlot(currentSequenceNumber);
1251                 // Push slot number forward
1252                 if (!seenLiveSlot) {
1253                         oldestLiveSlotSequenceNumver = currentSequenceNumber;
1254                 }
1255
1256                 if (!previousSlot->isLive()) {
1257                         continue;
1258                 }
1259
1260                 // We have seen a live slot
1261                 seenLiveSlot = true;
1262
1263                 // Get all the live entries for a slot
1264                 Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1265
1266                 // Iterate over all the live entries and try to rescue them
1267                 for (Entry *liveEntry : liveEntries) {
1268                         if (slot->hasSpace(liveEntry)) {
1269
1270                                 // Enough space to rescue the entry
1271                                 slot->addEntry(liveEntry);
1272                         } else if (currentSequenceNumber == firstIfFull) {
1273                                 //if there's no space but the entry is about to fall off the queue
1274                                 System->out->println("B");      //?
1275                                 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
1276
1277                         }
1278                 }
1279         }
1280
1281         // Did not resize
1282         return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
1283 }
1284
1285 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1286         /* now go through live entries from least to greatest sequence number until
1287          * either all live slots added, or the slot doesn't have enough room
1288          * for SKIP_THRESHOLD consecutive entries*/
1289         int skipcount = 0;
1290         int64_t newestseqnum = buffer->getNewestSeqNum();
1291 search:
1292         for (; seqn <= newestseqnum; seqn++) {
1293                 Slot *prevslot = buffer->getSlot(seqn);
1294                 //Push slot number forward
1295                 if (!seenliveslot)
1296                         oldestLiveSlotSequenceNumver = seqn;
1297
1298                 if (!prevslot->isLive())
1299                         continue;
1300                 seenliveslot = true;
1301                 Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
1302                 for (Entry *liveentry : liveentries) {
1303                         if (s->hasSpace(liveentry))
1304                                 s->addEntry(liveentry);
1305                         else {
1306                                 skipcount++;
1307                                 if (skipcount > Table_SKIP_THRESHOLD)
1308                                         break search;
1309                         }
1310                 }
1311         }
1312 }
1313
1314 /**
1315  * Checks for malicious activity and updates the local copy of the block chain->
1316  */
1317 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1318         // The cloud communication layer has checked slot HMACs already
1319         // before decoding
1320         if (newSlots->length() == 0) {
1321                 return;
1322         }
1323
1324         // Make sure all slots are newer than the last largest slot this
1325         // client has seen
1326         int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
1327         if (firstSeqNum <= sequenceNumber) {
1328                 throw new Error("Server Error: Sent older slots!");
1329         }
1330
1331         // Create an object that can access both new slots and slots in our
1332         // local chain without committing slots to our local chain
1333         SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1334
1335         // Check that the HMAC chain is not broken
1336         checkHMACChain(indexer, newSlots);
1337
1338         // Set to keep track of messages from clients
1339         Hashset<int64_t> *machineSet = new Hashset<int64_t>(lastMessageTable->keySet());
1340
1341         // Process each slots data
1342         for (Slot *slot : newSlots) {
1343                 processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1344
1345                 updateExpectedSize();
1346         }
1347
1348         // If there is a gap, check to see if the server sent us
1349         // everything->
1350         if (firstSeqNum != (sequenceNumber + 1)) {
1351
1352                 // Check the size of the slots that were sent down by the server->
1353                 // Can only check the size if there was a gap
1354                 checkNumSlots(newSlots->length);
1355
1356                 // Since there was a gap every machine must have pushed a slot or
1357                 // must have a last message message-> If not then the server is
1358                 // hiding slots
1359                 if (!machineSet->isEmpty()) {
1360                         throw new Error("Missing record for machines: " + machineSet);
1361                 }
1362         }
1363
1364         // Update the size of our local block chain->
1365         commitNewMaxSize();
1366
1367         // Commit new to slots to the local block chain->
1368         for (Slot *slot : newSlots) {
1369
1370                 // Insert this slot into our local block chain copy->
1371                 buffer->putSlot(slot);
1372
1373                 // Keep track of how many slots are currently live (have live data
1374                 // in them)->
1375                 liveSlotCount++;
1376         }
1377
1378         // Get the sequence number of the latest slot in the system
1379         sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
1380         updateLiveStateFromServer();
1381
1382         // No Need to remember after we pulled from the server
1383         offlineTransactionsCommittedAndAtServer->clear();
1384
1385         // This is invalidated now
1386         hadPartialSendToServer = false;
1387 }
1388
1389 void Table::updateLiveStateFromServer() {
1390         // Process the new transaction parts
1391         processNewTransactionParts();
1392
1393         // Do arbitration on new transactions that were received
1394         arbitrateFromServer();
1395
1396         // Update all the committed keys
1397         bool didCommitOrSpeculate = updateCommittedTable();
1398
1399         // Delete the transactions that are now dead
1400         updateLiveTransactionsAndStatus();
1401
1402         // Do speculations
1403         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1404         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1405 }
1406
1407 void Table::updateLiveStateFromLocal() {
1408         // Update all the committed keys
1409         bool didCommitOrSpeculate = updateCommittedTable();
1410
1411         // Delete the transactions that are now dead
1412         updateLiveTransactionsAndStatus();
1413
1414         // Do speculations
1415         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1416         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1417 }
1418
1419 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1420         int64_t prevslots = firstSequenceNumber;
1421
1422         if (didFindTableStatus) {
1423         } else {
1424                 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1425         }
1426
1427         didFindTableStatus = true;
1428         currMaxSize = numberOfSlots;
1429 }
1430
1431 void Table::updateExpectedSize() {
1432         expectedsize++;
1433
1434         if (expectedsize > currMaxSize) {
1435                 expectedsize = currMaxSize;
1436         }
1437 }
1438
1439
1440 /**
1441  * Check the size of the block chain to make sure there are enough
1442  * slots sent back by the server-> This is only called when we have a
1443  * gap between the slots that we have locally and the slots sent by
1444  * the server therefore in the slots sent by the server there will be
1445  * at least 1 Table status message
1446  */
1447 void Table::checkNumSlots(int numberOfSlots) {
1448         if (numberOfSlots != expectedsize) {
1449                 throw new Error("Server Error: Server did not send all slots->  Expected: " + expectedsize + " Received:" + numberOfSlots);
1450         }
1451 }
1452
1453 void Table::updateCurrMaxSize(int newmaxsize) {
1454         currMaxSize = newmaxsize;
1455 }
1456
1457
1458 /**
1459  * Update the size of of the local buffer if it is needed->
1460  */
1461 void Table::commitNewMaxSize() {
1462         didFindTableStatus = false;
1463
1464         // Resize the local slot buffer
1465         if (numberOfSlots != currMaxSize) {
1466                 buffer->resize((int32_t)currMaxSize);
1467         }
1468
1469         // Change the number of local slots to the new size
1470         numberOfSlots = (int32_t)currMaxSize;
1471
1472         // Recalculate the resize threshold since the size of the local
1473         // buffer has changed
1474         setResizeThreshold();
1475 }
1476
1477 /**
1478  * Process the new transaction parts from this latest round of slots
1479  * received from the server
1480  */
1481 void Table::processNewTransactionParts() {
1482
1483         if (newTransactionParts->size() == 0) {
1484                 // Nothing new to process
1485                 return;
1486         }
1487
1488         // Iterate through all the machine Ids that we received new parts
1489         // for
1490         for (int64_t machineId : newTransactionParts->keySet()) {
1491                 Hashtable<Pair<int64_t int32_t>, TransactionPart *> *parts = newTransactionParts->get(machineId);
1492
1493                 // Iterate through all the parts for that machine Id
1494                 for (Pair<int64_t, int32_t> partId : parts->keySet()) {
1495                         TransactionPart *part = parts->get(partId);
1496
1497                         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1498                         if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= part->getSequenceNumber())) {
1499                                 // Set dead the transaction part
1500                                 part->setDead();
1501                                 continue;
1502                         }
1503
1504                         // Get the transaction object for that sequence number
1505                         Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1506
1507                         if (transaction == NULL) {
1508                                 // This is a new transaction that we dont have so make a new one
1509                                 transaction = new Transaction();
1510
1511                                 // Insert this new transaction into the live tables
1512                                 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1513                                 liveTransactionByTransactionIdTable->put(part->getTransactionId(), transaction);
1514                         }
1515
1516                         // Add that part to the transaction
1517                         transaction->addPartDecode(part);
1518                 }
1519         }
1520
1521         // Clear all the new transaction parts in preparation for the next
1522         // time the server sends slots
1523         newTransactionParts->clear();
1524 }
1525
1526 void Table::arbitrateFromServer() {
1527
1528         if (liveTransactionBySequenceNumberTable->size() == 0) {
1529                 // Nothing to arbitrate on so move on
1530                 return;
1531         }
1532
1533         // Get the transaction sequence numbers and sort from oldest to newest
1534         Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
1535         Collections->sort(transactionSequenceNumbers);
1536
1537         // Collection of key value pairs that are
1538         Hashtable<IoTString *, KeyValue *> speculativeTableTmp = new Hashtable<IoTString *, KeyValue *>();
1539
1540         // The last transaction arbitrated on
1541         int64_t lastTransactionCommitted = -1;
1542         Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1543
1544         for (int64_t transactionSequenceNumber : transactionSequenceNumbers) {
1545                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1546
1547
1548
1549                 // Check if this machine arbitrates for this transaction if not
1550                 // then we cant arbitrate this transaction
1551                 if (transaction->getArbitrator() != localMachineId) {
1552                         continue;
1553                 }
1554
1555                 if (transactionSequenceNumber < lastSeqNumArbOn) {
1556                         continue;
1557                 }
1558
1559                 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1560                         // We have seen this already locally so dont commit again
1561                         continue;
1562                 }
1563
1564
1565                 if (!transaction->isComplete()) {
1566                         // Will arbitrate in incorrect order if we continue so just break
1567                         // Most likely this
1568                         break;
1569                 }
1570
1571
1572                 // update the largest transaction seen by arbitrator from server
1573                 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) == NULL) {
1574                         lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1575                 } else {
1576                         int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1577                         if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1578                                 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1579                         }
1580                 }
1581
1582                 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1583                         // Guard evaluated as true
1584
1585                         // Update the local changes so we can make the commit
1586                         for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
1587                                 speculativeTableTmp->put(kv->getKey(), kv);
1588                         }
1589
1590                         // Update what the last transaction committed was for use in batch commit
1591                         lastTransactionCommitted = transactionSequenceNumber;
1592                 } else {
1593                         // Guard evaluated was false so create abort
1594                         // Create the abort
1595                         Abort *newAbort = new Abort(NULL,
1596                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1597                                                                                                                                         transaction->getSequenceNumber(),
1598                                                                                                                                         transaction->getMachineId(),
1599                                                                                                                                         transaction->getArbitrator(),
1600                                                                                                                                         localArbitrationSequenceNumber);
1601                         localArbitrationSequenceNumber++;
1602                         generatedAborts->add(newAbort);
1603
1604                         // Insert the abort so we can process
1605                         processEntry(newAbort);
1606                 }
1607
1608                 lastSeqNumArbOn = transactionSequenceNumber;
1609         }
1610
1611         Commit *newCommit = NULL;
1612
1613         // If there is something to commit
1614         if (speculativeTableTmp->size() != 0) {
1615                 // Create the commit and increment the commit sequence number
1616                 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1617                 localArbitrationSequenceNumber++;
1618
1619                 // Add all the new keys to the commit
1620                 for (KeyValue *kv : speculativeTableTmp->values()) {
1621                         newCommit->addKV(kv);
1622                 }
1623
1624                 // create the commit parts
1625                 newCommit->createCommitParts();
1626
1627                 // Append all the commit parts to the end of the pending queue
1628                 // waiting for sending to the server
1629                 // Insert the commit so we can process it
1630                 for (CommitPart *commitPart : newCommit->getParts()->values()) {
1631                         processEntry(commitPart);
1632                 }
1633         }
1634
1635         if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1636                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1637                 pendingSendArbitrationRounds->add(arbitrationRound);
1638
1639                 if (compactArbitrationData()) {
1640                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1641                         if (newArbitrationRound->getCommit() != NULL) {
1642                                 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1643                                         processEntry(commitPart);
1644                                 }
1645                         }
1646                 }
1647         }
1648 }
1649
1650 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1651
1652         // Check if this machine arbitrates for this transaction if not then
1653         // we cant arbitrate this transaction
1654         if (transaction->getArbitrator() != localMachineId) {
1655                 return Pair<bool, bool>(false, false);
1656         }
1657
1658         if (!transaction->isComplete()) {
1659                 // Will arbitrate in incorrect order if we continue so just break
1660                 // Most likely this
1661                 return Pair<bool, bool>(false, false);
1662         }
1663
1664         if (transaction->getMachineId() != localMachineId) {
1665                 // dont do this check for local transactions
1666                 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) != NULL) {
1667                         if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1668                                 // We've have already seen this from the server
1669                                 return Pair<bool, bool>(false, false);
1670                         }
1671                 }
1672         }
1673
1674         if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1675                 // Guard evaluated as true Create the commit and increment the
1676                 // commit sequence number
1677                 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1678                 localArbitrationSequenceNumber++;
1679
1680                 // Update the local changes so we can make the commit
1681                 for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
1682                         newCommit->addKV(kv);
1683                 }
1684
1685                 // create the commit parts
1686                 newCommit->createCommitParts();
1687
1688                 // Append all the commit parts to the end of the pending queue
1689                 // waiting for sending to the server
1690                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1691                 pendingSendArbitrationRounds->add(arbitrationRound);
1692
1693                 if (compactArbitrationData()) {
1694                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1695                         for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1696                                 processEntry(commitPart);
1697                         }
1698                 } else {
1699                         // Insert the commit so we can process it
1700                         for (CommitPart *commitPart : newCommit->getParts()->values()) {
1701                                 processEntry(commitPart);
1702                         }
1703                 }
1704
1705                 if (transaction->getMachineId() == localMachineId) {
1706                         TransactionStatus *status = transaction->getTransactionStatus();
1707                         if (status != NULL) {
1708                                 status->setStatus(TransactionStatus_StatusCommitted);
1709                         }
1710                 }
1711
1712                 updateLiveStateFromLocal();
1713                 return Pair<bool, bool>(true, true);
1714         } else {
1715                 if (transaction->getMachineId() == localMachineId) {
1716                         // For locally created messages update the status
1717                         // Guard evaluated was false so create abort
1718                         TransactionStatus status = transaction->getTransactionStatus();
1719                         if (status != NULL) {
1720                                 status->setStatus(TransactionStatus_StatusAborted);
1721                         }
1722                 } else {
1723                         Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
1724
1725                         // Create the abort
1726                         Abort *newAbort = new Abort(NULL,
1727                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1728                                                                                                                                         -1,
1729                                                                                                                                         transaction->getMachineId(),
1730                                                                                                                                         transaction->getArbitrator(),
1731                                                                                                                                         localArbitrationSequenceNumber);
1732                         localArbitrationSequenceNumber++;
1733                         addAbortSet->add(newAbort);
1734
1735                         // Append all the commit parts to the end of the pending queue
1736                         // waiting for sending to the server
1737                         ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1738                         pendingSendArbitrationRounds->add(arbitrationRound);
1739
1740                         if (compactArbitrationData()) {
1741                                 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1742                                 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1743                                         processEntry(commitPart);
1744                                 }
1745                         }
1746                 }
1747
1748                 updateLiveStateFromLocal();
1749                 return Pair<bool, bool>(true, false);
1750         }
1751 }
1752
1753 /**
1754  * Compacts the arbitration data my merging commits and aggregating
1755  * aborts so that a single large push of commits can be done instead
1756  * of many small updates
1757  */
1758 bool Table::compactArbitrationData() {
1759         if (pendingSendArbitrationRounds->size() < 2) {
1760                 // Nothing to compact so do nothing
1761                 return false;
1762         }
1763
1764         ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1765         if (lastRound->didSendPart()) {
1766                 return false;
1767         }
1768
1769         bool hadCommit = (lastRound->getCommit() == NULL);
1770         bool gotNewCommit = false;
1771
1772         int numberToDelete = 1;
1773         while (numberToDelete < pendingSendArbitrationRounds->size()) {
1774                 ArbitrationRound *xs round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1775
1776                 if (round->isFull() || round->didSendPart()) {
1777                         // Stop since there is a part that cannot be compacted and we
1778                         // need to compact in order
1779                         break;
1780                 }
1781
1782                 if (round->getCommit() == NULL) {
1783                         // Try compacting aborts only
1784                         int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
1785                         if (newSize > ArbitrationRound->MAX_PARTS) {
1786                                 // Cant compact since it would be too large
1787                                 break;
1788                         }
1789                         lastRound->addAborts(round->getAborts());
1790                 } else {
1791                         // Create a new larger commit
1792                         Commit newCommit = Commit->merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
1793                         localArbitrationSequenceNumber++;
1794
1795                         // Create the commit parts so that we can count them
1796                         newCommit->createCommitParts();
1797
1798                         // Calculate the new size of the parts
1799                         int newSize = newCommit->getNumberOfParts();
1800                         newSize += lastRound->getAbortsCount();
1801                         newSize += round->getAbortsCount();
1802
1803                         if (newSize > ArbitrationRound->MAX_PARTS) {
1804                                 // Cant compact since it would be too large
1805                                 break;
1806                         }
1807
1808                         // Set the new compacted part
1809                         lastRound->setCommit(newCommit);
1810                         lastRound->addAborts(round->getAborts());
1811                         gotNewCommit = true;
1812                 }
1813
1814                 numberToDelete++;
1815         }
1816
1817         if (numberToDelete != 1) {
1818                 // If there is a compaction
1819                 // Delete the previous pieces that are now in the new compacted piece
1820                 if (numberToDelete == pendingSendArbitrationRounds->size()) {
1821                         pendingSendArbitrationRounds->clear();
1822                 } else {
1823                         for (int i = 0; i < numberToDelete; i++) {
1824                                 pendingSendArbitrationRounds->remove(pendingSendArbitrationRounds->size() - 1);
1825                         }
1826                 }
1827
1828                 // Add the new compacted into the pending to send list
1829                 pendingSendArbitrationRounds->add(lastRound);
1830
1831                 // Should reinsert into the commit processor
1832                 if (hadCommit && gotNewCommit) {
1833                         return true;
1834                 }
1835         }
1836
1837         return false;
1838 }
1839
1840 /**
1841  * Update all the commits and the committed tables, sets dead the dead
1842  * transactions
1843  */
1844 bool Table::updateCommittedTable() {
1845
1846         if (newCommitParts->size() == 0) {
1847                 // Nothing new to process
1848                 return false;
1849         }
1850
1851         // Iterate through all the machine Ids that we received new parts for
1852         for (int64_t machineId : newCommitParts->keySet()) {
1853                 Hashtable<Pair<int64_t, int32_t>, CommitPart *> *parts = newCommitParts->get(machineId);
1854
1855                 // Iterate through all the parts for that machine Id
1856                 for (Pair<int64_t, int32_t> partId : parts->keySet()) {
1857                         CommitPart *part = parts->get(partId);
1858
1859                         // Get the transaction object for that sequence number
1860                         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
1861
1862                         if (commitForClientTable == NULL) {
1863                                 // This is the first commit from this device
1864                                 commitForClientTable = new Hashtable<int64_t, Commit *>();
1865                                 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
1866                         }
1867
1868                         Commit *commit = commitForClientTable->get(part->getSequenceNumber());
1869
1870                         if (commit == NULL) {
1871                                 // This is a new commit that we dont have so make a new one
1872                                 commit = new Commit();
1873
1874                                 // Insert this new commit into the live tables
1875                                 commitForClientTable->put(part->getSequenceNumber(), commit);
1876                         }
1877
1878                         // Add that part to the commit
1879                         commit->addPartDecode(part);
1880                 }
1881         }
1882
1883         // Clear all the new commits parts in preparation for the next time
1884         // the server sends slots
1885         newCommitParts->clear();
1886
1887         // If we process a new commit keep track of it for future use
1888         bool didProcessANewCommit = false;
1889
1890         // Process the commits one by one
1891         for (int64_T arbitratorId : liveCommitsTable->keySet()) {
1892
1893                 // Get all the commits for a specific arbitrator
1894                 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
1895
1896                 // Sort the commits in order
1897                 Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
1898                 Collections->sort(commitSequenceNumbers);
1899
1900                 // Get the last commit seen from this arbitrator
1901                 int64_t lastCommitSeenSequenceNumber = -1;
1902                 if (lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId) != NULL) {
1903                         lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
1904                 }
1905
1906                 // Go through each new commit one by one
1907                 for (int i = 0; i < commitSequenceNumbers->size(); i++) {
1908                         int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
1909                         Commit *commit = commitForClientTable->get(commitSequenceNumber);
1910
1911                         // Special processing if a commit is not complete
1912                         if (!commit->isComplete()) {
1913                                 if (i == (commitSequenceNumbers->size() - 1)) {
1914                                         // If there is an incomplete commit and this commit is the
1915                                         // latest one seen then this commit cannot be processed and
1916                                         // there are no other commits
1917                                         break;
1918                                 } else {
1919                                         // This is a commit that was already dead but parts of it
1920                                         // are still in the block chain (not flushed out yet)->
1921                                         // Delete it and move on
1922                                         commit->setDead();
1923                                         commitForClientTable->remove(commit->getSequenceNumber());
1924                                         continue;
1925                                 }
1926                         }
1927
1928                         // Update the last transaction that was updated if we can
1929                         if (commit->getTransactionSequenceNumber() != -1) {
1930                                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
1931
1932                                 // Update the last transaction sequence number that the arbitrator arbitrated on
1933                                 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
1934                                         lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
1935                                 }
1936                         }
1937
1938                         // Update the last arbitration data that we have seen so far
1939                         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId()) != NULL) {
1940
1941                                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
1942                                 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
1943                                         // Is larger
1944                                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
1945                                 }
1946                         } else {
1947                                 // Never seen any data from this arbitrator so record the first one
1948                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
1949                         }
1950
1951                         // We have already seen this commit before so need to do the
1952                         // full processing on this commit
1953                         if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
1954
1955                                 // Update the last transaction that was updated if we can
1956                                 if (commit->getTransactionSequenceNumber() != -1) {
1957                                         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
1958
1959                                         // Update the last transaction sequence number that the arbitrator arbitrated on
1960                                         if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
1961                                                 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
1962                                         }
1963                                 }
1964
1965                                 continue;
1966                         }
1967
1968                         // If we got here then this is a brand new commit and needs full
1969                         // processing
1970                         // Get what commits should be edited, these are the commits that
1971                         // have live values for their keys
1972                         Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
1973                         for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
1974                                 commitsToEdit->add(liveCommitsByKeyTable->get(kv->getKey()));
1975                         }
1976                         commitsToEdit->remove(NULL);            // remove NULL since it could be in this set
1977
1978                         // Update each previous commit that needs to be updated
1979                         for (Commit *previousCommit : commitsToEdit) {
1980
1981                                 // Only bother with live commits (TODO: Maybe remove this check)
1982                                 if (previousCommit->isLive()) {
1983
1984                                         // Update which keys in the old commits are still live
1985                                         for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
1986                                                 previousCommit->invalidateKey(kv->getKey());
1987                                         }
1988
1989                                         // if the commit is now dead then remove it
1990                                         if (!previousCommit->isLive()) {
1991                                                 commitForClientTable->remove(previousCommit);
1992                                         }
1993                                 }
1994                         }
1995
1996                         // Update the last seen sequence number from this arbitrator
1997                         if (lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId()) != NULL) {
1998                                 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
1999                                         lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2000                                 }
2001                         } else {
2002                                 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2003                         }
2004
2005                         // We processed a new commit that we havent seen before
2006                         didProcessANewCommit = true;
2007
2008                         // Update the committed table of keys and which commit is using which key
2009                         for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
2010                                 committedKeyValueTable->put(kv->getKey(), kv);
2011                                 liveCommitsByKeyTable->put(kv->getKey(), commit);
2012                         }
2013                 }
2014         }
2015
2016         return didProcessANewCommit;
2017 }
2018
2019 /**
2020  * Create the speculative table from transactions that are still live
2021  * and have come from the cloud
2022  */
2023 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2024         if (liveTransactionBySequenceNumberTable->keySet()->size() == 0) {
2025                 // There is nothing to speculate on
2026                 return false;
2027         }
2028
2029         // Create a list of the transaction sequence numbers and sort them
2030         // from oldest to newest
2031         Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
2032         Collections->sort(transactionSequenceNumbersSorted);
2033
2034         bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2035
2036
2037         if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2038                 // If there is a gap in the transaction sequence numbers then
2039                 // there was a commit or an abort of a transaction OR there was a
2040                 // new commit (Could be from offline commit) so a redo the
2041                 // speculation from scratch
2042
2043                 // Start from scratch
2044                 speculatedKeyValueTable->clear();
2045                 lastTransactionSequenceNumberSpeculatedOn = -1;
2046                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2047
2048         }
2049
2050         // Remember the front of the transaction list
2051         oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2052
2053         // Find where to start arbitration from
2054         int startIndex = transactionSequenceNumbersSorted->indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1;
2055
2056         if (startIndex >= transactionSequenceNumbersSorted->size()) {
2057                 // Make sure we are not out of bounds
2058                 return false;           // did not speculate
2059         }
2060
2061         Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
2062         bool didSkip = true;
2063
2064         for (int i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2065                 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2066                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2067
2068                 if (!transaction->isComplete()) {
2069                         // If there is an incomplete transaction then there is nothing
2070                         // we can do add this transactions arbitrator to the list of
2071                         // arbitrators we should ignore
2072                         incompleteTransactionArbitrator->add(transaction->getArbitrator());
2073                         didSkip = true;
2074                         continue;
2075                 }
2076
2077                 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2078                         continue;
2079                 }
2080
2081                 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2082
2083                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2084                         // Guard evaluated to true so update the speculative table
2085                         for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
2086                                 speculatedKeyValueTable->put(kv->getKey(), kv);
2087                         }
2088                 }
2089         }
2090
2091         if (didSkip) {
2092                 // Since there was a skip we need to redo the speculation next time around
2093                 lastTransactionSequenceNumberSpeculatedOn = -1;
2094                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2095         }
2096
2097         // We did some speculation
2098         return true;
2099 }
2100
2101 /**
2102  * Create the pending transaction speculative table from transactions
2103  * that are still in the pending transaction buffer
2104  */
2105 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2106         if (pendingTransactionQueue->size() == 0) {
2107                 // There is nothing to speculate on
2108                 return;
2109         }
2110
2111         if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2112                 // need to reset on the pending speculation
2113                 lastPendingTransactionSpeculatedOn = NULL;
2114                 firstPendingTransaction = pendingTransactionQueue->get(0);
2115                 pendingTransactionSpeculatedKeyValueTable->clear();
2116         }
2117
2118         // Find where to start arbitration from
2119         int startIndex = pendingTransactionQueue->indexOf(firstPendingTransaction) + 1;
2120
2121         if (startIndex >= pendingTransactionQueue->size()) {
2122                 // Make sure we are not out of bounds
2123                 return;
2124         }
2125
2126         for (int i = startIndex; i < pendingTransactionQueue->size(); i++) {
2127                 Transaction *transaction = pendingTransactionQueue->get(i);
2128
2129                 lastPendingTransactionSpeculatedOn = transaction;
2130
2131                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2132                         // Guard evaluated to true so update the speculative table
2133                         for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
2134                                 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2135                         }
2136                 }
2137         }
2138 }
2139
2140 /**
2141  * Set dead and remove from the live transaction tables the
2142  * transactions that are dead
2143  */
2144 void Table::updateLiveTransactionsAndStatus() {
2145
2146         // Go through each of the transactions
2147         for (Iterator<Map->Entry<int64_t, Transaction> > *iter = liveTransactionBySequenceNumberTable->entrySet()->iterator(); iter->hasNext();) {
2148                 Transaction *transaction = iter->next()->getValue();
2149
2150                 // Check if the transaction is dead
2151                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator());
2152                 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= transaction->getSequenceNumber())) {
2153
2154                         // Set dead the transaction
2155                         transaction->setDead();
2156
2157                         // Remove the transaction from the live table
2158                         iter->remove();
2159                         liveTransactionByTransactionIdTable->remove(transaction->getId());
2160                 }
2161         }
2162
2163         // Go through each of the transactions
2164         for (Iterator<Map->Entry<int64_t, TransactionStatus *> > *iter = outstandingTransactionStatus->entrySet()->iterator(); iter->hasNext();) {
2165                 TransactionStatus *status = iter->next()->getValue();
2166
2167                 // Check if the transaction is dead
2168                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator());
2169                 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= status->getTransactionSequenceNumber())) {
2170
2171                         // Set committed
2172                         status->setStatus(TransactionStatus_StatusCommitted);
2173
2174                         // Remove
2175                         iter->remove();
2176                 }
2177         }
2178 }
2179
2180 /**
2181  * Process this slot, entry by entry->  Also update the latest message sent by slot
2182  */
2183 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2184
2185         // Update the last message seen
2186         updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2187
2188         // Process each entry in the slot
2189         for (Entry *entry : slot->getEntries()) {
2190                 switch (entry->getType()) {
2191                 case TypeCommitPart:
2192                         processEntry((CommitPart *)entry);
2193                         break;
2194                 case TypeAbort:
2195                         processEntry((Abort *)entry);
2196                         break;
2197                 case TypeTransactionPart:
2198                         processEntry((TransactionPart *)entry);
2199                         break;
2200                 case TypeNewKey:
2201                         processEntry((NewKey *)entry);
2202                         break;
2203                 case TypeLastMessage:
2204                         processEntry((LastMessage *)entry, machineSet);
2205                         break;
2206                 case TypeRejectedMessage:
2207                         processEntry((RejectedMessage *)entry, indexer);
2208                         break;
2209                 case TypeTableStatus:
2210                         processEntry((TableStatus *)entry, slot->getSequenceNumber());
2211                         break;
2212                 default:
2213                         throw new Error("Unrecognized type: " + entry->getType());
2214                 }
2215         }
2216 }
2217
2218 /**
2219  * Update the last message that was sent for a machine Id
2220  */
2221 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2222         // Update what the last message received by a machine was
2223         updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2224 }
2225
2226 /**
2227  * Add the new key to the arbitrators table and update the set of live
2228  * new keys (in case of a rescued new key message)
2229  */
2230 void Table::processEntry(NewKey *entry) {
2231         // Update the arbitrator table with the new key information
2232         arbitratorTable->put(entry->getKey(), entry->getMachineID());
2233
2234         // Update what the latest live new key is
2235         NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2236         if (oldNewKey != NULL) {
2237                 // Delete the old new key messages
2238                 oldNewKey->setDead();
2239         }
2240 }
2241
2242 /**
2243  * Process new table status entries and set dead the old ones as new
2244  * ones come in-> keeps track of the largest and smallest table status
2245  * seen in this current round of updating the local copy of the block
2246  * chain
2247  */
2248 void Table::processEntry(TableStatus entry, int64_t seq) {
2249         int newNumSlots = entry->getMaxSlots();
2250         updateCurrMaxSize(newNumSlots);
2251         initExpectedSize(seq, newNumSlots);
2252
2253         if (liveTableStatus != NULL) {
2254                 // We have a larger table status so the old table status is no
2255                 // int64_ter alive
2256                 liveTableStatus->setDead();
2257         }
2258
2259         // Make this new table status the latest alive table status
2260         liveTableStatus = entry;
2261 }
2262
2263 /**
2264  * Check old messages to see if there is a block chain violation->
2265  * Also
2266  */
2267 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
2268         int64_t oldSeqNum = entry->getOldSeqNum();
2269         int64_t newSeqNum = entry->getNewSeqNum();
2270         bool isequal = entry->getEqual();
2271         int64_t machineId = entry->getMachineID();
2272         int64_t seq = entry->getSequenceNumber();
2273
2274         // Check if we have messages that were supposed to be rejected in
2275         // our local block chain
2276         for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2277                 // Get the slot
2278                 Slot *slot = indexer->getSlot(seqNum);
2279
2280                 if (slot != NULL) {
2281                         // If we have this slot make sure that it was not supposed to be
2282                         // a rejected slot
2283                         int64_t slotMachineId = slot->getMachineID();
2284                         if (isequal != (slotMachineId == machineId)) {
2285                                 throw new Error("Server Error: Trying to insert rejected message for slot " + seqNum);
2286                         }
2287                 }
2288         }
2289
2290         // Create a list of clients to watch until they see this rejected
2291         // message entry->
2292         Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2293         for (Map->Entry<int64_t, Pair<int64_t, Liveness *> > *lastMessageEntry : lastMessageTable->entrySet()) {
2294                 // Machine ID for the last message entry
2295                 int64_t lastMessageEntryMachineId = lastMessageEntry->getKey();
2296
2297                 // We've seen it, don't need to continue to watch->  Our next
2298                 // message will implicitly acknowledge it->
2299                 if (lastMessageEntryMachineId == localMachineId) {
2300                         continue;
2301                 }
2302
2303                 Pair<int64_t, Liveness *> lastMessageValue = lastMessageEntry->getValue();
2304                 int64_t entrySequenceNumber = lastMessageValue->getFirst();
2305
2306                 if (entrySequenceNumber < seq) {
2307                         // Add this rejected message to the set of messages that this
2308                         // machine ID did not see yet
2309                         addWatchVector(lastMessageEntryMachineId, entry);
2310                         // This client did not see this rejected message yet so add it
2311                         // to the watch set to monitor
2312                         deviceWatchSet->add(lastMessageEntryMachineId);
2313                 }
2314         }
2315         if (deviceWatchSet->isEmpty()) {
2316                 // This rejected message has been seen by all the clients so
2317                 entry->setDead();
2318         } else {
2319                 // We need to watch this rejected message
2320                 entry->setWatchSet(deviceWatchSet);
2321         }
2322 }
2323
2324 /**
2325  * Check if this abort is live, if not then save it so we can kill it
2326  * later-> update the last transaction number that was arbitrated on->
2327  */
2328 void Table::processEntry(Abort *entry) {
2329         if (entry->getTransactionSequenceNumber() != -1) {
2330                 // update the transaction status if it was sent to the server
2331                 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2332                 if (status != NULL) {
2333                         status->setStatus(TransactionStatus_StatusAborted);
2334                 }
2335         }
2336
2337         // Abort has not been seen by the client it is for yet so we need to
2338         // keep track of it
2339         Abort *previouslySeenAbort = liveAbortTable->put(entry->getAbortId(), entry);
2340         if (previouslySeenAbort != NULL) {
2341                 previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version
2342         }
2343
2344         if (entry->getTransactionArbitrator() == localMachineId) {
2345                 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
2346         }
2347
2348         if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) {
2349                 // The machine already saw this so it is dead
2350                 entry->setDead();
2351                 liveAbortTable->remove(entry->getAbortId());
2352
2353                 if (entry->getTransactionArbitrator() == localMachineId) {
2354                         liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2355                 }
2356                 return;
2357         }
2358
2359         // Update the last arbitration data that we have seen so far
2360         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator()) != NULL) {
2361                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2362                 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2363                         // Is larger
2364                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2365                 }
2366         } else {
2367                 // Never seen any data from this arbitrator so record the first one
2368                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2369         }
2370
2371         // Set dead a transaction if we can
2372         Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber()));
2373         if (transactionToSetDead != NULL) {
2374                 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2375         }
2376
2377         // Update the last transaction sequence number that the arbitrator
2378         // arbitrated on
2379         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator());
2380         if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2381                 // Is a valid one
2382                 if (entry->getTransactionSequenceNumber() != -1) {
2383                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2384                 }
2385         }
2386 }
2387
2388 /**
2389  * Set dead the transaction part if that transaction is dead and keep
2390  * track of all new parts
2391  */
2392 void Table::processEntry(TransactionPart *entry) {
2393         // Check if we have already seen this transaction and set it dead OR
2394         // if it is not alive
2395         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId());
2396         if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= entry->getSequenceNumber())) {
2397                 // This transaction is dead, it was already committed or aborted
2398                 entry->setDead();
2399                 return;
2400         }
2401
2402         // This part is still alive
2403         Hashtable<Pair<int64_t, int32_t>, TransactionPart *> *transactionPart = newTransactionParts->get(entry->getMachineId());
2404
2405         if (transactionPart == NULL) {
2406                 // Dont have a table for this machine Id yet so make one
2407                 transactionPart = new Hashtable<Pair<int64_t, int32_t>, TransactionPart *>();
2408                 newTransactionParts->put(entry->getMachineId(), transactionPart);
2409         }
2410
2411         // Update the part and set dead ones we have already seen (got a
2412         // rescued version)
2413         TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry);
2414         if (previouslySeenPart != NULL) {
2415                 previouslySeenPart->setDead();
2416         }
2417 }
2418
2419 /**
2420  * Process new commit entries and save them for future use->  Delete duplicates
2421  */
2422 void Table::processEntry(CommitPart *entry) {
2423         // Update the last transaction that was updated if we can
2424         if (entry->getTransactionSequenceNumber() != -1) {
2425                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId());
2426                 // Update the last transaction sequence number that the arbitrator
2427                 // arbitrated on
2428                 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2429                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2430                 }
2431         }
2432
2433         Hashtable<Pair<int64_t, int32_t>, CommitPart *> *commitPart = newCommitParts->get(entry->getMachineId());
2434         if (commitPart == NULL) {
2435                 // Don't have a table for this machine Id yet so make one
2436                 commitPart = new Hashtable<Pair<int64_t, int32_t>, CommitPart *>();
2437                 newCommitParts->put(entry->getMachineId(), commitPart);
2438         }
2439         // Update the part and set dead ones we have already seen (got a
2440         // rescued version)
2441         CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry);
2442         if (previouslySeenPart != NULL) {
2443                 previouslySeenPart->setDead();
2444         }
2445 }
2446
/**
 * Update the last message seen table. Update and set dead the
 * appropriate RejectedMessages as clients see them. Updates the live
 * aborts, removes those that are dead and sets them dead. Check that
 * the last message seen is correct and that there is no mismatch of
 * our own last message or that other clients have not had a rollback
 * on the last message.
 */
2455 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<intr64_t> *machineSet) {
2456         // We have seen this machine ID
2457         machineSet->remove(machineId);
2458
2459         // Get the set of rejected messages that this machine Id is has not seen yet
2460         Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2461         // If there is a rejected message that this machine Id has not seen yet
2462         if (watchset != NULL) {
2463                 // Go through each rejected message that this machine Id has not
2464                 // seen yet
2465                 for (Iterator<RejectedMessage *> *rmit = watchset->iterator(); rmit->hasNext(); ) {
2466                         RejectedMessage *rm = rmit->next();
2467                         // If this machine Id has seen this rejected message->->->
2468                         if (rm->getSequenceNumber() <= seqNum) {
2469                                 // Remove it from our watchlist
2470                                 rmit->remove();
2471                                 // Decrement machines that need to see this notification
2472                                 rm->removeWatcher(machineId);
2473                         }
2474                 }
2475         }
2476
2477         // Set dead the abort
2478         for (Iterator<Map->Entry<Pair<int64_t, int64_t>, Abort *> > i = liveAbortTable->entrySet()->iterator(); i->hasNext();) {
2479                 Abort *abort = i->next()->getValue();
2480                 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2481                         abort->setDead();
2482                         i->remove();
2483                         if (abort->getTransactionArbitrator() == localMachineId) {
2484                                 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2485                         }
2486                 }
2487         }
2488         if (machineId == localMachineId) {
2489                 // Our own messages are immediately dead->
2490                 if (liveness instanceof LastMessage) {
2491                         ((LastMessage *)liveness)->setDead();
2492                 } else if (liveness instanceof Slot) {
2493                         ((Slot *)liveness)->setDead();
2494                 } else {
2495                         throw new Error("Unrecognized type");
2496                 }
2497         }
2498         // Get the old last message for this device
2499         Pair<int64_t, Liveness *> lastMessageEntry = lastMessageTable->put(machineId, Pair<int64_t, Liveness *>(seqNum, liveness));
2500         if (lastMessageEntry == NULL) {
2501                 // If no last message then there is nothing else to process
2502                 return;
2503         }
2504
2505         int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
2506         Liveness *lastEntry = lastMessageEntry->getSecond();
2507
2508         // If it is not our machine Id since we already set ours to dead
2509         if (machineId != localMachineId) {
2510                 if (lastEntry instanceof LastMessage) {
2511                         ((LastMessage *)lastEntry)->setDead();
2512                 } else if (lastEntry instanceof Slot) {
2513                         ((Slot *)lastEntry)->setDead();
2514                 } else {
2515                         throw new Error("Unrecognized type");
2516                 }
2517         }
2518         // Make sure the server is not playing any games
2519         if (machineId == localMachineId) {
2520                 if (hadPartialSendToServer) {
2521                         // We were not making any updates and we had a machine mismatch
2522                         if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2523                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: " +  lastMessageSeqNum  + " got: " + seqNum);
2524                         }
2525                 } else {
2526                         // We were not making any updates and we had a machine mismatch
2527                         if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2528                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed: " +  lastMessageSeqNum + " got: " + seqNum);
2529                         }
2530                 }
2531         } else {
2532                 if (lastMessageSeqNum > seqNum) {
2533                         throw new Error("Server Error: Rollback on remote machine sequence number");
2534                 }
2535         }
2536 }
2537
2538 /**
2539  * Add a rejected message entry to the watch set to keep track of
2540  * which clients have seen that rejected message entry and which have
2541  * not.
2542  */
2543 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
2544         Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2545         if (entries == NULL) {
2546                 // There is no set for this machine ID yet so create one
2547                 entries = new Hashset<RejectedMessage *>();
2548                 rejectedMessageWatchVectorTable->put(machineId, entries);
2549         }
2550         entries->add(entry);
2551 }
2552
2553 /**
2554  * Check if the HMAC chain is not violated
2555  */
2556 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
2557         for (int i = 0; i < newSlots->length(); i++) {
2558                 Slot *currSlot = newSlots->get(i);
2559                 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
2560                 if (prevSlot != NULL &&
2561                                 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2562                         throw new Error("Server Error: Invalid HMAC Chain");
2563         }
2564 }