edits
[iotcloud.git] / version2 / src / C / Table.cc
1 #include "Table.h"
2 #include "CloudComm.h"
3 #include "SlotBuffer.h"
4 #include "NewKey.h"
5 #include "Slot.h"
6 #include "KeyValue.h"
7 #include "Error.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
13 #include "Random.h"
14
15
16 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
17         buffer(NULL),
18         cloud(new CloudComm(this, baseurl, password, listeningPort)),
19         random(NULL),
20         liveTableStatus(NULL),
21         pendingTransactionBuilder(NULL),
22         lastPendingTransactionSpeculatedOn(NULL),
23         firstPendingTransaction(NULL),
24         numberOfSlots(0),
25         bufferResizeThreshold(0),
26         liveSlotCount(0),
27         oldestLiveSlotSequenceNumver(1),
28         localMachineId(_localMachineId),
29         sequenceNumber(0),
30         localTransactionSequenceNumber(0),
31         lastTransactionSequenceNumberSpeculatedOn(0),
32         oldestTransactionSequenceNumberSpeculatedOn(0),
33         localArbitrationSequenceNumber(0),
34         hadPartialSendToServer(false),
35         attemptedToSendToServer(false),
36         expectedsize(0),
37         didFindTableStatus(false),
38         currMaxSize(0),
39         lastSlotAttemptedToSend(NULL),
40         lastIsNewKey(false),
41         lastNewSize(0),
42         lastTransactionPartsSent(NULL),
43         lastPendingSendArbitrationEntriesToDelete(NULL),
44         lastNewKey(NULL),
45         committedKeyValueTable(NULL),
46         speculatedKeyValueTable(NULL),
47         pendingTransactionSpeculatedKeyValueTable(NULL),
48         liveNewKeyTable(NULL),
49         lastMessageTable(NULL),
50         rejectedMessageWatchVectorTable(NULL),
51         arbitratorTable(NULL),
52         liveAbortTable(NULL),
53         newTransactionParts(NULL),
54         newCommitParts(NULL),
55         lastArbitratedTransactionNumberByArbitratorTable(NULL),
56         liveTransactionBySequenceNumberTable(NULL),
57         liveTransactionByTransactionIdTable(NULL),
58         liveCommitsTable(NULL),
59         liveCommitsByKeyTable(NULL),
60         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
61         rejectedSlotVector(NULL),
62         pendingTransactionQueue(NULL),
63         pendingSendArbitrationRounds(NULL),
64         pendingSendArbitrationEntriesToDelete(NULL),
65         transactionPartsSent(NULL),
66         outstandingTransactionStatus(NULL),
67         liveAbortsGeneratedByLocal(NULL),
68         offlineTransactionsCommittedAndAtServer(NULL),
69         localCommunicationTable(NULL),
70         lastTransactionSeenFromMachineFromServer(NULL),
71         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
72         lastInsertedNewKey(false),
73         lastSeqNumArbOn(0)
74 {
75         init();
76 }
77
78 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
79         buffer(NULL),
80         cloud(_cloud),
81         random(NULL),
82         liveTableStatus(NULL),
83         pendingTransactionBuilder(NULL),
84         lastPendingTransactionSpeculatedOn(NULL),
85         firstPendingTransaction(NULL),
86         numberOfSlots(0),
87         bufferResizeThreshold(0),
88         liveSlotCount(0),
89         oldestLiveSlotSequenceNumver(1),
90         localMachineId(_localMachineId),
91         sequenceNumber(0),
92         localTransactionSequenceNumber(0),
93         lastTransactionSequenceNumberSpeculatedOn(0),
94         oldestTransactionSequenceNumberSpeculatedOn(0),
95         localArbitrationSequenceNumber(0),
96         hadPartialSendToServer(false),
97         attemptedToSendToServer(false),
98         expectedsize(0),
99         didFindTableStatus(false),
100         currMaxSize(0),
101         lastSlotAttemptedToSend(NULL),
102         lastIsNewKey(false),
103         lastNewSize(0),
104         lastTransactionPartsSent(NULL),
105         lastPendingSendArbitrationEntriesToDelete(NULL),
106         lastNewKey(NULL),
107         committedKeyValueTable(NULL),
108         speculatedKeyValueTable(NULL),
109         pendingTransactionSpeculatedKeyValueTable(NULL),
110         liveNewKeyTable(NULL),
111         lastMessageTable(NULL),
112         rejectedMessageWatchVectorTable(NULL),
113         arbitratorTable(NULL),
114         liveAbortTable(NULL),
115         newTransactionParts(NULL),
116         newCommitParts(NULL),
117         lastArbitratedTransactionNumberByArbitratorTable(NULL),
118         liveTransactionBySequenceNumberTable(NULL),
119         liveTransactionByTransactionIdTable(NULL),
120         liveCommitsTable(NULL),
121         liveCommitsByKeyTable(NULL),
122         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
123         rejectedSlotVector(NULL),
124         pendingTransactionQueue(NULL),
125         pendingSendArbitrationRounds(NULL),
126         pendingSendArbitrationEntriesToDelete(NULL),
127         transactionPartsSent(NULL),
128         outstandingTransactionStatus(NULL),
129         liveAbortsGeneratedByLocal(NULL),
130         offlineTransactionsCommittedAndAtServer(NULL),
131         localCommunicationTable(NULL),
132         lastTransactionSeenFromMachineFromServer(NULL),
133         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
134         lastInsertedNewKey(false),
135         lastSeqNumArbOn(0)
136 {
137         init();
138 }
139
140 /**
141  * Init all the stuff needed for for table usage
142  */
143 void Table::init() {
144         // Init helper objects
145         random = new Random();
146         buffer = new SlotBuffer();
147
148         // init data structs
149         committedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
150         speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
151         pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
152         liveNewKeyTable = new Hashtable<IoTString *, NewKey *>();
153         lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> >();
154         rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
155         arbitratorTable = new Hashtable<IoTString *, int64_t>();
156         liveAbortTable = new Hashtable<Pair<int64_t, int64_t>, Abort *>();
157         newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>, TransactionPart *> *>();
158         newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>, CommitPart *> *>();
159         lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
160         liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
161         liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t>, Transaction *>();
162         liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> >();
163         liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *>();
164         lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
165         rejectedSlotVector = new Vector<int64_t>();
166         pendingTransactionQueue = new Vector<Transaction *>();
167         pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
168         transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
169         outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
170         liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
171         offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> >();
172         localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> >();
173         lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
174         pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
175         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
176
177
178         // Other init stuff
179         numberOfSlots = buffer->capacity();
180         setResizeThreshold();
181 }
182
183 /**
184  * Initialize the table by inserting a table status as the first entry
185  * into the table status also initialize the crypto stuff.
186  */
187 void Table::initTable() {
188         cloud->initSecurity();
189
190         // Create the first insertion into the block chain which is the table status
191         Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
192         localSequenceNumber++;
193         TableStatus *status = new TableStatus(s, numberOfSlots);
194         s->addEntry(status);
195         Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
196
197         if (array == NULL) {
198                 array = new Array<Slot *>(1);
199                 array->set(0, s);
200                 // update local block chain
201                 validateAndUpdate(array, true);
202         } else if (array->length() == 1) {
203                 // in case we did push the slot BUT we failed to init it
204                 validateAndUpdate(array, true);
205         } else {
206                 throw new Error("Error on initialization");
207         }
208 }
209
210 /**
211  * Rebuild the table from scratch by pulling the latest block chain
212  * from the server.
213  */
214 void Table::rebuild() {
215         // Just pull the latest slots from the server
216         Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
217         validateAndUpdate(newslots, true);
218         sendToServer(NULL);
219         updateLiveTransactionsAndStatus();
220 }
221
222 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
223         localCommunicationTable->put(arbitrator, Pair<IoTString *, int32_t>(hostName, portNumber));
224 }
225
226 int64_t Table::getArbitrator(IoTString *key) {
227         return arbitratorTable->get(key);
228 }
229
230 void Table::close() {
231         cloud->close();
232 }
233
234 IoTString *Table::getCommitted(IoTString *key)  {
235         KeyValue *kv = committedKeyValueTable->get(key);
236
237         if (kv != NULL) {
238                 return kv->getValue();
239         } else {
240                 return NULL;
241         }
242 }
243
244 IoTString *Table::getSpeculative(IoTString *key) {
245         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
246
247         if (kv == NULL) {
248                 kv = speculatedKeyValueTable->get(key);
249         }
250
251         if (kv == NULL) {
252                 kv = committedKeyValueTable->get(key);
253         }
254
255         if (kv != NULL) {
256                 return kv->getValue();
257         } else {
258                 return NULL;
259         }
260 }
261
262 IoTString *Table::getCommittedAtomic(IoTString *key) {
263         KeyValue *kv = committedKeyValueTable->get(key);
264
265         if (!arbitratorTable->contains(key)) {
266                 throw new Error("Key not Found.");
267         }
268
269         // Make sure new key value pair matches the current arbitrator
270         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
271                 // TODO: Maybe not throw en error
272                 throw new Error("Not all Key Values Match Arbitrator.");
273         }
274
275         if (kv != NULL) {
276                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
277                 return kv->getValue();
278         } else {
279                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
280                 return NULL;
281         }
282 }
283
284 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
285         if (!arbitratorTable->contains(key)) {
286                 throw new Error("Key not Found.");
287         }
288
289         // Make sure new key value pair matches the current arbitrator
290         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
291                 // TODO: Maybe not throw en error
292                 throw new Error("Not all Key Values Match Arbitrator.");
293         }
294
295         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
296
297         if (kv == NULL) {
298                 kv = speculatedKeyValueTable->get(key);
299         }
300
301         if (kv == NULL) {
302                 kv = committedKeyValueTable->get(key);
303         }
304
305         if (kv != NULL) {
306                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
307                 return kv->getValue();
308         } else {
309                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
310                 return NULL;
311         }
312 }
313
314 bool Table::update()  {
315         try {
316                 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
317                 validateAndUpdate(newSlots, false);
318                 sendToServer(NULL);
319
320
321                 updateLiveTransactionsAndStatus();
322
323                 return true;
324         } catch (Exception *e) {
325                 for (int64_t m : localCommunicationTable->keySet()) {
326                         updateFromLocal(m);
327                 }
328         }
329
330         return false;
331 }
332
333 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
334         while (true) {
335                 if (!arbitratorTable->contains(keyName)) {
336                         // There is already an arbitrator
337                         return false;
338                 }
339
340                 NewKey *newKey = new NewKey(NULL, keyName, machineId);
341
342                 if (sendToServer(newKey)) {
343                         // If successfully inserted
344                         return true;
345                 }
346         }
347 }
348
349 void Table::startTransaction() {
350         // Create a new transaction, invalidates any old pending transactions.
351         pendingTransactionBuilder = new PendingTransaction(localMachineId);
352 }
353
354 void Table::addKV(IoTString *key, IoTString *value) {
355
356         // Make sure it is a valid key
357         if (!arbitratorTable->contains(key)) {
358                 throw new Error("Key not Found.");
359         }
360
361         // Make sure new key value pair matches the current arbitrator
362         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
363                 // TODO: Maybe not throw en error
364                 throw new Error("Not all Key Values Match Arbitrator.");
365         }
366
367         // Add the key value to this transaction
368         KeyValue *kv = new KeyValue(key, value);
369         pendingTransactionBuilder->addKV(kv);
370 }
371
372 TransactionStatus *Table::commitTransaction() {
373
374         if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
375                 // transaction with no updates will have no effect on the system
376                 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
377         }
378
379         // Set the local transaction sequence number and increment
380         pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
381         localTransactionSequenceNumber++;
382
383         // Create the transaction status
384         TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
385
386         // Create the new transaction
387         Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
388         newTransaction->setTransactionStatus(transactionStatus);
389
390         if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
391                 // Add it to the queue and invalidate the builder for safety
392                 pendingTransactionQueue->add(newTransaction);
393         } else {
394                 arbitrateOnLocalTransaction(newTransaction);
395                 updateLiveStateFromLocal();
396         }
397
398         pendingTransactionBuilder = new PendingTransaction(localMachineId);
399
400         try {
401                 sendToServer(NULL);
402         } catch (ServerException *e) {
403
404                 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
405                 for (Iterator<Transaction *> *iter = pendingTransactionQueue->iterator(); iter->hasNext(); ) {
406                         Transaction *transaction = iter->next();
407
408                         if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
409                                 // Already contacted this client so ignore all attempts to contact this client
410                                 // to preserve ordering for arbitrator
411                                 continue;
412                         }
413
414                         Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
415
416                         if (sendReturn->getFirst()) {
417                                 // Failed to contact over local
418                                 arbitratorTriedAndFailed->add(transaction->getArbitrator());
419                         } else {
420                                 // Successful contact or should not contact
421
422                                 if (sendReturn->getSecond()) {
423                                         // did arbitrate
424                                         iter->remove();
425                                 }
426                         }
427                 }
428         }
429
430         updateLiveStateFromLocal();
431
432         return transactionStatus;
433 }
434
435 /**
436  * Recalculate the new resize threshold
437  */
438 void Table::setResizeThreshold() {
439         int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
440         bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
441 }
442
443 int64_t Table::getLocalSequenceNumber() {
444         return localSequenceNumber;
445 }
446
447
448 bool lastInsertedNewKey = false;
449
450 bool Table::sendToServer(NewKey *newKey) {
451
452         bool fromRetry = false;
453
454         try {
455                 if (hadPartialSendToServer) {
456                         Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
457                         if (newSlots->length() == 0) {
458                                 fromRetry = true;
459                                 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
460
461                                 if (sendSlotsReturn->getFirst()) {
462                                         if (newKey != NULL) {
463                                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
464                                                         newKey = NULL;
465                                                 }
466                                         }
467
468                                         for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
469                                                 transaction->resetServerFailure();
470
471                                                 // Update which transactions parts still need to be sent
472                                                 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
473
474                                                 // Add the transaction status to the outstanding list
475                                                 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
476
477                                                 // Update the transaction status
478                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
479
480                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
481                                                 if (transaction->didSendAllParts()) {
482                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
483                                                         pendingTransactionQueue->remove(transaction);
484                                                 }
485                                         }
486                                 } else {
487
488                                         newSlots = sendSlotsReturn->getThird();
489
490                                         bool isInserted = false;
491                                         for (uint si = 0; si < newSlots->length(); si++) {
492                                                 Slot *s = newSlots->get(si);
493                                                 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
494                                                         isInserted = true;
495                                                         break;
496                                                 }
497                                         }
498
499                                         for (uint si = 0; si < newSlots->length(); si++) {
500                                                 Slot *s = newSlots->get(si);
501                                                 if (isInserted) {
502                                                         break;
503                                                 }
504
505                                                 // Process each entry in the slot
506                                                 for (Entry *entry : s->getEntries()) {
507                                                         if (entry->getType() == TypeLastMessage) {
508                                                                 LastMessage *lastMessage = (LastMessage *)entry;
509                                                                 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
510                                                                         isInserted = true;
511                                                                         break;
512                                                                 }
513                                                         }
514                                                 }
515                                         }
516
517                                         if (isInserted) {
518                                                 if (newKey != NULL) {
519                                                         if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
520                                                                 newKey = NULL;
521                                                         }
522                                                 }
523
524                                                 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
525                                                         transaction->resetServerFailure();
526
527                                                         // Update which transactions parts still need to be sent
528                                                         transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
529
530                                                         // Add the transaction status to the outstanding list
531                                                         outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
532
533                                                         // Update the transaction status
534                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
535
536                                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
537                                                         if (transaction->didSendAllParts()) {
538                                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
539                                                                 pendingTransactionQueue->remove(transaction);
540                                                         } else {
541                                                                 transaction->resetServerFailure();
542                                                                 // Set the transaction sequence number back to nothing
543                                                                 if (!transaction->didSendAPartToServer()) {
544                                                                         transaction->setSequenceNumber(-1);
545                                                                 }
546                                                         }
547                                                 }
548                                         }
549                                 }
550
551                                 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
552                                         transaction->resetServerFailure();
553                                         // Set the transaction sequence number back to nothing
554                                         if (!transaction->didSendAPartToServer()) {
555                                                 transaction->setSequenceNumber(-1);
556                                         }
557                                 }
558
559                                 if (sendSlotsReturn->getThird()->length() != 0) {
560                                         // insert into the local block chain
561                                         validateAndUpdate(sendSlotsReturn->getThird(), true);
562                                 }
563                                 // continue;
564                         } else {
565                                 bool isInserted = false;
566                                 for (uint si = 0; si < newSlots->length(); si++) {
567                                         Slot *s = newSlots->get(si);
568                                         if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
569                                                 isInserted = true;
570                                                 break;
571                                         }
572                                 }
573
574                                 for (uint si = 0; si < newSlots->length(); si++) {
575                                         Slot *s = newSlots->get(si);
576                                         if (isInserted) {
577                                                 break;
578                                         }
579
580                                         // Process each entry in the slot
581                                         for (Entry *entry : s->getEntries()) {
582
583                                                 if (entry->getType() == TypeLastMessage) {
584                                                         LastMessage *lastMessage = (LastMessage *)entry;
585                                                         if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
586                                                                 isInserted = true;
587                                                                 break;
588                                                         }
589                                                 }
590                                         }
591                                 }
592
593                                 if (isInserted) {
594                                         if (newKey != NULL) {
595                                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
596                                                         newKey = NULL;
597                                                 }
598                                         }
599
600                                         for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
601                                                 transaction->resetServerFailure();
602
603                                                 // Update which transactions parts still need to be sent
604                                                 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
605
606                                                 // Add the transaction status to the outstanding list
607                                                 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
608
609                                                 // Update the transaction status
610                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
611
612                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
613                                                 if (transaction->didSendAllParts()) {
614                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
615                                                         pendingTransactionQueue->remove(transaction);
616                                                 } else {
617                                                         transaction->resetServerFailure();
618                                                         // Set the transaction sequence number back to nothing
619                                                         if (!transaction->didSendAPartToServer()) {
620                                                                 transaction->setSequenceNumber(-1);
621                                                         }
622                                                 }
623                                         }
624                                 } else {
625                                         for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
626                                                 transaction->resetServerFailure();
627                                                 // Set the transaction sequence number back to nothing
628                                                 if (!transaction->didSendAPartToServer()) {
629                                                         transaction->setSequenceNumber(-1);
630                                                 }
631                                         }
632                                 }
633
634                                 // insert into the local block chain
635                                 validateAndUpdate(newSlots, true);
636                         }
637                 }
638         } catch (ServerException *e) {
639                 throw e;
640         }
641
642
643
644         try {
645                 // While we have stuff that needs inserting into the block chain
646                 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
647
648                         fromRetry = false;
649
650                         if (hadPartialSendToServer) {
651                                 throw new Error("Should Be error free");
652                         }
653
654
655
656                         // If there is a new key with same name then end
657                         if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
658                                 return false;
659                         }
660
661                         // Create the slot
662                         Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber);
663                         localSequenceNumber++;
664
665                         // Try to fill the slot with data
666                         ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
667                         bool needsResize = fillSlotsReturn->getFirst();
668                         int newSize = fillSlotsReturn->getSecond();
669                         bool insertedNewKey = fillSlotsReturn->getThird();
670
671                         if (needsResize) {
672                                 // Reset which transaction to send
673                                 for (Transaction *transaction : transactionPartsSent->keySet()) {
674                                         transaction->resetNextPartToSend();
675
676                                         // Set the transaction sequence number back to nothing
677                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
678                                                 transaction->setSequenceNumber(-1);
679                                         }
680                                 }
681
682                                 // Clear the sent data since we are trying again
683                                 pendingSendArbitrationEntriesToDelete->clear();
684                                 transactionPartsSent->clear();
685
686                                 // We needed a resize so try again
687                                 fillSlot(slot, true, newKey);
688                         }
689
690                         lastSlotAttemptedToSend = slot;
691                         lastIsNewKey = (newKey != NULL);
692                         lastInsertedNewKey = insertedNewKey;
693                         lastNewSize = newSize;
694                         lastNewKey = newKey;
695                         lastTransactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> * >(transactionPartsSent);
696                         lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
697
698
699                         ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
700
701                         if (sendSlotsReturn->getFirst()) {
702
703                                 // Did insert into the block chain
704
705                                 if (insertedNewKey) {
706                                         // This slot was what was inserted not a previous slot
707
708                                         // New Key was successfully inserted into the block chain so dont want to insert it again
709                                         newKey = NULL;
710                                 }
711
712                                 // Remove the aborts and commit parts that were sent from the pending to send queue
713                                 for (Iterator<ArbitrationRound *> *iter = pendingSendArbitrationRounds->iterator(); iter->hasNext(); ) {
714                                         ArbitrationRound *round = iter->next();
715                                         round->removeParts(pendingSendArbitrationEntriesToDelete);
716
717                                         if (round->isDoneSending()) {
718                                                 // Sent all the parts
719                                                 iter->remove();
720                                         }
721                                 }
722
723                                 for (Transaction *transaction : transactionPartsSent->keySet()) {
724                                         transaction->resetServerFailure();
725
726                                         // Update which transactions parts still need to be sent
727                                         transaction->removeSentParts(transactionPartsSent->get(transaction));
728
729                                         // Add the transaction status to the outstanding list
730                                         outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
731
732                                         // Update the transaction status
733                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
734
735                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
736                                         if (transaction->didSendAllParts()) {
737                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
738                                                 pendingTransactionQueue->remove(transaction);
739                                         }
740                                 }
741                         } else {
742                                 // Reset which transaction to send
743                                 for (Transaction *transaction : transactionPartsSent->keySet()) {
744                                         transaction->resetNextPartToSend();
745
746                                         // Set the transaction sequence number back to nothing
747                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
748                                                 transaction->setSequenceNumber(-1);
749                                         }
750                                 }
751                         }
752
753                         // Clear the sent data in preparation for next send
754                         pendingSendArbitrationEntriesToDelete->clear();
755                         transactionPartsSent->clear();
756
757                         if (sendSlotsReturn->getThird()->length() != 0) {
758                                 // insert into the local block chain
759                                 validateAndUpdate(sendSlotsReturn->getThird(), true);
760                         }
761                 }
762
763         } catch (ServerException *e) {
764
765                 if (e->getType() != ServerException->TypeInputTimeout) {
766                         // Nothing was able to be sent to the server so just clear these data structures
767                         for (Transaction *transaction : transactionPartsSent->keySet()) {
768                                 transaction->resetNextPartToSend();
769
770                                 // Set the transaction sequence number back to nothing
771                                 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
772                                         transaction->setSequenceNumber(-1);
773                                 }
774                         }
775                 } else {
776                         // There was a partial send to the server
777                         hadPartialSendToServer = true;
778
779                         // Nothing was able to be sent to the server so just clear these data structures
780                         for (Transaction *transaction : transactionPartsSent->keySet()) {
781                                 transaction->resetNextPartToSend();
782                                 transaction->setServerFailure();
783                         }
784                 }
785
786                 pendingSendArbitrationEntriesToDelete->clear();
787                 transactionPartsSent->clear();
788
789                 throw e;
790         }
791
792         return newKey == NULL;
793 }
794
795 bool Table::updateFromLocal(int64_t machineId) {
796         Pair<IoTString *, int32_t> localCommunicationInformation = localCommunicationTable->get(machineId);
797         if (localCommunicationInformation == NULL) {
798                 // Cant talk to that device locally so do nothing
799                 return false;
800         }
801
802         // Get the size of the send data
803         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
804
805         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
806         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId) != NULL) {
807                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
808         }
809
810         Array<char> *sendData = new Array<char>(sendDataSize);
811         ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
812
813         // Encode the data
814         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
815         bbEncode->putInt(0);
816
817         // Send by local
818         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
819         localSequenceNumber++;
820
821         if (returnData == NULL) {
822                 // Could not contact server
823                 return false;
824         }
825
826         // Decode the data
827         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
828         int numberOfEntries = bbDecode->getInt();
829
830         for (int i = 0; i < numberOfEntries; i++) {
831                 char type = bbDecode->get();
832                 if (type == TypeAbort) {
833                         Abort *abort = (Abort)Abort_decode(NULL, bbDecode);
834                         processEntry(abort);
835                 } else if (type == TypeCommitPart) {
836                         CommitPart *commitPart = (CommitPart)CommitPart_decode(NULL, bbDecode);
837                         processEntry(commitPart);
838                 }
839         }
840
841         updateLiveStateFromLocal();
842
843         return true;
844 }
845
846 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
847
848         // Get the devices local communications
849         Pair<IoTString *, int32_t> localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
850
851         if (localCommunicationInformation == NULL) {
852                 // Cant talk to that device locally so do nothing
853                 return Pair<bool, bool>(true, false);
854         }
855
856         // Get the size of the send data
857         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
858         for (TransactionPart *part : transaction->getParts()->values()) {
859                 sendDataSize += part->getSize();
860         }
861
862         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
863         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator()) != NULL) {
864                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
865         }
866
867         // Make the send data size
868         Array<char> *sendData = new Array<char>(sendDataSize);
869         ByteBuffer *bbEncode = ByteBuffer.wrap(sendData);
870
871         // Encode the data
872         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
873         bbEncode->putInt(transaction->getParts()->size());
874         for (TransactionPart *part : transaction->getParts()->values()) {
875                 part->encode(bbEncode);
876         }
877
878
879         // Send by local
880         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
881         localSequenceNumber++;
882
883         if (returnData == NULL) {
884                 // Could not contact server
885                 return Pair<bool, bool>(true, false);
886         }
887
888         // Decode the data
889         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
890         bool didCommit = bbDecode->get() == 1;
891         bool couldArbitrate = bbDecode->get() == 1;
892         int numberOfEntries = bbDecode->getInt();
893         bool foundAbort = false;
894
895         for (int i = 0; i < numberOfEntries; i++) {
896                 char type = bbDecode->get();
897                 if (type == TypeAbort) {
898                         Abort *abort = (Abort)Abort_decode(NULL, bbDecode);
899
900                         if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
901                                 foundAbort = true;
902                         }
903
904                         processEntry(abort);
905                 } else if (type == TypeCommitPart) {
906                         CommitPart *commitPart = (CommitPart)CommitPart_decode(NULL, bbDecode);
907                         processEntry(commitPart);
908                 }
909         }
910
911         updateLiveStateFromLocal();
912
913         if (couldArbitrate) {
914                 TransactionStatus status =  transaction->getTransactionStatus();
915                 if (didCommit) {
916                         status->setStatus(TransactionStatus_StatusCommitted);
917                 } else {
918                         status->setStatus(TransactionStatus_StatusAborted);
919                 }
920         } else {
921                 TransactionStatus status =  transaction->getTransactionStatus();
922                 if (foundAbort) {
923                         status->setStatus(TransactionStatus_StatusAborted);
924                 } else {
925                         status->setStatus(TransactionStatus_StatusCommitted);
926                 }
927         }
928
929         return Pair<bool, bool>(false, true);
930 }
931
932 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
933
934         // Decode the data
935         ByteBuffer *bbDecode = ByteBuffer_wrap(data);
936         int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
937         int numberOfParts = bbDecode->getInt();
938
939         // If we did commit a transaction or not
940         bool didCommit = false;
941         bool couldArbitrate = false;
942
943         if (numberOfParts != 0) {
944
945                 // decode the transaction
946                 Transaction *transaction = new Transaction();
947                 for (int i = 0; i < numberOfParts; i++) {
948                         bbDecode->get();
949                         TransactionPart *newPart = (TransactionPart)TransactionPart.decode(NULL, bbDecode);
950                         transaction->addPartDecode(newPart);
951                 }
952
953                 // Arbitrate on transaction and pull relevant return data
954                 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
955                 couldArbitrate = localArbitrateReturn->getFirst();
956                 didCommit = localArbitrateReturn->getSecond();
957
958                 updateLiveStateFromLocal();
959
960                 // Transaction was sent to the server so keep track of it to prevent double commit
961                 if (transaction->getSequenceNumber() != -1) {
962                         offlineTransactionsCommittedAndAtServer->add(transaction->getId());
963                 }
964         }
965
966         // The data to send back
967         int returnDataSize = 0;
968         Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
969
970         // Get the aborts to send back
971         Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>(liveAbortsGeneratedByLocal->keySet());
972         Collections->sort(abortLocalSequenceNumbers);
973         for (int64_t localSequenceNumber : abortLocalSequenceNumbers) {
974                 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
975                         continue;
976                 }
977
978                 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
979                 unseenArbitrations->add(abort);
980                 returnDataSize += abort->getSize();
981         }
982
983         // Get the commits to send back
984         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
985         if (commitForClientTable != NULL) {
986                 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
987                 Collections->sort(commitLocalSequenceNumbers);
988
989                 for (int64_t localSequenceNumber : commitLocalSequenceNumbers) {
990                         Commit *commit = commitForClientTable->get(localSequenceNumber);
991
992                         if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
993                                 continue;
994                         }
995
996                         unseenArbitrations->addAll(commit->getParts()->values());
997
998                         for (CommitPart commitPart : commit->getParts()->values()) {
999                                 returnDataSize += commitPart->getSize();
1000                         }
1001                 }
1002         }
1003
1004         // Number of arbitration entries to decode
1005         returnDataSize += 2 * sizeof(int32_t);
1006
1007         // bool of did commit or not
1008         if (numberOfParts != 0) {
1009                 returnDataSize += sizeof(char);
1010         }
1011
1012         // Data to send Back
1013         Array<char> *returnData = new Array<char>(returnDataSize);
1014         ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
1015
1016         if (numberOfParts != 0) {
1017                 if (didCommit) {
1018                         bbEncode->put((char)1);
1019                 } else {
1020                         bbEncode->put((char)0);
1021                 }
1022                 if (couldArbitrate) {
1023                         bbEncode->put((char)1);
1024                 } else {
1025                         bbEncode->put((char)0);
1026                 }
1027         }
1028
1029         bbEncode->putInt(unseenArbitrations->size());
1030         for (Entry *entry : unseenArbitrations) {
1031                 entry->encode(bbEncode);
1032         }
1033
1034
1035         localSequenceNumber++;
1036         return returnData;
1037 }
1038
1039 ThreeTuple<bool, bool, Array<Slot *> *> Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey) {
1040         bool attemptedToSendToServerTmp = attemptedToSendToServer;
1041         attemptedToSendToServer = true;
1042
1043         bool inserted = false;
1044         bool lastTryInserted = false;
1045
1046         Array<Slot *> *array = cloud->putSlot(slot, newSize);
1047         if (array == NULL) {
1048                 array = new Array<Slot *>();
1049                 array->set(0, slot);
1050                 rejectedSlotVector->clear();
1051                 inserted = true;
1052         } else {
1053                 if (array->length() == 0) {
1054                         throw new Error("Server Error: Did not send any slots");
1055                 }
1056
1057                 // if (attemptedToSendToServerTmp) {
1058                 if (hadPartialSendToServer) {
1059
1060                         bool isInserted = false;
1061                         for (Slot *s : array) {
1062                                 if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1063                                         isInserted = true;
1064                                         break;
1065                                 }
1066                         }
1067
1068                         for (Slot *s : array) {
1069                                 if (isInserted) {
1070                                         break;
1071                                 }
1072
1073                                 // Process each entry in the slot
1074                                 for (Entry *entry : s->getEntries()) {
1075
1076                                         if (entry->getType() == TypeLastMessage) {
1077                                                 LastMessage *lastMessage = (LastMessage *)entry;
1078
1079                                                 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) {
1080                                                         isInserted = true;
1081                                                         break;
1082                                                 }
1083                                         }
1084                                 }
1085                         }
1086
1087                         if (!isInserted) {
1088                                 rejectedSlotVector->add(slot->getSequenceNumber());
1089                                 lastTryInserted = false;
1090                         } else {
1091                                 lastTryInserted = true;
1092                         }
1093                 } else {
1094                         rejectedSlotVector->add(slot->getSequenceNumber());
1095                         lastTryInserted = false;
1096                 }
1097         }
1098
1099         return ThreeTuple<bool, bool, Array<Slot *> *>(inserted, lastTryInserted, array);
1100 }
1101
1102 /**
1103  * Returns false if a resize was needed
1104  */
1105 ThreeTuple<bool, int32_t, bool> Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry) {
1106         int newSize = 0;
1107         if (liveSlotCount > bufferResizeThreshold) {
1108                 resize = true;  //Resize is forced
1109         }
1110
1111         if (resize) {
1112                 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1113                 TableStatus *status = new TableStatus(slot, newSize);
1114                 slot->addEntry(status);
1115         }
1116
1117         // Fill with rejected slots first before doing anything else
1118         doRejectedMessages(slot);
1119
1120         // Do mandatory rescue of entries
1121         ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1122
1123         // Extract working variables
1124         bool needsResize = mandatoryRescueReturn->getFirst();
1125         bool seenLiveSlot = mandatoryRescueReturn->getSecond();
1126         int64_t currentRescueSequenceNumber = mandatoryRescueReturn->getThird();
1127
1128         if (needsResize && !resize) {
1129                 // We need to resize but we are not resizing so return false
1130                 return ThreeTuple<bool, int32_t, bool>(true, NULL, NULL);
1131         }
1132
1133         bool inserted = false;
1134         if (newKeyEntry != NULL) {
1135                 newKeyEntry->setSlot(slot);
1136                 if (slot->hasSpace(newKeyEntry)) {
1137
1138                         slot->addEntry(newKeyEntry);
1139                         inserted = true;
1140                 }
1141         }
1142
1143         // Clear the transactions, aborts and commits that were sent previously
1144         transactionPartsSent->clear();
1145         pendingSendArbitrationEntriesToDelete->clear();
1146
1147         for (ArbitrationRound *round : pendingSendArbitrationRounds) {
1148                 bool isFull = false;
1149                 round->generateParts();
1150                 Vector<Entry *> *parts = round->getParts();
1151
1152                 // Insert pending arbitration data
1153                 for (Entry *arbitrationData : parts) {
1154
1155                         // If it is an abort then we need to set some information
1156                         if (arbitrationData instanceof Abort) {
1157                                 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1158                         }
1159
1160                         if (!slot->hasSpace(arbitrationData)) {
1161                                 // No space so cant do anything else with these data entries
1162                                 isFull = true;
1163                                 break;
1164                         }
1165
1166                         // Add to this current slot and add it to entries to delete
1167                         slot->addEntry(arbitrationData);
1168                         pendingSendArbitrationEntriesToDelete->add(arbitrationData);
1169                 }
1170
1171                 if (isFull) {
1172                         break;
1173                 }
1174         }
1175
1176         if (pendingTransactionQueue->size() > 0) {
1177                 Transaction *transaction = pendingTransactionQueue->get(0);
1178                 // Set the transaction sequence number if it has yet to be inserted into the block chain
1179                 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1180                         transaction->setSequenceNumber(slot->getSequenceNumber());
1181                 }
1182
1183                 while (true) {
1184                         TransactionPart *part = transaction->getNextPartToSend();
1185                         if (part == NULL) {
1186                                 // Ran out of parts to send for this transaction so move on
1187                                 break;
1188                         }
1189
1190                         if (slot->hasSpace(part)) {
1191                                 slot->addEntry(part);
1192                                 Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1193                                 if (partsSent == NULL) {
1194                                         partsSent = new Vector<int32_t>();
1195                                         transactionPartsSent->put(transaction, partsSent);
1196                                 }
1197                                 partsSent->add(part->getPartNumber());
1198                                 transactionPartsSent->put(transaction, partsSent);
1199                         } else {
1200                                 break;
1201                         }
1202                 }
1203         }
1204
1205         // Fill the remainder of the slot with rescue data
1206         doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1207
1208         return ThreeTuple<bool, int32_t, bool>(false, newSize, inserted);
1209 }
1210
1211 void Table::doRejectedMessages(Slot *s) {
1212         if (!rejectedSlotVector->isEmpty()) {
1213                 /* TODO: We should avoid generating a rejected message entry if
1214                  * there is already a sufficient entry in the queue (e->g->,
1215                  * equalsto value of true and same sequence number)->  */
1216
1217                 int64_t old_seqn = rejectedSlotVector->firstElement();
1218                 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1219                         int64_t new_seqn = rejectedSlotVector->lastElement();
1220                         RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1221                         s->addEntry(rm);
1222                 } else {
1223                         int64_t prev_seqn = -1;
1224                         int i = 0;
1225                         /* Go through list of missing messages */
1226                         for (; i < rejectedSlotVector->size(); i++) {
1227                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1228                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1229                                 if (s_msg != NULL)
1230                                         break;
1231                                 prev_seqn = curr_seqn;
1232                         }
1233                         /* Generate rejected message entry for missing messages */
1234                         if (prev_seqn != -1) {
1235                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1236                                 s->addEntry(rm);
1237                         }
1238                         /* Generate rejected message entries for present messages */
1239                         for (; i < rejectedSlotVector->size(); i++) {
1240                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1241                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1242                                 int64_t machineid = s_msg->getMachineID();
1243                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1244                                 s->addEntry(rm);
1245                         }
1246                 }
1247         }
1248 }
1249
1250 ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize) {
1251         int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1252         int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
1253         if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1254                 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1255         }
1256
1257         int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1258         bool seenLiveSlot = false;
1259         int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots;         // smallest seq number in the buffer if it is full
1260         int64_t threshold = firstIfFull + Table_FREE_SLOTS;             // we want the buffer to be clear of live entries up to this point
1261
1262
1263         // Mandatory Rescue
1264         for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1265                 Slot previousSlot = buffer->getSlot(currentSequenceNumber);
1266                 // Push slot number forward
1267                 if (!seenLiveSlot) {
1268                         oldestLiveSlotSequenceNumver = currentSequenceNumber;
1269                 }
1270
1271                 if (!previousSlot->isLive()) {
1272                         continue;
1273                 }
1274
1275                 // We have seen a live slot
1276                 seenLiveSlot = true;
1277
1278                 // Get all the live entries for a slot
1279                 Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1280
1281                 // Iterate over all the live entries and try to rescue them
1282                 for (Entry *liveEntry : liveEntries) {
1283                         if (slot->hasSpace(liveEntry)) {
1284
1285                                 // Enough space to rescue the entry
1286                                 slot->addEntry(liveEntry);
1287                         } else if (currentSequenceNumber == firstIfFull) {
1288                                 //if there's no space but the entry is about to fall off the queue
1289                                 System->out->println("B");      //?
1290                                 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
1291
1292                         }
1293                 }
1294         }
1295
1296         // Did not resize
1297         return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
1298 }
1299
1300 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1301         /* now go through live entries from least to greatest sequence number until
1302          * either all live slots added, or the slot doesn't have enough room
1303          * for SKIP_THRESHOLD consecutive entries*/
1304         int skipcount = 0;
1305         int64_t newestseqnum = buffer->getNewestSeqNum();
1306 search:
1307         for (; seqn <= newestseqnum; seqn++) {
1308                 Slot *prevslot = buffer->getSlot(seqn);
1309                 //Push slot number forward
1310                 if (!seenliveslot)
1311                         oldestLiveSlotSequenceNumver = seqn;
1312
1313                 if (!prevslot->isLive())
1314                         continue;
1315                 seenliveslot = true;
1316                 Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
1317                 for (Entry *liveentry : liveentries) {
1318                         if (s->hasSpace(liveentry))
1319                                 s->addEntry(liveentry);
1320                         else {
1321                                 skipcount++;
1322                                 if (skipcount > Table_SKIP_THRESHOLD)
1323                                         break search;
1324                         }
1325                 }
1326         }
1327 }
1328
1329 /**
1330  * Checks for malicious activity and updates the local copy of the block chain->
1331  */
1332 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1333
1334         // The cloud communication layer has checked slot HMACs already before decoding
1335         if (newSlots->length() == 0) {
1336                 return;
1337         }
1338
1339         // Make sure all slots are newer than the last largest slot this client has seen
1340         int64_t firstSeqNum = newSlots[0]->getSequenceNumber();
1341         if (firstSeqNum <= sequenceNumber) {
1342                 throw new Error("Server Error: Sent older slots!");
1343         }
1344
1345         // Create an object that can access both new slots and slots in our local chain
1346         // without committing slots to our local chain
1347         SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1348
1349         // Check that the HMAC chain is not broken
1350         checkHMACChain(indexer, newSlots);
1351
1352         // Set to keep track of messages from clients
1353         Hashset<int64_t> *machineSet = new Hashset<int64_t>(lastMessageTable->keySet());
1354
1355         // Process each slots data
1356         for (Slot *slot : newSlots) {
1357                 processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1358
1359                 updateExpectedSize();
1360         }
1361
1362         // If there is a gap, check to see if the server sent us everything->
1363         if (firstSeqNum != (sequenceNumber + 1)) {
1364
1365                 // Check the size of the slots that were sent down by the server->
1366                 // Can only check the size if there was a gap
1367                 checkNumSlots(newSlots->length);
1368
1369                 // Since there was a gap every machine must have pushed a slot or must have
1370                 // a last message message->  If not then the server is hiding slots
1371                 if (!machineSet->isEmpty()) {
1372                         throw new Error("Missing record for machines: " + machineSet);
1373                 }
1374         }
1375
1376         // Update the size of our local block chain->
1377         commitNewMaxSize();
1378
1379         // Commit new to slots to the local block chain->
1380         for (Slot *slot : newSlots) {
1381
1382                 // Insert this slot into our local block chain copy->
1383                 buffer->putSlot(slot);
1384
1385                 // Keep track of how many slots are currently live (have live data in them)->
1386                 liveSlotCount++;
1387         }
1388
1389         // Get the sequence number of the latest slot in the system
1390         sequenceNumber = newSlots[newSlots->length() - 1]->getSequenceNumber();
1391
1392         updateLiveStateFromServer();
1393
1394         // No Need to remember after we pulled from the server
1395         offlineTransactionsCommittedAndAtServer->clear();
1396
1397         // This is invalidated now
1398         hadPartialSendToServer = false;
1399 }
1400
1401 void Table::updateLiveStateFromServer() {
1402         // Process the new transaction parts
1403         processNewTransactionParts();
1404
1405         // Do arbitration on new transactions that were received
1406         arbitrateFromServer();
1407
1408         // Update all the committed keys
1409         bool didCommitOrSpeculate = updateCommittedTable();
1410
1411         // Delete the transactions that are now dead
1412         updateLiveTransactionsAndStatus();
1413
1414         // Do speculations
1415         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1416         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1417 }
1418
1419 void Table::updateLiveStateFromLocal() {
1420         // Update all the committed keys
1421         bool didCommitOrSpeculate = updateCommittedTable();
1422
1423         // Delete the transactions that are now dead
1424         updateLiveTransactionsAndStatus();
1425
1426         // Do speculations
1427         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1428         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1429 }
1430
1431 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1432         // if (didFindTableStatus) {
1433         // return;
1434         // }
1435         int64_t prevslots = firstSequenceNumber;
1436
1437
1438         if (didFindTableStatus) {
1439                 // expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : expectedsize;
1440                 // System->out->println("Here2: " + expectedsize + "    " + numberOfSlots + "   " + prevslots);
1441
1442         } else {
1443                 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1444                 // System->out->println("Here: " + expectedsize);
1445         }
1446
1447         // System->out->println(numberOfSlots);
1448
1449         didFindTableStatus = true;
1450         currMaxSize = numberOfSlots;
1451 }
1452
1453 void Table::updateExpectedSize() {
1454         expectedsize++;
1455
1456         if (expectedsize > currMaxSize) {
1457                 expectedsize = currMaxSize;
1458         }
1459 }
1460
1461
1462 /**
1463  * Check the size of the block chain to make sure there are enough slots sent back by the server->
1464  * This is only called when we have a gap between the slots that we have locally and the slots
1465  * sent by the server therefore in the slots sent by the server there will be at least 1 Table
1466  * status message
1467  */
1468 void Table::checkNumSlots(int numberOfSlots) {
1469         if (numberOfSlots != expectedsize) {
1470                 throw new Error("Server Error: Server did not send all slots->  Expected: " + expectedsize + " Received:" + numberOfSlots);
1471         }
1472 }
1473
1474 void Table::updateCurrMaxSize(int newmaxsize) {
1475         currMaxSize = newmaxsize;
1476 }
1477
1478
1479 /**
1480  * Update the size of of the local buffer if it is needed->
1481  */
1482 void Table::commitNewMaxSize() {
1483         didFindTableStatus = false;
1484
1485         // Resize the local slot buffer
1486         if (numberOfSlots != currMaxSize) {
1487                 buffer->resize((int32_t)currMaxSize);
1488         }
1489
1490         // Change the number of local slots to the new size
1491         numberOfSlots = (int32_t)currMaxSize;
1492
1493
1494         // Recalculate the resize threshold since the size of the local buffer has changed
1495         setResizeThreshold();
1496 }
1497
1498 /**
1499  * Process the new transaction parts from this latest round of slots received from the server
1500  */
1501 void Table::processNewTransactionParts() {
1502
1503         if (newTransactionParts->size() == 0) {
1504                 // Nothing new to process
1505                 return;
1506         }
1507
1508         // Iterate through all the machine Ids that we received new parts for
1509         for (int64_t machineId : newTransactionParts->keySet()) {
1510                 Hashtable<Pair<int64_t int32_t>, TransactionPart *> *parts = newTransactionParts->get(machineId);
1511
1512                 // Iterate through all the parts for that machine Id
1513                 for (Pair<int64_t, int32_t> partId : parts->keySet()) {
1514                         TransactionPart *part = parts->get(partId);
1515
1516                         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1517                         if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= part->getSequenceNumber())) {
1518                                 // Set dead the transaction part
1519                                 part->setDead();
1520                                 continue;
1521                         }
1522
1523                         // Get the transaction object for that sequence number
1524                         Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1525
1526                         if (transaction == NULL) {
1527                                 // This is a new transaction that we dont have so make a new one
1528                                 transaction = new Transaction();
1529
1530                                 // Insert this new transaction into the live tables
1531                                 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1532                                 liveTransactionByTransactionIdTable->put(part->getTransactionId(), transaction);
1533                         }
1534
1535                         // Add that part to the transaction
1536                         transaction->addPartDecode(part);
1537                 }
1538         }
1539
1540         // Clear all the new transaction parts in preparation for the next time the server sends slots
1541         newTransactionParts->clear();
1542 }
1543
1544 void Table::arbitrateFromServer() {
1545
1546         if (liveTransactionBySequenceNumberTable->size() == 0) {
1547                 // Nothing to arbitrate on so move on
1548                 return;
1549         }
1550
1551         // Get the transaction sequence numbers and sort from oldest to newest
1552         Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
1553         Collections->sort(transactionSequenceNumbers);
1554
1555         // Collection of key value pairs that are
1556         Hashtable<IoTString *, KeyValue *> speculativeTableTmp = new Hashtable<IoTString *, KeyValue *>();
1557
1558         // The last transaction arbitrated on
1559         int64_t lastTransactionCommitted = -1;
1560         Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1561
1562         for (int64_t transactionSequenceNumber : transactionSequenceNumbers) {
1563                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1564
1565
1566
1567                 // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
1568                 if (transaction->getArbitrator() != localMachineId) {
1569                         continue;
1570                 }
1571
1572                 if (transactionSequenceNumber < lastSeqNumArbOn) {
1573                         continue;
1574                 }
1575
1576                 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1577                         // We have seen this already locally so dont commit again
1578                         continue;
1579                 }
1580
1581
1582                 if (!transaction->isComplete()) {
1583                         // Will arbitrate in incorrect order if we continue so just break
1584                         // Most likely this
1585                         break;
1586                 }
1587
1588
1589                 // update the largest transaction seen by arbitrator from server
1590                 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) == NULL) {
1591                         lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1592                 } else {
1593                         int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1594                         if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1595                                 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1596                         }
1597                 }
1598
1599                 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1600                         // Guard evaluated as true
1601
1602                         // Update the local changes so we can make the commit
1603                         for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
1604                                 speculativeTableTmp->put(kv->getKey(), kv);
1605                         }
1606
1607                         // Update what the last transaction committed was for use in batch commit
1608                         lastTransactionCommitted = transactionSequenceNumber;
1609                 } else {
1610                         // Guard evaluated was false so create abort
1611
1612                         // Create the abort
1613                         Abort *newAbort = new Abort(NULL,
1614                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1615                                                                                                                                         transaction->getSequenceNumber(),
1616                                                                                                                                         transaction->getMachineId(),
1617                                                                                                                                         transaction->getArbitrator(),
1618                                                                                                                                         localArbitrationSequenceNumber);
1619                         localArbitrationSequenceNumber++;
1620
1621                         generatedAborts->add(newAbort);
1622
1623                         // Insert the abort so we can process
1624                         processEntry(newAbort);
1625                 }
1626
1627                 lastSeqNumArbOn = transactionSequenceNumber;
1628
1629                 // liveTransactionBySequenceNumberTable->remove(transactionSequenceNumber);
1630         }
1631
1632         Commit *newCommit = NULL;
1633
1634         // If there is something to commit
1635         if (speculativeTableTmp->size() != 0) {
1636
1637                 // Create the commit and increment the commit sequence number
1638                 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1639                 localArbitrationSequenceNumber++;
1640
1641                 // Add all the new keys to the commit
1642                 for (KeyValue *kv : speculativeTableTmp->values()) {
1643                         newCommit->addKV(kv);
1644                 }
1645
1646                 // create the commit parts
1647                 newCommit->createCommitParts();
1648
1649                 // Append all the commit parts to the end of the pending queue waiting for sending to the server
1650
1651                 // Insert the commit so we can process it
1652                 for (CommitPart *commitPart : newCommit->getParts()->values()) {
1653                         processEntry(commitPart);
1654                 }
1655         }
1656
1657         if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1658                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1659                 pendingSendArbitrationRounds->add(arbitrationRound);
1660
1661                 if (compactArbitrationData()) {
1662                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1663                         if (newArbitrationRound->getCommit() != NULL) {
1664                                 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1665                                         processEntry(commitPart);
1666                                 }
1667                         }
1668                 }
1669         }
1670 }
1671
1672 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1673
1674         // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
1675         if (transaction->getArbitrator() != localMachineId) {
1676                 return Pair<bool, bool>(false, false);
1677         }
1678
1679         if (!transaction->isComplete()) {
1680                 // Will arbitrate in incorrect order if we continue so just break
1681                 // Most likely this
1682                 return Pair<bool, bool>(false, false);
1683         }
1684
1685         if (transaction->getMachineId() != localMachineId) {
1686                 // dont do this check for local transactions
1687                 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) != NULL) {
1688                         if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1689                                 // We've have already seen this from the server
1690                                 return Pair<bool, bool>(false, false);
1691                         }
1692                 }
1693         }
1694
1695         if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1696                 // Guard evaluated as true
1697
1698                 // Create the commit and increment the commit sequence number
1699                 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1700                 localArbitrationSequenceNumber++;
1701
1702                 // Update the local changes so we can make the commit
1703                 for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
1704                         newCommit->addKV(kv);
1705                 }
1706
1707                 // create the commit parts
1708                 newCommit->createCommitParts();
1709
1710                 // Append all the commit parts to the end of the pending queue waiting for sending to the server
1711                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1712                 pendingSendArbitrationRounds->add(arbitrationRound);
1713
1714                 if (compactArbitrationData()) {
1715                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1716                         for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1717                                 processEntry(commitPart);
1718                         }
1719                 } else {
1720                         // Insert the commit so we can process it
1721                         for (CommitPart *commitPart : newCommit->getParts()->values()) {
1722                                 processEntry(commitPart);
1723                         }
1724                 }
1725
1726                 if (transaction->getMachineId() == localMachineId) {
1727                         TransactionStatus *status = transaction->getTransactionStatus();
1728                         if (status != NULL) {
1729                                 status->setStatus(TransactionStatus_StatusCommitted);
1730                         }
1731                 }
1732
1733                 updateLiveStateFromLocal();
1734                 return Pair<bool, bool>(true, true);
1735         } else {
1736
1737                 if (transaction->getMachineId() == localMachineId) {
1738                         // For locally created messages update the status
1739
1740                         // Guard evaluated was false so create abort
1741                         TransactionStatus status = transaction->getTransactionStatus();
1742                         if (status != NULL) {
1743                                 status->setStatus(TransactionStatus_StatusAborted);
1744                         }
1745                 } else {
1746                         Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
1747
1748
1749                         // Create the abort
1750                         Abort *newAbort = new Abort(NULL,
1751                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1752                                                                                                                                         -1,
1753                                                                                                                                         transaction->getMachineId(),
1754                                                                                                                                         transaction->getArbitrator(),
1755                                                                                                                                         localArbitrationSequenceNumber);
1756                         localArbitrationSequenceNumber++;
1757
1758                         addAbortSet->add(newAbort);
1759
1760
1761                         // Append all the commit parts to the end of the pending queue waiting for sending to the server
1762                         ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1763                         pendingSendArbitrationRounds->add(arbitrationRound);
1764
1765                         if (compactArbitrationData()) {
1766                                 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1767                                 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1768                                         processEntry(commitPart);
1769                                 }
1770                         }
1771                 }
1772
1773                 updateLiveStateFromLocal();
1774                 return Pair<bool, bool>(true, false);
1775         }
1776 }
1777
1778 /**
1779  * Compacts the arbitration data my merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates
1780  */
1781 bool Table::compactArbitrationData() {
1782
1783         if (pendingSendArbitrationRounds->size() < 2) {
1784                 // Nothing to compact so do nothing
1785                 return false;
1786         }
1787
1788         ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1789         if (lastRound->didSendPart()) {
1790                 return false;
1791         }
1792
1793         bool hadCommit = (lastRound->getCommit() == NULL);
1794         bool gotNewCommit = false;
1795
1796         int numberToDelete = 1;
1797         while (numberToDelete < pendingSendArbitrationRounds->size()) {
1798                 ArbitrationRound *xs round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1799
1800                 if (round->isFull() || round->didSendPart()) {
1801                         // Stop since there is a part that cannot be compacted and we need to compact in order
1802                         break;
1803                 }
1804
1805                 if (round->getCommit() == NULL) {
1806
1807                         // Try compacting aborts only
1808                         int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
1809                         if (newSize > ArbitrationRound->MAX_PARTS) {
1810                                 // Cant compact since it would be too large
1811                                 break;
1812                         }
1813                         lastRound->addAborts(round->getAborts());
1814                 } else {
1815
1816                         // Create a new larger commit
1817                         Commit newCommit = Commit->merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
1818                         localArbitrationSequenceNumber++;
1819
1820                         // Create the commit parts so that we can count them
1821                         newCommit->createCommitParts();
1822
1823                         // Calculate the new size of the parts
1824                         int newSize = newCommit->getNumberOfParts();
1825                         newSize += lastRound->getAbortsCount();
1826                         newSize += round->getAbortsCount();
1827
1828                         if (newSize > ArbitrationRound->MAX_PARTS) {
1829                                 // Cant compact since it would be too large
1830                                 break;
1831                         }
1832
1833                         // Set the new compacted part
1834                         lastRound->setCommit(newCommit);
1835                         lastRound->addAborts(round->getAborts());
1836                         gotNewCommit = true;
1837                 }
1838
1839                 numberToDelete++;
1840         }
1841
1842         if (numberToDelete != 1) {
1843                 // If there is a compaction
1844
1845                 // Delete the previous pieces that are now in the new compacted piece
1846                 if (numberToDelete == pendingSendArbitrationRounds->size()) {
1847                         pendingSendArbitrationRounds->clear();
1848                 } else {
1849                         for (int i = 0; i < numberToDelete; i++) {
1850                                 pendingSendArbitrationRounds->remove(pendingSendArbitrationRounds->size() - 1);
1851                         }
1852                 }
1853
1854                 // Add the new compacted into the pending to send list
1855                 pendingSendArbitrationRounds->add(lastRound);
1856
1857                 // Should reinsert into the commit processor
1858                 if (hadCommit && gotNewCommit) {
1859                         return true;
1860                 }
1861         }
1862
1863         return false;
1864 }
1865 // bool compactArbitrationData() {
1866 //  return false;
1867 // }
1868
1869 /**
1870  * Update all the commits and the committed tables, sets dead the dead transactions
1871  */
1872 bool Table::updateCommittedTable() {
1873
1874         if (newCommitParts->size() == 0) {
1875                 // Nothing new to process
1876                 return false;
1877         }
1878
1879         // Iterate through all the machine Ids that we received new parts for
1880         for (int64_t machineId : newCommitParts->keySet()) {
1881                 Hashtable<Pair<int64_t, int32_t>, CommitPart *> *parts = newCommitParts->get(machineId);
1882
1883                 // Iterate through all the parts for that machine Id
1884                 for (Pair<int64_t, int32_t> partId : parts->keySet()) {
1885                         CommitPart *part = parts->get(partId);
1886
1887                         // Get the transaction object for that sequence number
1888                         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
1889
1890                         if (commitForClientTable == NULL) {
1891                                 // This is the first commit from this device
1892                                 commitForClientTable = new Hashtable<int64_t, Commit *>();
1893                                 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
1894                         }
1895
1896                         Commit *commit = commitForClientTable->get(part->getSequenceNumber());
1897
1898                         if (commit == NULL) {
1899                                 // This is a new commit that we dont have so make a new one
1900                                 commit = new Commit();
1901
1902                                 // Insert this new commit into the live tables
1903                                 commitForClientTable->put(part->getSequenceNumber(), commit);
1904                         }
1905
1906                         // Add that part to the commit
1907                         commit->addPartDecode(part);
1908                 }
1909         }
1910
1911         // Clear all the new commits parts in preparation for the next time the server sends slots
1912         newCommitParts->clear();
1913
1914         // If we process a new commit keep track of it for future use
1915         bool didProcessANewCommit = false;
1916
1917         // Process the commits one by one
1918         for (int64_T arbitratorId : liveCommitsTable->keySet()) {
1919
1920                 // Get all the commits for a specific arbitrator
1921                 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
1922
1923                 // Sort the commits in order
1924                 Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
1925                 Collections->sort(commitSequenceNumbers);
1926
1927                 // Get the last commit seen from this arbitrator
1928                 int64_t lastCommitSeenSequenceNumber = -1;
1929                 if (lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId) != NULL) {
1930                         lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
1931                 }
1932
1933                 // Go through each new commit one by one
1934                 for (int i = 0; i < commitSequenceNumbers->size(); i++) {
1935                         int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
1936                         Commit *commit = commitForClientTable->get(commitSequenceNumber);
1937
1938                         // Special processing if a commit is not complete
1939                         if (!commit->isComplete()) {
1940                                 if (i == (commitSequenceNumbers->size() - 1)) {
1941                                         // If there is an incomplete commit and this commit is the latest one seen then this commit cannot be processed and there are no other commits
1942                                         break;
1943                                 } else {
1944                                         // This is a commit that was already dead but parts of it are still in the block chain (not flushed out yet)->
1945                                         // Delete it and move on
1946                                         commit->setDead();
1947                                         commitForClientTable->remove(commit->getSequenceNumber());
1948                                         continue;
1949                                 }
1950                         }
1951
1952                         // Update the last transaction that was updated if we can
1953                         if (commit->getTransactionSequenceNumber() != -1) {
1954                                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
1955
1956                                 // Update the last transaction sequence number that the arbitrator arbitrated on
1957                                 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
1958                                         lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
1959                                 }
1960                         }
1961
1962                         // Update the last arbitration data that we have seen so far
1963                         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId()) != NULL) {
1964
1965                                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
1966                                 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
1967                                         // Is larger
1968                                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
1969                                 }
1970                         } else {
1971                                 // Never seen any data from this arbitrator so record the first one
1972                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
1973                         }
1974
1975                         // We have already seen this commit before so need to do the full processing on this commit
1976                         if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
1977
1978                                 // Update the last transaction that was updated if we can
1979                                 if (commit->getTransactionSequenceNumber() != -1) {
1980                                         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
1981
1982                                         // Update the last transaction sequence number that the arbitrator arbitrated on
1983                                         if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
1984                                                 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
1985                                         }
1986                                 }
1987
1988                                 continue;
1989                         }
1990
1991                         // If we got here then this is a brand new commit and needs full processing
1992
1993                         // Get what commits should be edited, these are the commits that have live values for their keys
1994                         Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
1995                         for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
1996                                 commitsToEdit->add(liveCommitsByKeyTable->get(kv->getKey()));
1997                         }
1998                         commitsToEdit->remove(NULL);            // remove NULL since it could be in this set
1999
2000                         // Update each previous commit that needs to be updated
2001                         for (Commit *previousCommit : commitsToEdit) {
2002
2003                                 // Only bother with live commits (TODO: Maybe remove this check)
2004                                 if (previousCommit->isLive()) {
2005
2006                                         // Update which keys in the old commits are still live
2007                                         for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
2008                                                 previousCommit->invalidateKey(kv->getKey());
2009                                         }
2010
2011                                         // if the commit is now dead then remove it
2012                                         if (!previousCommit->isLive()) {
2013                                                 commitForClientTable->remove(previousCommit);
2014                                         }
2015                                 }
2016                         }
2017
2018                         // Update the last seen sequence number from this arbitrator
2019                         if (lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId()) != NULL) {
2020                                 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2021                                         lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2022                                 }
2023                         } else {
2024                                 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2025                         }
2026
2027                         // We processed a new commit that we havent seen before
2028                         didProcessANewCommit = true;
2029
2030                         // Update the committed table of keys and which commit is using which key
2031                         for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
2032                                 committedKeyValueTable->put(kv->getKey(), kv);
2033                                 liveCommitsByKeyTable->put(kv->getKey(), commit);
2034                         }
2035                 }
2036         }
2037
2038         return didProcessANewCommit;
2039 }
2040
2041 /**
2042  * Create the speculative table from transactions that are still live and have come from the cloud
2043  */
2044 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2045         if (liveTransactionBySequenceNumberTable->keySet()->size() == 0) {
2046                 // There is nothing to speculate on
2047                 return false;
2048         }
2049
2050         // Create a list of the transaction sequence numbers and sort them from oldest to newest
2051         Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
2052         Collections->sort(transactionSequenceNumbersSorted);
2053
2054         bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2055
2056
2057         if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2058                 // If there is a gap in the transaction sequence numbers then there was a commit or an abort of a transaction
2059                 // OR there was a new commit (Could be from offline commit) so a redo the speculation from scratch
2060
2061                 // Start from scratch
2062                 speculatedKeyValueTable->clear();
2063                 lastTransactionSequenceNumberSpeculatedOn = -1;
2064                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2065
2066         }
2067
2068         // Remember the front of the transaction list
2069         oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2070
2071         // Find where to start arbitration from
2072         int startIndex = transactionSequenceNumbersSorted->indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1;
2073
2074         if (startIndex >= transactionSequenceNumbersSorted->size()) {
2075                 // Make sure we are not out of bounds
2076                 return false;           // did not speculate
2077         }
2078
2079         Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
2080         bool didSkip = true;
2081
2082         for (int i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2083                 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2084                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2085
2086                 if (!transaction->isComplete()) {
2087                         // If there is an incomplete transaction then there is nothing we can do
2088                         // add this transactions arbitrator to the list of arbitrators we should ignore
2089                         incompleteTransactionArbitrator->add(transaction->getArbitrator());
2090                         didSkip = true;
2091                         continue;
2092                 }
2093
2094                 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2095                         continue;
2096                 }
2097
2098                 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2099
2100                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2101                         // Guard evaluated to true so update the speculative table
2102                         for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
2103                                 speculatedKeyValueTable->put(kv->getKey(), kv);
2104                         }
2105                 }
2106         }
2107
2108         if (didSkip) {
2109                 // Since there was a skip we need to redo the speculation next time around
2110                 lastTransactionSequenceNumberSpeculatedOn = -1;
2111                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2112         }
2113
2114         // We did some speculation
2115         return true;
2116 }
2117
2118 /**
2119  * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer
2120  */
2121 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2122         if (pendingTransactionQueue->size() == 0) {
2123                 // There is nothing to speculate on
2124                 return;
2125         }
2126
2127         if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2128                 // need to reset on the pending speculation
2129                 lastPendingTransactionSpeculatedOn = NULL;
2130                 firstPendingTransaction = pendingTransactionQueue->get(0);
2131                 pendingTransactionSpeculatedKeyValueTable->clear();
2132         }
2133
2134         // Find where to start arbitration from
2135         int startIndex = pendingTransactionQueue->indexOf(firstPendingTransaction) + 1;
2136
2137         if (startIndex >= pendingTransactionQueue->size()) {
2138                 // Make sure we are not out of bounds
2139                 return;
2140         }
2141
2142         for (int i = startIndex; i < pendingTransactionQueue->size(); i++) {
2143                 Transaction *transaction = pendingTransactionQueue->get(i);
2144
2145                 lastPendingTransactionSpeculatedOn = transaction;
2146
2147                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2148                         // Guard evaluated to true so update the speculative table
2149                         for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
2150                                 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2151                         }
2152                 }
2153         }
2154 }
2155
2156 /**
2157  * Set dead and remove from the live transaction tables the transactions that are dead
2158  */
2159 void Table::updateLiveTransactionsAndStatus() {
2160
2161         // Go through each of the transactions
2162         for (Iterator<Map->Entry<int64_t, Transaction> > *iter = liveTransactionBySequenceNumberTable->entrySet()->iterator(); iter->hasNext();) {
2163                 Transaction *transaction = iter->next()->getValue();
2164
2165                 // Check if the transaction is dead
2166                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator());
2167                 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= transaction->getSequenceNumber())) {
2168
2169                         // Set dead the transaction
2170                         transaction->setDead();
2171
2172                         // Remove the transaction from the live table
2173                         iter->remove();
2174                         liveTransactionByTransactionIdTable->remove(transaction->getId());
2175                 }
2176         }
2177
2178         // Go through each of the transactions
2179         for (Iterator<Map->Entry<int64_t, TransactionStatus *> > *iter = outstandingTransactionStatus->entrySet()->iterator(); iter->hasNext();) {
2180                 TransactionStatus *status = iter->next()->getValue();
2181
2182                 // Check if the transaction is dead
2183                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator());
2184                 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= status->getTransactionSequenceNumber())) {
2185
2186                         // Set committed
2187                         status->setStatus(TransactionStatus_StatusCommitted);
2188
2189                         // Remove
2190                         iter->remove();
2191                 }
2192         }
2193 }
2194
2195 /**
2196  * Process this slot, entry by entry->  Also update the latest message sent by slot
2197  */
2198 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2199
2200         // Update the last message seen
2201         updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2202
2203         // Process each entry in the slot
2204         for (Entry *entry : slot->getEntries()) {
2205                 switch (entry->getType()) {
2206
2207                 case TypeCommitPart:
2208                         processEntry((CommitPart *)entry);
2209                         break;
2210
2211                 case TypeAbort:
2212                         processEntry((Abort *)entry);
2213                         break;
2214
2215                 case TypeTransactionPart:
2216                         processEntry((TransactionPart *)entry);
2217                         break;
2218
2219                 case TypeNewKey:
2220                         processEntry((NewKey *)entry);
2221                         break;
2222
2223                 case TypeLastMessage:
2224                         processEntry((LastMessage *)entry, machineSet);
2225                         break;
2226
2227                 case TypeRejectedMessage:
2228                         processEntry((RejectedMessage *)entry, indexer);
2229                         break;
2230
2231                 case TypeTableStatus:
2232                         processEntry((TableStatus *)entry, slot->getSequenceNumber());
2233                         break;
2234
2235                 default:
2236                         throw new Error("Unrecognized type: " + entry->getType());
2237                 }
2238         }
2239 }
2240
2241 /**
2242  * Update the last message that was sent for a machine Id
2243  */
2244 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2245         // Update what the last message received by a machine was
2246         updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2247 }
2248
2249 /**
2250  * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message)
2251  */
2252 void Table::processEntry(NewKey *entry) {
2253
2254         // Update the arbitrator table with the new key information
2255         arbitratorTable->put(entry->getKey(), entry->getMachineID());
2256
2257         // Update what the latest live new key is
2258         NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2259         if (oldNewKey != NULL) {
2260                 // Delete the old new key messages
2261                 oldNewKey->setDead();
2262         }
2263 }
2264
2265 /**
2266  * Process new table status entries and set dead the old ones as new ones come in->
2267  * keeps track of the largest and smallest table status seen in this current round
2268  * of updating the local copy of the block chain
2269  */
2270 void Table::processEntry(TableStatus entry, int64_t seq) {
2271         int newNumSlots = entry->getMaxSlots();
2272         updateCurrMaxSize(newNumSlots);
2273
2274         initExpectedSize(seq, newNumSlots);
2275
2276         if (liveTableStatus != NULL) {
2277                 // We have a larger table status so the old table status is no int64_ter alive
2278                 liveTableStatus->setDead();
2279         }
2280
2281         // Make this new table status the latest alive table status
2282         liveTableStatus = entry;
2283 }
2284
2285 /**
2286  * Check old messages to see if there is a block chain violation-> Also
2287  */
2288 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
2289         int64_t oldSeqNum = entry->getOldSeqNum();
2290         int64_t newSeqNum = entry->getNewSeqNum();
2291         bool isequal = entry->getEqual();
2292         int64_t machineId = entry->getMachineID();
2293         int64_t seq = entry->getSequenceNumber();
2294
2295
2296         // Check if we have messages that were supposed to be rejected in our local block chain
2297         for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2298
2299                 // Get the slot
2300                 Slot *slot = indexer->getSlot(seqNum);
2301
2302                 if (slot != NULL) {
2303                         // If we have this slot make sure that it was not supposed to be a rejected slot
2304
2305                         int64_t slotMachineId = slot->getMachineID();
2306                         if (isequal != (slotMachineId == machineId)) {
2307                                 throw new Error("Server Error: Trying to insert rejected message for slot " + seqNum);
2308                         }
2309                 }
2310         }
2311
2312
2313         // Create a list of clients to watch until they see this rejected message entry->
2314         Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2315         for (Map->Entry<int64_t, Pair<int64_t, Liveness *> > *lastMessageEntry : lastMessageTable->entrySet()) {
2316
2317                 // Machine ID for the last message entry
2318                 int64_t lastMessageEntryMachineId = lastMessageEntry->getKey();
2319
2320                 // We've seen it, don't need to continue to watch->  Our next
2321                 // message will implicitly acknowledge it->
2322                 if (lastMessageEntryMachineId == localMachineId) {
2323                         continue;
2324                 }
2325
2326                 Pair<int64_t, Liveness *> lastMessageValue = lastMessageEntry->getValue();
2327                 int64_t entrySequenceNumber = lastMessageValue->getFirst();
2328
2329                 if (entrySequenceNumber < seq) {
2330
2331                         // Add this rejected message to the set of messages that this machine ID did not see yet
2332                         addWatchVector(lastMessageEntryMachineId, entry);
2333
2334                         // This client did not see this rejected message yet so add it to the watch set to monitor
2335                         deviceWatchSet->add(lastMessageEntryMachineId);
2336                 }
2337         }
2338
2339         if (deviceWatchSet->isEmpty()) {
2340                 // This rejected message has been seen by all the clients so
2341                 entry->setDead();
2342         } else {
2343                 // We need to watch this rejected message
2344                 entry->setWatchSet(deviceWatchSet);
2345         }
2346 }
2347
2348 /**
2349  * Check if this abort is live, if not then save it so we can kill it later->
2350  * update the last transaction number that was arbitrated on->
2351  */
2352 void Table::processEntry(Abort *entry) {
2353
2354
2355         if (entry->getTransactionSequenceNumber() != -1) {
2356                 // update the transaction status if it was sent to the server
2357                 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2358                 if (status != NULL) {
2359                         status->setStatus(TransactionStatus_StatusAborted);
2360                 }
2361         }
2362
2363         // Abort has not been seen by the client it is for yet so we need to keep track of it
2364         Abort *previouslySeenAbort = liveAbortTable->put(entry->getAbortId(), entry);
2365         if (previouslySeenAbort != NULL) {
2366                 previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version
2367         }
2368
2369         if (entry->getTransactionArbitrator() == localMachineId) {
2370                 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
2371         }
2372
2373         if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) {
2374
2375                 // The machine already saw this so it is dead
2376                 entry->setDead();
2377                 liveAbortTable->remove(entry->getAbortId());
2378
2379                 if (entry->getTransactionArbitrator() == localMachineId) {
2380                         liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2381                 }
2382
2383                 return;
2384         }
2385
2386         // Update the last arbitration data that we have seen so far
2387         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator()) != NULL) {
2388
2389                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2390                 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2391                         // Is larger
2392                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2393                 }
2394         } else {
2395                 // Never seen any data from this arbitrator so record the first one
2396                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2397         }
2398
2399
2400         // Set dead a transaction if we can
2401         Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber()));
2402         if (transactionToSetDead != NULL) {
2403                 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2404         }
2405
2406         // Update the last transaction sequence number that the arbitrator arbitrated on
2407         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator());
2408         if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2409
2410                 // Is a valid one
2411                 if (entry->getTransactionSequenceNumber() != -1) {
2412                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2413                 }
2414         }
2415 }
2416
2417 /**
2418  * Set dead the transaction part if that transaction is dead and keep track of all new parts
2419  */
2420 void Table::processEntry(TransactionPart *entry) {
2421         // Check if we have already seen this transaction and set it dead OR if it is not alive
2422         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId());
2423         if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= entry->getSequenceNumber())) {
2424                 // This transaction is dead, it was already committed or aborted
2425                 entry->setDead();
2426                 return;
2427         }
2428
2429         // This part is still alive
2430         Hashtable<Pair<int64_t, int32_t>, TransactionPart *> *transactionPart = newTransactionParts->get(entry->getMachineId());
2431
2432         if (transactionPart == NULL) {
2433                 // Dont have a table for this machine Id yet so make one
2434                 transactionPart = new Hashtable<Pair<int64_t, int32_t>, TransactionPart *>();
2435                 newTransactionParts->put(entry->getMachineId(), transactionPart);
2436         }
2437
2438         // Update the part and set dead ones we have already seen (got a rescued version)
2439         TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry);
2440         if (previouslySeenPart != NULL) {
2441                 previouslySeenPart->setDead();
2442         }
2443 }
2444
2445 /**
2446  * Process new commit entries and save them for future use->  Delete duplicates
2447  */
2448 void Table::processEntry(CommitPart *entry) {
2449         // Update the last transaction that was updated if we can
2450         if (entry->getTransactionSequenceNumber() != -1) {
2451                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId());
2452
2453                 // Update the last transaction sequence number that the arbitrator arbitrated on
2454                 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2455                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2456                 }
2457         }
2458
2459
2460
2461
2462         Hashtable<Pair<int64_t, int32_t>, CommitPart *> *commitPart = newCommitParts->get(entry->getMachineId());
2463
2464         if (commitPart == NULL) {
2465                 // Don't have a table for this machine Id yet so make one
2466                 commitPart = new Hashtable<Pair<int64_t, int32_t>, CommitPart *>();
2467                 newCommitParts->put(entry->getMachineId(), commitPart);
2468         }
2469
2470         // Update the part and set dead ones we have already seen (got a rescued version)
2471         CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry);
2472         if (previouslySeenPart != NULL) {
2473                 previouslySeenPart->setDead();
2474         }
2475 }
2476
2477 /**
2478  * Update the last message seen table->  Update and set dead the appropriate RejectedMessages as clients see them->
2479  * Updates the live aborts, removes those that are dead and sets them dead->
2480  * Check that the last message seen is correct and that there is no mismatch of our own last message or that
2481  * other clients have not had a rollback on the last message->
2482  */
2483 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<intr64_t> *machineSet) {
2484
2485         // We have seen this machine ID
2486         machineSet->remove(machineId);
2487
2488         // Get the set of rejected messages that this machine Id is has not seen yet
2489         Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2490
2491         // If there is a rejected message that this machine Id has not seen yet
2492         if (watchset != NULL) {
2493
2494                 // Go through each rejected message that this machine Id has not seen yet
2495                 for (Iterator<RejectedMessage *> *rmit = watchset->iterator(); rmit->hasNext(); ) {
2496
2497                         RejectedMessage *rm = rmit->next();
2498
2499                         // If this machine Id has seen this rejected message->->->
2500                         if (rm->getSequenceNumber() <= seqNum) {
2501
2502                                 // Remove it from our watchlist
2503                                 rmit->remove();
2504
2505                                 // Decrement machines that need to see this notification
2506                                 rm->removeWatcher(machineId);
2507                         }
2508                 }
2509         }
2510
2511         // Set dead the abort
2512         for (Iterator<Map->Entry<Pair<int64_t, int64_t>, Abort *> > i = liveAbortTable->entrySet()->iterator(); i->hasNext();) {
2513                 Abort *abort = i->next()->getValue();
2514
2515                 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2516                         abort->setDead();
2517                         i->remove();
2518
2519                         if (abort->getTransactionArbitrator() == localMachineId) {
2520                                 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2521                         }
2522                 }
2523         }
2524
2525
2526
2527         if (machineId == localMachineId) {
2528                 // Our own messages are immediately dead->
2529                 if (liveness instanceof LastMessage) {
2530                         ((LastMessage *)liveness)->setDead();
2531                 } else if (liveness instanceof Slot) {
2532                         ((Slot *)liveness)->setDead();
2533                 } else {
2534                         throw new Error("Unrecognized type");
2535                 }
2536         }
2537
2538         // Get the old last message for this device
2539         Pair<int64_t, Liveness *> lastMessageEntry = lastMessageTable->put(machineId, Pair<int64_t, Liveness *>(seqNum, liveness));
2540         if (lastMessageEntry == NULL) {
2541                 // If no last message then there is nothing else to process
2542                 return;
2543         }
2544
2545         int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
2546         Liveness *lastEntry = lastMessageEntry->getSecond();
2547
2548         // If it is not our machine Id since we already set ours to dead
2549         if (machineId != localMachineId) {
2550                 if (lastEntry instanceof LastMessage) {
2551                         ((LastMessage *)lastEntry)->setDead();
2552                 } else if (lastEntry instanceof Slot) {
2553                         ((Slot *)lastEntry)->setDead();
2554                 } else {
2555                         throw new Error("Unrecognized type");
2556                 }
2557         }
2558
2559         // Make sure the server is not playing any games
2560         if (machineId == localMachineId) {
2561
2562                 if (hadPartialSendToServer) {
2563                         // We were not making any updates and we had a machine mismatch
2564                         if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2565                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: " +  lastMessageSeqNum  + " got: " + seqNum);
2566                         }
2567
2568                 } else {
2569                         // We were not making any updates and we had a machine mismatch
2570                         if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2571                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed: " +  lastMessageSeqNum + " got: " + seqNum);
2572                         }
2573                 }
2574         } else {
2575                 if (lastMessageSeqNum > seqNum) {
2576                         throw new Error("Server Error: Rollback on remote machine sequence number");
2577                 }
2578         }
2579 }
2580
2581 /**
2582  * Add a rejected message entry to the watch set to keep track of which clients have seen that
2583  * rejected message entry and which have not->
2584  */
2585 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
2586         Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2587         if (entries == NULL) {
2588                 // There is no set for this machine ID yet so create one
2589                 entries = new Hashset<RejectedMessage *>();
2590                 rejectedMessageWatchVectorTable->put(machineId, entries);
2591         }
2592         entries->add(entry);
2593 }
2594
2595 /**
2596  * Check if the HMAC chain is not violated
2597  */
2598 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
2599         for (int i = 0; i < newSlots->length; i++) {
2600                 Slot *currSlot = newSlots[i];
2601                 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
2602                 if (prevSlot != NULL &&
2603                                 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2604                         throw new Error("Server Error: Invalid HMAC Chain");
2605         }
2606 }
2607