edits
[iotcloud.git] / version2 / src / C / Table.cc
1 #include "Table.h"
2 #include "CloudComm.h"
3 #include "SlotBuffer.h"
4 #include "NewKey.h"
5 #include "Slot.h"
6 #include "KeyValue.h"
7 #include "Error.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
13 #include "Random.h"
14
15
16 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
17         buffer(NULL),
18         cloud(new CloudComm(this, baseurl, password, listeningPort)),
19         random(NULL),
20         liveTableStatus(NULL),
21         pendingTransactionBuilder(NULL),
22         lastPendingTransactionSpeculatedOn(NULL),
23         firstPendingTransaction(NULL),
24         numberOfSlots(0),
25         bufferResizeThreshold(0),
26         liveSlotCount(0),
27         oldestLiveSlotSequenceNumver(1),
28         localMachineId(_localMachineId),
29         sequenceNumber(0),
30         localTransactionSequenceNumber(0),
31         lastTransactionSequenceNumberSpeculatedOn(0),
32         oldestTransactionSequenceNumberSpeculatedOn(0),
33         localArbitrationSequenceNumber(0),
34         hadPartialSendToServer(false),
35         attemptedToSendToServer(false),
36         expectedsize(0),
37         didFindTableStatus(false),
38         currMaxSize(0),
39         lastSlotAttemptedToSend(NULL),
40         lastIsNewKey(false),
41         lastNewSize(0),
42         lastTransactionPartsSent(NULL),
43         lastPendingSendArbitrationEntriesToDelete(NULL),
44         lastNewKey(NULL),
45         committedKeyValueTable(NULL),
46         speculatedKeyValueTable(NULL),
47         pendingTransactionSpeculatedKeyValueTable(NULL),
48         liveNewKeyTable(NULL),
49         lastMessageTable(NULL),
50         rejectedMessageWatchVectorTable(NULL),
51         arbitratorTable(NULL),
52         liveAbortTable(NULL),
53         newTransactionParts(NULL),
54         newCommitParts(NULL),
55         lastArbitratedTransactionNumberByArbitratorTable(NULL),
56         liveTransactionBySequenceNumberTable(NULL),
57         liveTransactionByTransactionIdTable(NULL),
58         liveCommitsTable(NULL),
59         liveCommitsByKeyTable(NULL),
60         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
61         rejectedSlotVector(NULL),
62         pendingTransactionQueue(NULL),
63         pendingSendArbitrationRounds(NULL),
64         pendingSendArbitrationEntriesToDelete(NULL),
65         transactionPartsSent(NULL),
66         outstandingTransactionStatus(NULL),
67         liveAbortsGeneratedByLocal(NULL),
68         offlineTransactionsCommittedAndAtServer(NULL),
69         localCommunicationTable(NULL),
70         lastTransactionSeenFromMachineFromServer(NULL),
71         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
72         lastInsertedNewKey(false),
73         lastSeqNumArbOn(0)
74 {
75         init();
76 }
77
78 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
79         buffer(NULL),
80         cloud(_cloud),
81         random(NULL),
82         liveTableStatus(NULL),
83         pendingTransactionBuilder(NULL),
84         lastPendingTransactionSpeculatedOn(NULL),
85         firstPendingTransaction(NULL),
86         numberOfSlots(0),
87         bufferResizeThreshold(0),
88         liveSlotCount(0),
89         oldestLiveSlotSequenceNumver(1),
90         localMachineId(_localMachineId),
91         sequenceNumber(0),
92         localTransactionSequenceNumber(0),
93         lastTransactionSequenceNumberSpeculatedOn(0),
94         oldestTransactionSequenceNumberSpeculatedOn(0),
95         localArbitrationSequenceNumber(0),
96         hadPartialSendToServer(false),
97         attemptedToSendToServer(false),
98         expectedsize(0),
99         didFindTableStatus(false),
100         currMaxSize(0),
101         lastSlotAttemptedToSend(NULL),
102         lastIsNewKey(false),
103         lastNewSize(0),
104         lastTransactionPartsSent(NULL),
105         lastPendingSendArbitrationEntriesToDelete(NULL),
106         lastNewKey(NULL),
107         committedKeyValueTable(NULL),
108         speculatedKeyValueTable(NULL),
109         pendingTransactionSpeculatedKeyValueTable(NULL),
110         liveNewKeyTable(NULL),
111         lastMessageTable(NULL),
112         rejectedMessageWatchVectorTable(NULL),
113         arbitratorTable(NULL),
114         liveAbortTable(NULL),
115         newTransactionParts(NULL),
116         newCommitParts(NULL),
117         lastArbitratedTransactionNumberByArbitratorTable(NULL),
118         liveTransactionBySequenceNumberTable(NULL),
119         liveTransactionByTransactionIdTable(NULL),
120         liveCommitsTable(NULL),
121         liveCommitsByKeyTable(NULL),
122         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
123         rejectedSlotVector(NULL),
124         pendingTransactionQueue(NULL),
125         pendingSendArbitrationRounds(NULL),
126         pendingSendArbitrationEntriesToDelete(NULL),
127         transactionPartsSent(NULL),
128         outstandingTransactionStatus(NULL),
129         liveAbortsGeneratedByLocal(NULL),
130         offlineTransactionsCommittedAndAtServer(NULL),
131         localCommunicationTable(NULL),
132         lastTransactionSeenFromMachineFromServer(NULL),
133         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
134         lastInsertedNewKey(false),
135         lastSeqNumArbOn(0)
136 {
137         init();
138 }
139
140 /**
141  * Init all the stuff needed for for table usage
142  */
143 void Table::init() {
144         // Init helper objects
145         random = new Random();
146         buffer = new SlotBuffer();
147
148         // init data structs
149         committedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
150         speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
151         pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
152         liveNewKeyTable = new Hashtable<IoTString *, NewKey *>();
153         lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> >();
154         rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
155         arbitratorTable = new Hashtable<IoTString *, int64_t>();
156         liveAbortTable = new Hashtable<Pair<int64_t, int64_t>, Abort *>();
157         newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>, TransactionPart *> *>();
158         newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>, CommitPart *> *>();
159         lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
160         liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
161         liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t>, Transaction *>();
162         liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> >();
163         liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *>();
164         lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
165         rejectedSlotVector = new Vector<int64_t>();
166         pendingTransactionQueue = new Vector<Transaction *>();
167         pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
168         transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
169         outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
170         liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
171         offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> >();
172         localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> >();
173         lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
174         pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
175         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
176
177
178         // Other init stuff
179         numberOfSlots = buffer->capacity();
180         setResizeThreshold();
181 }
182
183 /**
184  * Initialize the table by inserting a table status as the first entry
185  * into the table status also initialize the crypto stuff.
186  */
187 void Table::initTable() {
188         cloud->initSecurity();
189
190         // Create the first insertion into the block chain which is the table status
191         Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
192         localSequenceNumber++;
193         TableStatus *status = new TableStatus(s, numberOfSlots);
194         s->addEntry(status);
195         Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
196
197         if (array == NULL) {
198                 array = new Array<Slot *>(1);
199                 array->set(0, s);
200                 // update local block chain
201                 validateAndUpdate(array, true);
202         } else if (array->length() == 1) {
203                 // in case we did push the slot BUT we failed to init it
204                 validateAndUpdate(array, true);
205         } else {
206                 throw new Error("Error on initialization");
207         }
208 }
209
210 /**
211  * Rebuild the table from scratch by pulling the latest block chain
212  * from the server.
213  */
214 void Table::rebuild() {
215         // Just pull the latest slots from the server
216         Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
217         validateAndUpdate(newslots, true);
218         sendToServer(NULL);
219         updateLiveTransactionsAndStatus();
220 }
221
222 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
223         localCommunicationTable->put(arbitrator, Pair<IoTString *, int32_t>(hostName, portNumber));
224 }
225
226 int64_t Table::getArbitrator(IoTString *key) {
227         return arbitratorTable->get(key);
228 }
229
230 void Table::close() {
231         cloud->close();
232 }
233
234 IoTString *Table::getCommitted(IoTString *key)  {
235         KeyValue *kv = committedKeyValueTable->get(key);
236
237         if (kv != NULL) {
238                 return kv->getValue();
239         } else {
240                 return NULL;
241         }
242 }
243
244 IoTString *Table::getSpeculative(IoTString *key) {
245         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
246
247         if (kv == NULL) {
248                 kv = speculatedKeyValueTable->get(key);
249         }
250
251         if (kv == NULL) {
252                 kv = committedKeyValueTable->get(key);
253         }
254
255         if (kv != NULL) {
256                 return kv->getValue();
257         } else {
258                 return NULL;
259         }
260 }
261
262 IoTString *Table::getCommittedAtomic(IoTString *key) {
263         KeyValue *kv = committedKeyValueTable->get(key);
264
265         if (!arbitratorTable->contains(key)) {
266                 throw new Error("Key not Found.");
267         }
268
269         // Make sure new key value pair matches the current arbitrator
270         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
271                 // TODO: Maybe not throw en error
272                 throw new Error("Not all Key Values Match Arbitrator.");
273         }
274
275         if (kv != NULL) {
276                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
277                 return kv->getValue();
278         } else {
279                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
280                 return NULL;
281         }
282 }
283
284 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
285         if (!arbitratorTable->contains(key)) {
286                 throw new Error("Key not Found.");
287         }
288
289         // Make sure new key value pair matches the current arbitrator
290         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
291                 // TODO: Maybe not throw en error
292                 throw new Error("Not all Key Values Match Arbitrator.");
293         }
294
295         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
296
297         if (kv == NULL) {
298                 kv = speculatedKeyValueTable->get(key);
299         }
300
301         if (kv == NULL) {
302                 kv = committedKeyValueTable->get(key);
303         }
304
305         if (kv != NULL) {
306                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
307                 return kv->getValue();
308         } else {
309                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
310                 return NULL;
311         }
312 }
313
314 bool Table::update()  {
315         try {
316                 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
317                 validateAndUpdate(newSlots, false);
318                 sendToServer(NULL);
319
320
321                 updateLiveTransactionsAndStatus();
322
323                 return true;
324         } catch (Exception *e) {
325                 for (int64_t m : localCommunicationTable->keySet()) {
326                         updateFromLocal(m);
327                 }
328         }
329
330         return false;
331 }
332
333 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
334         while (true) {
335                 if (!arbitratorTable->contains(keyName)) {
336                         // There is already an arbitrator
337                         return false;
338                 }
339
340                 NewKey *newKey = new NewKey(NULL, keyName, machineId);
341
342                 if (sendToServer(newKey)) {
343                         // If successfully inserted
344                         return true;
345                 }
346         }
347 }
348
349 void Table::startTransaction() {
350         // Create a new transaction, invalidates any old pending transactions.
351         pendingTransactionBuilder = new PendingTransaction(localMachineId);
352 }
353
354 void Table::addKV(IoTString *key, IoTString *value) {
355
356         // Make sure it is a valid key
357         if (!arbitratorTable->contains(key)) {
358                 throw new Error("Key not Found.");
359         }
360
361         // Make sure new key value pair matches the current arbitrator
362         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
363                 // TODO: Maybe not throw en error
364                 throw new Error("Not all Key Values Match Arbitrator.");
365         }
366
367         // Add the key value to this transaction
368         KeyValue *kv = new KeyValue(key, value);
369         pendingTransactionBuilder->addKV(kv);
370 }
371
372 TransactionStatus *Table::commitTransaction() {
373
374         if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
375                 // transaction with no updates will have no effect on the system
376                 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
377         }
378
379         // Set the local transaction sequence number and increment
380         pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
381         localTransactionSequenceNumber++;
382
383         // Create the transaction status
384         TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
385
386         // Create the new transaction
387         Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
388         newTransaction->setTransactionStatus(transactionStatus);
389
390         if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
391                 // Add it to the queue and invalidate the builder for safety
392                 pendingTransactionQueue->add(newTransaction);
393         } else {
394                 arbitrateOnLocalTransaction(newTransaction);
395                 updateLiveStateFromLocal();
396         }
397
398         pendingTransactionBuilder = new PendingTransaction(localMachineId);
399
400         try {
401                 sendToServer(NULL);
402         } catch (ServerException *e) {
403
404                 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
405                 for (Iterator<Transaction *> *iter = pendingTransactionQueue->iterator(); iter->hasNext(); ) {
406                         Transaction *transaction = iter->next();
407
408                         if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
409                                 // Already contacted this client so ignore all attempts to contact this client
410                                 // to preserve ordering for arbitrator
411                                 continue;
412                         }
413
414                         Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
415
416                         if (sendReturn->getFirst()) {
417                                 // Failed to contact over local
418                                 arbitratorTriedAndFailed->add(transaction->getArbitrator());
419                         } else {
420                                 // Successful contact or should not contact
421
422                                 if (sendReturn->getSecond()) {
423                                         // did arbitrate
424                                         iter->remove();
425                                 }
426                         }
427                 }
428         }
429
430         updateLiveStateFromLocal();
431
432         return transactionStatus;
433 }
434
435 /**
436  * Recalculate the new resize threshold
437  */
438 void Table::setResizeThreshold() {
439         int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
440         bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
441 }
442
443 int64_t Table::getLocalSequenceNumber() {
444         return localSequenceNumber;
445 }
446
447
// File-scope flag, initially false.
// NOTE(review): both Table constructors also initialize a member called
// lastInsertedNewKey, and an unqualified use inside a Table member function
// resolves to that member rather than to this variable; this global looks
// like a leftover, so confirm whether it is still needed.
bool lastInsertedNewKey{false};
449
450 bool Table::sendToServer(NewKey *newKey) {
451
452         bool fromRetry = false;
453
454         try {
455                 if (hadPartialSendToServer) {
456                         Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
457                         if (newSlots->length() == 0) {
458                                 fromRetry = true;
459                                 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
460
461                                 if (sendSlotsReturn->getFirst()) {
462                                         if (newKey != NULL) {
463                                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
464                                                         newKey = NULL;
465                                                 }
466                                         }
467
468                                         for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
469                                                 transaction->resetServerFailure();
470
471                                                 // Update which transactions parts still need to be sent
472                                                 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
473
474                                                 // Add the transaction status to the outstanding list
475                                                 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
476
477                                                 // Update the transaction status
478                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
479
480                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
481                                                 if (transaction->didSendAllParts()) {
482                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
483                                                         pendingTransactionQueue->remove(transaction);
484                                                 }
485                                         }
486                                 } else {
487
488                                         newSlots = sendSlotsReturn->getThird();
489
490                                         bool isInserted = false;
491                                         for (uint si = 0; si < newSlots->length(); si++) {
492                                                 Slot *s = newSlots->get(si);
493                                                 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
494                                                         isInserted = true;
495                                                         break;
496                                                 }
497                                         }
498
499                                         for (uint si = 0; si < newSlots->length(); si++) {
500                                                 Slot *s = newSlots->get(si);
501                                                 if (isInserted) {
502                                                         break;
503                                                 }
504
505                                                 // Process each entry in the slot
506                                                 for (Entry *entry : s->getEntries()) {
507                                                         if (entry->getType() == TypeLastMessage) {
508                                                                 LastMessage *lastMessage = (LastMessage *)entry;
509                                                                 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
510                                                                         isInserted = true;
511                                                                         break;
512                                                                 }
513                                                         }
514                                                 }
515                                         }
516
517                                         if (isInserted) {
518                                                 if (newKey != NULL) {
519                                                         if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
520                                                                 newKey = NULL;
521                                                         }
522                                                 }
523
524                                                 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
525                                                         transaction->resetServerFailure();
526
527                                                         // Update which transactions parts still need to be sent
528                                                         transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
529
530                                                         // Add the transaction status to the outstanding list
531                                                         outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
532
533                                                         // Update the transaction status
534                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
535
536                                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
537                                                         if (transaction->didSendAllParts()) {
538                                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
539                                                                 pendingTransactionQueue->remove(transaction);
540                                                         } else {
541                                                                 transaction->resetServerFailure();
542                                                                 // Set the transaction sequence number back to nothing
543                                                                 if (!transaction->didSendAPartToServer()) {
544                                                                         transaction->setSequenceNumber(-1);
545                                                                 }
546                                                         }
547                                                 }
548                                         }
549                                 }
550
551                                 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
552                                         transaction->resetServerFailure();
553                                         // Set the transaction sequence number back to nothing
554                                         if (!transaction->didSendAPartToServer()) {
555                                                 transaction->setSequenceNumber(-1);
556                                         }
557                                 }
558
559                                 if (sendSlotsReturn->getThird()->length() != 0) {
560                                         // insert into the local block chain
561                                         validateAndUpdate(sendSlotsReturn->getThird(), true);
562                                 }
563                                 // continue;
564                         } else {
565                                 bool isInserted = false;
566                                 for (uint si = 0; si < newSlots->length(); si++) {
567                                         Slot *s = newSlots->get(si);
568                                         if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
569                                                 isInserted = true;
570                                                 break;
571                                         }
572                                 }
573
574                                 for (uint si = 0; si < newSlots->length(); si++) {
575                                         Slot *s = newSlots->get(si);
576                                         if (isInserted) {
577                                                 break;
578                                         }
579
580                                         // Process each entry in the slot
581                                         for (Entry *entry : s->getEntries()) {
582
583                                                 if (entry->getType() == TypeLastMessage) {
584                                                         LastMessage *lastMessage = (LastMessage *)entry;
585                                                         if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
586                                                                 isInserted = true;
587                                                                 break;
588                                                         }
589                                                 }
590                                         }
591                                 }
592
593                                 if (isInserted) {
594                                         if (newKey != NULL) {
595                                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
596                                                         newKey = NULL;
597                                                 }
598                                         }
599
600                                         for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
601                                                 transaction->resetServerFailure();
602
603                                                 // Update which transactions parts still need to be sent
604                                                 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
605
606                                                 // Add the transaction status to the outstanding list
607                                                 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
608
609                                                 // Update the transaction status
610                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
611
612                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
613                                                 if (transaction->didSendAllParts()) {
614                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
615                                                         pendingTransactionQueue->remove(transaction);
616                                                 } else {
617                                                         transaction->resetServerFailure();
618                                                         // Set the transaction sequence number back to nothing
619                                                         if (!transaction->didSendAPartToServer()) {
620                                                                 transaction->setSequenceNumber(-1);
621                                                         }
622                                                 }
623                                         }
624                                 } else {
625                                         for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
626                                                 transaction->resetServerFailure();
627                                                 // Set the transaction sequence number back to nothing
628                                                 if (!transaction->didSendAPartToServer()) {
629                                                         transaction->setSequenceNumber(-1);
630                                                 }
631                                         }
632                                 }
633
634                                 // insert into the local block chain
635                                 validateAndUpdate(newSlots, true);
636                         }
637                 }
638         } catch (ServerException *e) {
639                 throw e;
640         }
641
642
643
644         try {
645                 // While we have stuff that needs inserting into the block chain
646                 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
647
648                         fromRetry = false;
649
650                         if (hadPartialSendToServer) {
651                                 throw new Error("Should Be error free");
652                         }
653
654
655
656                         // If there is a new key with same name then end
657                         if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
658                                 return false;
659                         }
660
661                         // Create the slot
662                         Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber);
663                         localSequenceNumber++;
664
665                         // Try to fill the slot with data
666                         ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
667                         bool needsResize = fillSlotsReturn->getFirst();
668                         int newSize = fillSlotsReturn->getSecond();
669                         bool insertedNewKey = fillSlotsReturn->getThird();
670
671                         if (needsResize) {
672                                 // Reset which transaction to send
673                                 for (Transaction *transaction : transactionPartsSent->keySet()) {
674                                         transaction->resetNextPartToSend();
675
676                                         // Set the transaction sequence number back to nothing
677                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
678                                                 transaction->setSequenceNumber(-1);
679                                         }
680                                 }
681
682                                 // Clear the sent data since we are trying again
683                                 pendingSendArbitrationEntriesToDelete->clear();
684                                 transactionPartsSent->clear();
685
686                                 // We needed a resize so try again
687                                 fillSlot(slot, true, newKey);
688                         }
689
690                         lastSlotAttemptedToSend = slot;
691                         lastIsNewKey = (newKey != NULL);
692                         lastInsertedNewKey = insertedNewKey;
693                         lastNewSize = newSize;
694                         lastNewKey = newKey;
695                         lastTransactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> * >(transactionPartsSent);
696                         lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
697
698
699                         ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
700
701                         if (sendSlotsReturn->getFirst()) {
702
703                                 // Did insert into the block chain
704
705                                 if (insertedNewKey) {
706                                         // This slot was what was inserted not a previous slot
707
708                                         // New Key was successfully inserted into the block chain so dont want to insert it again
709                                         newKey = NULL;
710                                 }
711
712                                 // Remove the aborts and commit parts that were sent from the pending to send queue
713                                 for (Iterator<ArbitrationRound *> *iter = pendingSendArbitrationRounds->iterator(); iter->hasNext(); ) {
714                                         ArbitrationRound *round = iter->next();
715                                         round->removeParts(pendingSendArbitrationEntriesToDelete);
716
717                                         if (round->isDoneSending()) {
718                                                 // Sent all the parts
719                                                 iter->remove();
720                                         }
721                                 }
722
723                                 for (Transaction *transaction : transactionPartsSent->keySet()) {
724                                         transaction->resetServerFailure();
725
726                                         // Update which transactions parts still need to be sent
727                                         transaction->removeSentParts(transactionPartsSent->get(transaction));
728
729                                         // Add the transaction status to the outstanding list
730                                         outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
731
732                                         // Update the transaction status
733                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
734
735                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
736                                         if (transaction->didSendAllParts()) {
737                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
738                                                 pendingTransactionQueue->remove(transaction);
739                                         }
740                                 }
741                         } else {
742                                 // Reset which transaction to send
743                                 for (Transaction *transaction : transactionPartsSent->keySet()) {
744                                         transaction->resetNextPartToSend();
745
746                                         // Set the transaction sequence number back to nothing
747                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
748                                                 transaction->setSequenceNumber(-1);
749                                         }
750                                 }
751                         }
752
753                         // Clear the sent data in preparation for next send
754                         pendingSendArbitrationEntriesToDelete->clear();
755                         transactionPartsSent->clear();
756
757                         if (sendSlotsReturn->getThird()->length() != 0) {
758                                 // insert into the local block chain
759                                 validateAndUpdate(sendSlotsReturn->getThird(), true);
760                         }
761                 }
762
763         } catch (ServerException *e) {
764
765                 if (e->getType() != ServerException->TypeInputTimeout) {
766                         // Nothing was able to be sent to the server so just clear these data structures
767                         for (Transaction *transaction : transactionPartsSent->keySet()) {
768                                 transaction->resetNextPartToSend();
769
770                                 // Set the transaction sequence number back to nothing
771                                 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
772                                         transaction->setSequenceNumber(-1);
773                                 }
774                         }
775                 } else {
776                         // There was a partial send to the server
777                         hadPartialSendToServer = true;
778
779                         // Nothing was able to be sent to the server so just clear these data structures
780                         for (Transaction *transaction : transactionPartsSent->keySet()) {
781                                 transaction->resetNextPartToSend();
782                                 transaction->setServerFailure();
783                         }
784                 }
785
786                 pendingSendArbitrationEntriesToDelete->clear();
787                 transactionPartsSent->clear();
788
789                 throw e;
790         }
791
792         return newKey == NULL;
793 }
794
795 bool Table::updateFromLocal(int64_t machineId) {
796         Pair<IoTString *, int32_t> localCommunicationInformation = localCommunicationTable->get(machineId);
797         if (localCommunicationInformation == NULL) {
798                 // Cant talk to that device locally so do nothing
799                 return false;
800         }
801
802         // Get the size of the send data
803         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
804
805         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
806         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId) != NULL) {
807                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
808         }
809
810         Array<char> *sendData = new Array<char>(sendDataSize);
811         ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
812
813         // Encode the data
814         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
815         bbEncode->putInt(0);
816
817         // Send by local
818         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
819         localSequenceNumber++;
820
821         if (returnData == NULL) {
822                 // Could not contact server
823                 return false;
824         }
825
826         // Decode the data
827         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
828         int numberOfEntries = bbDecode->getInt();
829
830         for (int i = 0; i < numberOfEntries; i++) {
831                 char type = bbDecode->get();
832                 if (type == TypeAbort) {
833                         Abort *abort = (Abort)Abort_decode(NULL, bbDecode);
834                         processEntry(abort);
835                 } else if (type == TypeCommitPart) {
836                         CommitPart *commitPart = (CommitPart)CommitPart_decode(NULL, bbDecode);
837                         processEntry(commitPart);
838                 }
839         }
840
841         updateLiveStateFromLocal();
842
843         return true;
844 }
845
846 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
847
848         // Get the devices local communications
849         Pair<IoTString *, int32_t> localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
850
851         if (localCommunicationInformation == NULL) {
852                 // Cant talk to that device locally so do nothing
853                 return Pair<bool, bool>(true, false);
854         }
855
856         // Get the size of the send data
857         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
858         for (TransactionPart *part : transaction->getParts()->values()) {
859                 sendDataSize += part->getSize();
860         }
861
862         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
863         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator()) != NULL) {
864                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
865         }
866
867         // Make the send data size
868         Array<char> *sendData = new Array<char>(sendDataSize);
869         ByteBuffer *bbEncode = ByteBuffer.wrap(sendData);
870
871         // Encode the data
872         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
873         bbEncode->putInt(transaction->getParts()->size());
874         for (TransactionPart *part : transaction->getParts()->values()) {
875                 part->encode(bbEncode);
876         }
877
878
879         // Send by local
880         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
881         localSequenceNumber++;
882
883         if (returnData == NULL) {
884                 // Could not contact server
885                 return Pair<bool, bool>(true, false);
886         }
887
888         // Decode the data
889         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
890         bool didCommit = bbDecode->get() == 1;
891         bool couldArbitrate = bbDecode->get() == 1;
892         int numberOfEntries = bbDecode->getInt();
893         bool foundAbort = false;
894
895         for (int i = 0; i < numberOfEntries; i++) {
896                 char type = bbDecode->get();
897                 if (type == TypeAbort) {
898                         Abort *abort = (Abort)Abort_decode(NULL, bbDecode);
899
900                         if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
901                                 foundAbort = true;
902                         }
903
904                         processEntry(abort);
905                 } else if (type == TypeCommitPart) {
906                         CommitPart *commitPart = (CommitPart)CommitPart_decode(NULL, bbDecode);
907                         processEntry(commitPart);
908                 }
909         }
910
911         updateLiveStateFromLocal();
912
913         if (couldArbitrate) {
914                 TransactionStatus status =  transaction->getTransactionStatus();
915                 if (didCommit) {
916                         status->setStatus(TransactionStatus_StatusCommitted);
917                 } else {
918                         status->setStatus(TransactionStatus_StatusAborted);
919                 }
920         } else {
921                 TransactionStatus status =  transaction->getTransactionStatus();
922                 if (foundAbort) {
923                         status->setStatus(TransactionStatus_StatusAborted);
924                 } else {
925                         status->setStatus(TransactionStatus_StatusCommitted);
926                 }
927         }
928
929         return Pair<bool, bool>(false, true);
930 }
931
932 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
933
934         // Decode the data
935         ByteBuffer *bbDecode = ByteBuffer_wrap(data);
936         int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
937         int numberOfParts = bbDecode->getInt();
938
939         // If we did commit a transaction or not
940         bool didCommit = false;
941         bool couldArbitrate = false;
942
943         if (numberOfParts != 0) {
944
945                 // decode the transaction
946                 Transaction *transaction = new Transaction();
947                 for (int i = 0; i < numberOfParts; i++) {
948                         bbDecode->get();
949                         TransactionPart *newPart = (TransactionPart)TransactionPart.decode(NULL, bbDecode);
950                         transaction->addPartDecode(newPart);
951                 }
952
953                 // Arbitrate on transaction and pull relevant return data
954                 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
955                 couldArbitrate = localArbitrateReturn->getFirst();
956                 didCommit = localArbitrateReturn->getSecond();
957
958                 updateLiveStateFromLocal();
959
960                 // Transaction was sent to the server so keep track of it to prevent double commit
961                 if (transaction->getSequenceNumber() != -1) {
962                         offlineTransactionsCommittedAndAtServer->add(transaction->getId());
963                 }
964         }
965
966         // The data to send back
967         int returnDataSize = 0;
968         Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
969
970         // Get the aborts to send back
971         Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>(liveAbortsGeneratedByLocal->keySet());
972         Collections->sort(abortLocalSequenceNumbers);
973         for (int64_t localSequenceNumber : abortLocalSequenceNumbers) {
974                 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
975                         continue;
976                 }
977
978                 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
979                 unseenArbitrations->add(abort);
980                 returnDataSize += abort->getSize();
981         }
982
983         // Get the commits to send back
984         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
985         if (commitForClientTable != NULL) {
986                 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
987                 Collections->sort(commitLocalSequenceNumbers);
988
989                 for (int64_t localSequenceNumber : commitLocalSequenceNumbers) {
990                         Commit *commit = commitForClientTable->get(localSequenceNumber);
991
992                         if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
993                                 continue;
994                         }
995
996                         unseenArbitrations->addAll(commit->getParts()->values());
997
998                         for (CommitPart commitPart : commit->getParts()->values()) {
999                                 returnDataSize += commitPart->getSize();
1000                         }
1001                 }
1002         }
1003
1004         // Number of arbitration entries to decode
1005         returnDataSize += 2 * sizeof(int32_t);
1006
1007         // bool of did commit or not
1008         if (numberOfParts != 0) {
1009                 returnDataSize += sizeof(char);
1010         }
1011
1012         // Data to send Back
1013         Array<char> *returnData = new Array<char>(returnDataSize);
1014         ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
1015
1016         if (numberOfParts != 0) {
1017                 if (didCommit) {
1018                         bbEncode->put((char)1);
1019                 } else {
1020                         bbEncode->put((char)0);
1021                 }
1022                 if (couldArbitrate) {
1023                         bbEncode->put((char)1);
1024                 } else {
1025                         bbEncode->put((char)0);
1026                 }
1027         }
1028
1029         bbEncode->putInt(unseenArbitrations->size());
1030         for (Entry *entry : unseenArbitrations) {
1031                 entry->encode(bbEncode);
1032         }
1033
1034
1035         localSequenceNumber++;
1036         return returnData;
1037 }
1038
1039 ThreeTuple<bool, bool, Array<Slot *> *> Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey) {
1040         bool attemptedToSendToServerTmp = attemptedToSendToServer;
1041         attemptedToSendToServer = true;
1042
1043         bool inserted = false;
1044         bool lastTryInserted = false;
1045
1046         Array<Slot *> *array = cloud->putSlot(slot, newSize);
1047         if (array == NULL) {
1048                 array = new Array<Slot *>();
1049                 array->set(0, slot);
1050                 rejectedSlotVector->clear();
1051                 inserted = true;
1052         } else {
1053                 if (array->length() == 0) {
1054                         throw new Error("Server Error: Did not send any slots");
1055                 }
1056
1057                 // if (attemptedToSendToServerTmp) {
1058                 if (hadPartialSendToServer) {
1059
1060                         bool isInserted = false;
1061                         for (Slot *s : array) {
1062                                 if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1063                                         isInserted = true;
1064                                         break;
1065                                 }
1066                         }
1067
1068                         for (Slot *s : array) {
1069                                 if (isInserted) {
1070                                         break;
1071                                 }
1072
1073                                 // Process each entry in the slot
1074                                 for (Entry *entry : s->getEntries()) {
1075
1076                                         if (entry->getType() == TypeLastMessage) {
1077                                                 LastMessage *lastMessage = (LastMessage *)entry;
1078
1079                                                 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) {
1080                                                         isInserted = true;
1081                                                         break;
1082                                                 }
1083                                         }
1084                                 }
1085                         }
1086
1087                         if (!isInserted) {
1088                                 rejectedSlotVector->add(slot->getSequenceNumber());
1089                                 lastTryInserted = false;
1090                         } else {
1091                                 lastTryInserted = true;
1092                         }
1093                 } else {
1094                         rejectedSlotVector->add(slot->getSequenceNumber());
1095                         lastTryInserted = false;
1096                 }
1097         }
1098
1099         return ThreeTuple<bool, bool, Array<Slot *> *>(inserted, lastTryInserted, array);
1100 }
1101
1102 /**
1103  * Returns false if a resize was needed
1104  */
1105 ThreeTuple<bool, int32_t, bool> Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry) {
1106         int newSize = 0;
1107         if (liveSlotCount > bufferResizeThreshold) {
1108                 resize = true;  //Resize is forced
1109         }
1110
1111         if (resize) {
1112                 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1113                 TableStatus *status = new TableStatus(slot, newSize);
1114                 slot->addEntry(status);
1115         }
1116
1117         // Fill with rejected slots first before doing anything else
1118         doRejectedMessages(slot);
1119
1120         // Do mandatory rescue of entries
1121         ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1122
1123         // Extract working variables
1124         bool needsResize = mandatoryRescueReturn->getFirst();
1125         bool seenLiveSlot = mandatoryRescueReturn->getSecond();
1126         int64_t currentRescueSequenceNumber = mandatoryRescueReturn->getThird();
1127
1128         if (needsResize && !resize) {
1129                 // We need to resize but we are not resizing so return false
1130                 return ThreeTuple<bool, int32_t, bool>(true, NULL, NULL);
1131         }
1132
1133         bool inserted = false;
1134         if (newKeyEntry != NULL) {
1135                 newKeyEntry->setSlot(slot);
1136                 if (slot->hasSpace(newKeyEntry)) {
1137
1138                         slot->addEntry(newKeyEntry);
1139                         inserted = true;
1140                 }
1141         }
1142
1143         // Clear the transactions, aborts and commits that were sent previously
1144         transactionPartsSent->clear();
1145         pendingSendArbitrationEntriesToDelete->clear();
1146
1147         for (ArbitrationRound *round : pendingSendArbitrationRounds) {
1148                 bool isFull = false;
1149                 round->generateParts();
1150                 Vector<Entry *> *parts = round->getParts();
1151
1152                 // Insert pending arbitration data
1153                 for (Entry *arbitrationData : parts) {
1154
1155                         // If it is an abort then we need to set some information
1156                         if (arbitrationData instanceof Abort) {
1157                                 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1158                         }
1159
1160                         if (!slot->hasSpace(arbitrationData)) {
1161                                 // No space so cant do anything else with these data entries
1162                                 isFull = true;
1163                                 break;
1164                         }
1165
1166                         // Add to this current slot and add it to entries to delete
1167                         slot->addEntry(arbitrationData);
1168                         pendingSendArbitrationEntriesToDelete->add(arbitrationData);
1169                 }
1170
1171                 if (isFull) {
1172                         break;
1173                 }
1174         }
1175
1176         if (pendingTransactionQueue->size() > 0) {
1177                 Transaction *transaction = pendingTransactionQueue->get(0);
1178                 // Set the transaction sequence number if it has yet to be inserted into the block chain
1179                 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1180                         transaction->setSequenceNumber(slot->getSequenceNumber());
1181                 }
1182
1183                 while (true) {
1184                         TransactionPart *part = transaction->getNextPartToSend();
1185                         if (part == NULL) {
1186                                 // Ran out of parts to send for this transaction so move on
1187                                 break;
1188                         }
1189
1190                         if (slot->hasSpace(part)) {
1191                                 slot->addEntry(part);
1192                                 Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1193                                 if (partsSent == NULL) {
1194                                         partsSent = new Vector<int32_t>();
1195                                         transactionPartsSent->put(transaction, partsSent);
1196                                 }
1197                                 partsSent->add(part->getPartNumber());
1198                                 transactionPartsSent->put(transaction, partsSent);
1199                         } else {
1200                                 break;
1201                         }
1202                 }
1203         }
1204
1205         // Fill the remainder of the slot with rescue data
1206         doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1207
1208         return ThreeTuple<bool, int32_t, bool>(false, newSize, inserted);
1209 }
1210
1211 void Table::doRejectedMessages(Slot *s) {
1212         if (!rejectedSlotVector->isEmpty()) {
1213                 /* TODO: We should avoid generating a rejected message entry if
1214                  * there is already a sufficient entry in the queue (e->g->,
1215                  * equalsto value of true and same sequence number)->  */
1216
1217                 int64_t old_seqn = rejectedSlotVector->firstElement();
1218                 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1219                         int64_t new_seqn = rejectedSlotVector->lastElement();
1220                         RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1221                         s->addEntry(rm);
1222                 } else {
1223                         int64_t prev_seqn = -1;
1224                         int i = 0;
1225                         /* Go through list of missing messages */
1226                         for (; i < rejectedSlotVector->size(); i++) {
1227                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1228                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1229                                 if (s_msg != NULL)
1230                                         break;
1231                                 prev_seqn = curr_seqn;
1232                         }
1233                         /* Generate rejected message entry for missing messages */
1234                         if (prev_seqn != -1) {
1235                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1236                                 s->addEntry(rm);
1237                         }
1238                         /* Generate rejected message entries for present messages */
1239                         for (; i < rejectedSlotVector->size(); i++) {
1240                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1241                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1242                                 int64_t machineid = s_msg->getMachineID();
1243                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1244                                 s->addEntry(rm);
1245                         }
1246                 }
1247         }
1248 }
1249
1250 ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize) {
1251         int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1252         int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
1253         if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1254                 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1255         }
1256
1257         int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1258         bool seenLiveSlot = false;
1259         int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots;         // smallest seq number in the buffer if it is full
1260         int64_t threshold = firstIfFull + Table_FREE_SLOTS;             // we want the buffer to be clear of live entries up to this point
1261
1262
1263         // Mandatory Rescue
1264         for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1265                 Slot previousSlot = buffer->getSlot(currentSequenceNumber);
1266                 // Push slot number forward
1267                 if (!seenLiveSlot) {
1268                         oldestLiveSlotSequenceNumver = currentSequenceNumber;
1269                 }
1270
1271                 if (!previousSlot->isLive()) {
1272                         continue;
1273                 }
1274
1275                 // We have seen a live slot
1276                 seenLiveSlot = true;
1277
1278                 // Get all the live entries for a slot
1279                 Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1280
1281                 // Iterate over all the live entries and try to rescue them
1282                 for (Entry *liveEntry : liveEntries) {
1283                         if (slot->hasSpace(liveEntry)) {
1284
1285                                 // Enough space to rescue the entry
1286                                 slot->addEntry(liveEntry);
1287                         } else if (currentSequenceNumber == firstIfFull) {
1288                                 //if there's no space but the entry is about to fall off the queue
1289                                 System->out->println("B");      //?
1290                                 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
1291
1292                         }
1293                 }
1294         }
1295
1296         // Did not resize
1297         return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
1298 }
1299
1300 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1301         /* now go through live entries from least to greatest sequence number until
1302          * either all live slots added, or the slot doesn't have enough room
1303          * for SKIP_THRESHOLD consecutive entries*/
1304         int skipcount = 0;
1305         int64_t newestseqnum = buffer->getNewestSeqNum();
1306 search:
1307         for (; seqn <= newestseqnum; seqn++) {
1308                 Slot *prevslot = buffer->getSlot(seqn);
1309                 //Push slot number forward
1310                 if (!seenliveslot)
1311                         oldestLiveSlotSequenceNumver = seqn;
1312
1313                 if (!prevslot->isLive())
1314                         continue;
1315                 seenliveslot = true;
1316                 Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
1317                 for (Entry *liveentry : liveentries) {
1318                         if (s->hasSpace(liveentry))
1319                                 s->addEntry(liveentry);
1320                         else {
1321                                 skipcount++;
1322                                 if (skipcount > Table_SKIP_THRESHOLD)
1323                                         break search;
1324                         }
1325                 }
1326         }
1327 }
1328
1329 /**
1330  * Checks for malicious activity and updates the local copy of the block chain->
1331  */
1332 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1333
1334         // The cloud communication layer has checked slot HMACs already
1335         // before decoding
1336         if (newSlots->length() == 0) {
1337                 return;
1338         }
1339
1340         // Make sure all slots are newer than the last largest slot this
1341         // client has seen
1342         int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
1343         if (firstSeqNum <= sequenceNumber) {
1344                 throw new Error("Server Error: Sent older slots!");
1345         }
1346
1347         // Create an object that can access both new slots and slots in our
1348         // local chain without committing slots to our local chain
1349         SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1350
1351         // Check that the HMAC chain is not broken
1352         checkHMACChain(indexer, newSlots);
1353
1354         // Set to keep track of messages from clients
1355         Hashset<int64_t> *machineSet = new Hashset<int64_t>(lastMessageTable->keySet());
1356
1357         // Process each slots data
1358         for (Slot *slot : newSlots) {
1359                 processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1360
1361                 updateExpectedSize();
1362         }
1363
1364         // If there is a gap, check to see if the server sent us
1365         // everything->
1366         if (firstSeqNum != (sequenceNumber + 1)) {
1367
1368                 // Check the size of the slots that were sent down by the server->
1369                 // Can only check the size if there was a gap
1370                 checkNumSlots(newSlots->length);
1371
1372                 // Since there was a gap every machine must have pushed a slot or
1373                 // must have a last message message-> If not then the server is
1374                 // hiding slots
1375                 if (!machineSet->isEmpty()) {
1376                         throw new Error("Missing record for machines: " + machineSet);
1377                 }
1378         }
1379
1380         // Update the size of our local block chain->
1381         commitNewMaxSize();
1382
1383         // Commit new to slots to the local block chain->
1384         for (Slot *slot : newSlots) {
1385
1386                 // Insert this slot into our local block chain copy->
1387                 buffer->putSlot(slot);
1388
1389                 // Keep track of how many slots are currently live (have live data
1390                 // in them)->
1391                 liveSlotCount++;
1392         }
1393
1394         // Get the sequence number of the latest slot in the system
1395         sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
1396         updateLiveStateFromServer();
1397
1398         // No Need to remember after we pulled from the server
1399         offlineTransactionsCommittedAndAtServer->clear();
1400
1401         // This is invalidated now
1402         hadPartialSendToServer = false;
1403 }
1404
1405 void Table::updateLiveStateFromServer() {
1406         // Process the new transaction parts
1407         processNewTransactionParts();
1408
1409         // Do arbitration on new transactions that were received
1410         arbitrateFromServer();
1411
1412         // Update all the committed keys
1413         bool didCommitOrSpeculate = updateCommittedTable();
1414
1415         // Delete the transactions that are now dead
1416         updateLiveTransactionsAndStatus();
1417
1418         // Do speculations
1419         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1420         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1421 }
1422
1423 void Table::updateLiveStateFromLocal() {
1424         // Update all the committed keys
1425         bool didCommitOrSpeculate = updateCommittedTable();
1426
1427         // Delete the transactions that are now dead
1428         updateLiveTransactionsAndStatus();
1429
1430         // Do speculations
1431         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1432         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1433 }
1434
1435 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1436         int64_t prevslots = firstSequenceNumber;
1437
1438         if (didFindTableStatus) {
1439         } else {
1440                 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1441         }
1442
1443         didFindTableStatus = true;
1444         currMaxSize = numberOfSlots;
1445 }
1446
1447 void Table::updateExpectedSize() {
1448         expectedsize++;
1449
1450         if (expectedsize > currMaxSize) {
1451                 expectedsize = currMaxSize;
1452         }
1453 }
1454
1455
1456 /**
1457  * Check the size of the block chain to make sure there are enough
1458  * slots sent back by the server-> This is only called when we have a
1459  * gap between the slots that we have locally and the slots sent by
1460  * the server therefore in the slots sent by the server there will be
1461  * at least 1 Table status message
1462  */
1463 void Table::checkNumSlots(int numberOfSlots) {
1464         if (numberOfSlots != expectedsize) {
1465                 throw new Error("Server Error: Server did not send all slots->  Expected: " + expectedsize + " Received:" + numberOfSlots);
1466         }
1467 }
1468
1469 void Table::updateCurrMaxSize(int newmaxsize) {
1470         currMaxSize = newmaxsize;
1471 }
1472
1473
1474 /**
1475  * Update the size of of the local buffer if it is needed->
1476  */
1477 void Table::commitNewMaxSize() {
1478         didFindTableStatus = false;
1479
1480         // Resize the local slot buffer
1481         if (numberOfSlots != currMaxSize) {
1482                 buffer->resize((int32_t)currMaxSize);
1483         }
1484
1485         // Change the number of local slots to the new size
1486         numberOfSlots = (int32_t)currMaxSize;
1487
1488         // Recalculate the resize threshold since the size of the local
1489         // buffer has changed
1490         setResizeThreshold();
1491 }
1492
1493 /**
1494  * Process the new transaction parts from this latest round of slots
1495  * received from the server
1496  */
1497 void Table::processNewTransactionParts() {
1498
1499         if (newTransactionParts->size() == 0) {
1500                 // Nothing new to process
1501                 return;
1502         }
1503
1504         // Iterate through all the machine Ids that we received new parts
1505         // for
1506         for (int64_t machineId : newTransactionParts->keySet()) {
1507                 Hashtable<Pair<int64_t int32_t>, TransactionPart *> *parts = newTransactionParts->get(machineId);
1508
1509                 // Iterate through all the parts for that machine Id
1510                 for (Pair<int64_t, int32_t> partId : parts->keySet()) {
1511                         TransactionPart *part = parts->get(partId);
1512
1513                         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1514                         if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= part->getSequenceNumber())) {
1515                                 // Set dead the transaction part
1516                                 part->setDead();
1517                                 continue;
1518                         }
1519
1520                         // Get the transaction object for that sequence number
1521                         Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1522
1523                         if (transaction == NULL) {
1524                                 // This is a new transaction that we dont have so make a new one
1525                                 transaction = new Transaction();
1526
1527                                 // Insert this new transaction into the live tables
1528                                 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1529                                 liveTransactionByTransactionIdTable->put(part->getTransactionId(), transaction);
1530                         }
1531
1532                         // Add that part to the transaction
1533                         transaction->addPartDecode(part);
1534                 }
1535         }
1536
1537         // Clear all the new transaction parts in preparation for the next
1538         // time the server sends slots
1539         newTransactionParts->clear();
1540 }
1541
1542 void Table::arbitrateFromServer() {
1543
1544         if (liveTransactionBySequenceNumberTable->size() == 0) {
1545                 // Nothing to arbitrate on so move on
1546                 return;
1547         }
1548
1549         // Get the transaction sequence numbers and sort from oldest to newest
1550         Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
1551         Collections->sort(transactionSequenceNumbers);
1552
1553         // Collection of key value pairs that are
1554         Hashtable<IoTString *, KeyValue *> speculativeTableTmp = new Hashtable<IoTString *, KeyValue *>();
1555
1556         // The last transaction arbitrated on
1557         int64_t lastTransactionCommitted = -1;
1558         Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1559
1560         for (int64_t transactionSequenceNumber : transactionSequenceNumbers) {
1561                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1562
1563
1564
1565                 // Check if this machine arbitrates for this transaction if not
1566                 // then we cant arbitrate this transaction
1567                 if (transaction->getArbitrator() != localMachineId) {
1568                         continue;
1569                 }
1570
1571                 if (transactionSequenceNumber < lastSeqNumArbOn) {
1572                         continue;
1573                 }
1574
1575                 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1576                         // We have seen this already locally so dont commit again
1577                         continue;
1578                 }
1579
1580
1581                 if (!transaction->isComplete()) {
1582                         // Will arbitrate in incorrect order if we continue so just break
1583                         // Most likely this
1584                         break;
1585                 }
1586
1587
1588                 // update the largest transaction seen by arbitrator from server
1589                 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) == NULL) {
1590                         lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1591                 } else {
1592                         int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1593                         if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1594                                 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1595                         }
1596                 }
1597
1598                 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1599                         // Guard evaluated as true
1600
1601                         // Update the local changes so we can make the commit
1602                         for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
1603                                 speculativeTableTmp->put(kv->getKey(), kv);
1604                         }
1605
1606                         // Update what the last transaction committed was for use in batch commit
1607                         lastTransactionCommitted = transactionSequenceNumber;
1608                 } else {
1609                         // Guard evaluated was false so create abort
1610                         // Create the abort
1611                         Abort *newAbort = new Abort(NULL,
1612                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1613                                                                                                                                         transaction->getSequenceNumber(),
1614                                                                                                                                         transaction->getMachineId(),
1615                                                                                                                                         transaction->getArbitrator(),
1616                                                                                                                                         localArbitrationSequenceNumber);
1617                         localArbitrationSequenceNumber++;
1618                         generatedAborts->add(newAbort);
1619
1620                         // Insert the abort so we can process
1621                         processEntry(newAbort);
1622                 }
1623
1624                 lastSeqNumArbOn = transactionSequenceNumber;
1625         }
1626
1627         Commit *newCommit = NULL;
1628
1629         // If there is something to commit
1630         if (speculativeTableTmp->size() != 0) {
1631                 // Create the commit and increment the commit sequence number
1632                 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1633                 localArbitrationSequenceNumber++;
1634
1635                 // Add all the new keys to the commit
1636                 for (KeyValue *kv : speculativeTableTmp->values()) {
1637                         newCommit->addKV(kv);
1638                 }
1639
1640                 // create the commit parts
1641                 newCommit->createCommitParts();
1642
1643                 // Append all the commit parts to the end of the pending queue
1644                 // waiting for sending to the server
1645                 // Insert the commit so we can process it
1646                 for (CommitPart *commitPart : newCommit->getParts()->values()) {
1647                         processEntry(commitPart);
1648                 }
1649         }
1650
1651         if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1652                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1653                 pendingSendArbitrationRounds->add(arbitrationRound);
1654
1655                 if (compactArbitrationData()) {
1656                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1657                         if (newArbitrationRound->getCommit() != NULL) {
1658                                 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1659                                         processEntry(commitPart);
1660                                 }
1661                         }
1662                 }
1663         }
1664 }
1665
1666 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1667
1668         // Check if this machine arbitrates for this transaction if not then
1669         // we cant arbitrate this transaction
1670         if (transaction->getArbitrator() != localMachineId) {
1671                 return Pair<bool, bool>(false, false);
1672         }
1673
1674         if (!transaction->isComplete()) {
1675                 // Will arbitrate in incorrect order if we continue so just break
1676                 // Most likely this
1677                 return Pair<bool, bool>(false, false);
1678         }
1679
1680         if (transaction->getMachineId() != localMachineId) {
1681                 // dont do this check for local transactions
1682                 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) != NULL) {
1683                         if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1684                                 // We've have already seen this from the server
1685                                 return Pair<bool, bool>(false, false);
1686                         }
1687                 }
1688         }
1689
1690         if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1691                 // Guard evaluated as true Create the commit and increment the
1692                 // commit sequence number
1693                 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1694                 localArbitrationSequenceNumber++;
1695
1696                 // Update the local changes so we can make the commit
1697                 for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
1698                         newCommit->addKV(kv);
1699                 }
1700
1701                 // create the commit parts
1702                 newCommit->createCommitParts();
1703
1704                 // Append all the commit parts to the end of the pending queue
1705                 // waiting for sending to the server
1706                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1707                 pendingSendArbitrationRounds->add(arbitrationRound);
1708
1709                 if (compactArbitrationData()) {
1710                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1711                         for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1712                                 processEntry(commitPart);
1713                         }
1714                 } else {
1715                         // Insert the commit so we can process it
1716                         for (CommitPart *commitPart : newCommit->getParts()->values()) {
1717                                 processEntry(commitPart);
1718                         }
1719                 }
1720
1721                 if (transaction->getMachineId() == localMachineId) {
1722                         TransactionStatus *status = transaction->getTransactionStatus();
1723                         if (status != NULL) {
1724                                 status->setStatus(TransactionStatus_StatusCommitted);
1725                         }
1726                 }
1727
1728                 updateLiveStateFromLocal();
1729                 return Pair<bool, bool>(true, true);
1730         } else {
1731                 if (transaction->getMachineId() == localMachineId) {
1732                         // For locally created messages update the status
1733                         // Guard evaluated was false so create abort
1734                         TransactionStatus status = transaction->getTransactionStatus();
1735                         if (status != NULL) {
1736                                 status->setStatus(TransactionStatus_StatusAborted);
1737                         }
1738                 } else {
1739                         Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
1740
1741                         // Create the abort
1742                         Abort *newAbort = new Abort(NULL,
1743                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1744                                                                                                                                         -1,
1745                                                                                                                                         transaction->getMachineId(),
1746                                                                                                                                         transaction->getArbitrator(),
1747                                                                                                                                         localArbitrationSequenceNumber);
1748                         localArbitrationSequenceNumber++;
1749                         addAbortSet->add(newAbort);
1750
1751                         // Append all the commit parts to the end of the pending queue
1752                         // waiting for sending to the server
1753                         ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1754                         pendingSendArbitrationRounds->add(arbitrationRound);
1755
1756                         if (compactArbitrationData()) {
1757                                 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1758                                 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1759                                         processEntry(commitPart);
1760                                 }
1761                         }
1762                 }
1763
1764                 updateLiveStateFromLocal();
1765                 return Pair<bool, bool>(true, false);
1766         }
1767 }
1768
1769 /**
1770  * Compacts the arbitration data my merging commits and aggregating
1771  * aborts so that a single large push of commits can be done instead
1772  * of many small updates
1773  */
1774 bool Table::compactArbitrationData() {
1775         if (pendingSendArbitrationRounds->size() < 2) {
1776                 // Nothing to compact so do nothing
1777                 return false;
1778         }
1779
1780         ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1781         if (lastRound->didSendPart()) {
1782                 return false;
1783         }
1784
1785         bool hadCommit = (lastRound->getCommit() == NULL);
1786         bool gotNewCommit = false;
1787
1788         int numberToDelete = 1;
1789         while (numberToDelete < pendingSendArbitrationRounds->size()) {
1790                 ArbitrationRound *xs round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1791
1792                 if (round->isFull() || round->didSendPart()) {
1793                         // Stop since there is a part that cannot be compacted and we
1794                         // need to compact in order
1795                         break;
1796                 }
1797
1798                 if (round->getCommit() == NULL) {
1799                         // Try compacting aborts only
1800                         int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
1801                         if (newSize > ArbitrationRound->MAX_PARTS) {
1802                                 // Cant compact since it would be too large
1803                                 break;
1804                         }
1805                         lastRound->addAborts(round->getAborts());
1806                 } else {
1807                         // Create a new larger commit
1808                         Commit newCommit = Commit->merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
1809                         localArbitrationSequenceNumber++;
1810
1811                         // Create the commit parts so that we can count them
1812                         newCommit->createCommitParts();
1813
1814                         // Calculate the new size of the parts
1815                         int newSize = newCommit->getNumberOfParts();
1816                         newSize += lastRound->getAbortsCount();
1817                         newSize += round->getAbortsCount();
1818
1819                         if (newSize > ArbitrationRound->MAX_PARTS) {
1820                                 // Cant compact since it would be too large
1821                                 break;
1822                         }
1823
1824                         // Set the new compacted part
1825                         lastRound->setCommit(newCommit);
1826                         lastRound->addAborts(round->getAborts());
1827                         gotNewCommit = true;
1828                 }
1829
1830                 numberToDelete++;
1831         }
1832
1833         if (numberToDelete != 1) {
1834                 // If there is a compaction
1835                 // Delete the previous pieces that are now in the new compacted piece
1836                 if (numberToDelete == pendingSendArbitrationRounds->size()) {
1837                         pendingSendArbitrationRounds->clear();
1838                 } else {
1839                         for (int i = 0; i < numberToDelete; i++) {
1840                                 pendingSendArbitrationRounds->remove(pendingSendArbitrationRounds->size() - 1);
1841                         }
1842                 }
1843
1844                 // Add the new compacted into the pending to send list
1845                 pendingSendArbitrationRounds->add(lastRound);
1846
1847                 // Should reinsert into the commit processor
1848                 if (hadCommit && gotNewCommit) {
1849                         return true;
1850                 }
1851         }
1852
1853         return false;
1854 }
1855
1856 /**
1857  * Update all the commits and the committed tables, sets dead the dead
1858  * transactions
1859  */
1860 bool Table::updateCommittedTable() {
1861
1862         if (newCommitParts->size() == 0) {
1863                 // Nothing new to process
1864                 return false;
1865         }
1866
1867         // Iterate through all the machine Ids that we received new parts for
1868         for (int64_t machineId : newCommitParts->keySet()) {
1869                 Hashtable<Pair<int64_t, int32_t>, CommitPart *> *parts = newCommitParts->get(machineId);
1870
1871                 // Iterate through all the parts for that machine Id
1872                 for (Pair<int64_t, int32_t> partId : parts->keySet()) {
1873                         CommitPart *part = parts->get(partId);
1874
1875                         // Get the transaction object for that sequence number
1876                         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
1877
1878                         if (commitForClientTable == NULL) {
1879                                 // This is the first commit from this device
1880                                 commitForClientTable = new Hashtable<int64_t, Commit *>();
1881                                 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
1882                         }
1883
1884                         Commit *commit = commitForClientTable->get(part->getSequenceNumber());
1885
1886                         if (commit == NULL) {
1887                                 // This is a new commit that we dont have so make a new one
1888                                 commit = new Commit();
1889
1890                                 // Insert this new commit into the live tables
1891                                 commitForClientTable->put(part->getSequenceNumber(), commit);
1892                         }
1893
1894                         // Add that part to the commit
1895                         commit->addPartDecode(part);
1896                 }
1897         }
1898
1899         // Clear all the new commits parts in preparation for the next time
1900         // the server sends slots
1901         newCommitParts->clear();
1902
1903         // If we process a new commit keep track of it for future use
1904         bool didProcessANewCommit = false;
1905
1906         // Process the commits one by one
1907         for (int64_T arbitratorId : liveCommitsTable->keySet()) {
1908
1909                 // Get all the commits for a specific arbitrator
1910                 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
1911
1912                 // Sort the commits in order
1913                 Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
1914                 Collections->sort(commitSequenceNumbers);
1915
1916                 // Get the last commit seen from this arbitrator
1917                 int64_t lastCommitSeenSequenceNumber = -1;
1918                 if (lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId) != NULL) {
1919                         lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
1920                 }
1921
1922                 // Go through each new commit one by one
1923                 for (int i = 0; i < commitSequenceNumbers->size(); i++) {
1924                         int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
1925                         Commit *commit = commitForClientTable->get(commitSequenceNumber);
1926
1927                         // Special processing if a commit is not complete
1928                         if (!commit->isComplete()) {
1929                                 if (i == (commitSequenceNumbers->size() - 1)) {
1930                                         // If there is an incomplete commit and this commit is the
1931                                         // latest one seen then this commit cannot be processed and
1932                                         // there are no other commits
1933                                         break;
1934                                 } else {
1935                                         // This is a commit that was already dead but parts of it
1936                                         // are still in the block chain (not flushed out yet)->
1937                                         // Delete it and move on
1938                                         commit->setDead();
1939                                         commitForClientTable->remove(commit->getSequenceNumber());
1940                                         continue;
1941                                 }
1942                         }
1943
1944                         // Update the last transaction that was updated if we can
1945                         if (commit->getTransactionSequenceNumber() != -1) {
1946                                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
1947
1948                                 // Update the last transaction sequence number that the arbitrator arbitrated on
1949                                 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
1950                                         lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
1951                                 }
1952                         }
1953
1954                         // Update the last arbitration data that we have seen so far
1955                         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId()) != NULL) {
1956
1957                                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
1958                                 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
1959                                         // Is larger
1960                                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
1961                                 }
1962                         } else {
1963                                 // Never seen any data from this arbitrator so record the first one
1964                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
1965                         }
1966
1967                         // We have already seen this commit before so need to do the
1968                         // full processing on this commit
1969                         if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
1970
1971                                 // Update the last transaction that was updated if we can
1972                                 if (commit->getTransactionSequenceNumber() != -1) {
1973                                         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
1974
1975                                         // Update the last transaction sequence number that the arbitrator arbitrated on
1976                                         if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
1977                                                 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
1978                                         }
1979                                 }
1980
1981                                 continue;
1982                         }
1983
1984                         // If we got here then this is a brand new commit and needs full
1985                         // processing
1986                         // Get what commits should be edited, these are the commits that
1987                         // have live values for their keys
1988                         Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
1989                         for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
1990                                 commitsToEdit->add(liveCommitsByKeyTable->get(kv->getKey()));
1991                         }
1992                         commitsToEdit->remove(NULL);            // remove NULL since it could be in this set
1993
1994                         // Update each previous commit that needs to be updated
1995                         for (Commit *previousCommit : commitsToEdit) {
1996
1997                                 // Only bother with live commits (TODO: Maybe remove this check)
1998                                 if (previousCommit->isLive()) {
1999
2000                                         // Update which keys in the old commits are still live
2001                                         for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
2002                                                 previousCommit->invalidateKey(kv->getKey());
2003                                         }
2004
2005                                         // if the commit is now dead then remove it
2006                                         if (!previousCommit->isLive()) {
2007                                                 commitForClientTable->remove(previousCommit);
2008                                         }
2009                                 }
2010                         }
2011
2012                         // Update the last seen sequence number from this arbitrator
2013                         if (lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId()) != NULL) {
2014                                 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2015                                         lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2016                                 }
2017                         } else {
2018                                 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2019                         }
2020
2021                         // We processed a new commit that we havent seen before
2022                         didProcessANewCommit = true;
2023
2024                         // Update the committed table of keys and which commit is using which key
2025                         for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
2026                                 committedKeyValueTable->put(kv->getKey(), kv);
2027                                 liveCommitsByKeyTable->put(kv->getKey(), commit);
2028                         }
2029                 }
2030         }
2031
2032         return didProcessANewCommit;
2033 }
2034
2035 /**
2036  * Create the speculative table from transactions that are still live
2037  * and have come from the cloud
2038  */
2039 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2040         if (liveTransactionBySequenceNumberTable->keySet()->size() == 0) {
2041                 // There is nothing to speculate on
2042                 return false;
2043         }
2044
2045         // Create a list of the transaction sequence numbers and sort them
2046         // from oldest to newest
2047         Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
2048         Collections->sort(transactionSequenceNumbersSorted);
2049
2050         bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2051
2052
2053         if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2054                 // If there is a gap in the transaction sequence numbers then
2055                 // there was a commit or an abort of a transaction OR there was a
2056                 // new commit (Could be from offline commit) so a redo the
2057                 // speculation from scratch
2058
2059                 // Start from scratch
2060                 speculatedKeyValueTable->clear();
2061                 lastTransactionSequenceNumberSpeculatedOn = -1;
2062                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2063
2064         }
2065
2066         // Remember the front of the transaction list
2067         oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2068
2069         // Find where to start arbitration from
2070         int startIndex = transactionSequenceNumbersSorted->indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1;
2071
2072         if (startIndex >= transactionSequenceNumbersSorted->size()) {
2073                 // Make sure we are not out of bounds
2074                 return false;           // did not speculate
2075         }
2076
2077         Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
2078         bool didSkip = true;
2079
2080         for (int i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2081                 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2082                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2083
2084                 if (!transaction->isComplete()) {
2085                         // If there is an incomplete transaction then there is nothing
2086                         // we can do add this transactions arbitrator to the list of
2087                         // arbitrators we should ignore
2088                         incompleteTransactionArbitrator->add(transaction->getArbitrator());
2089                         didSkip = true;
2090                         continue;
2091                 }
2092
2093                 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2094                         continue;
2095                 }
2096
2097                 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2098
2099                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2100                         // Guard evaluated to true so update the speculative table
2101                         for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
2102                                 speculatedKeyValueTable->put(kv->getKey(), kv);
2103                         }
2104                 }
2105         }
2106
2107         if (didSkip) {
2108                 // Since there was a skip we need to redo the speculation next time around
2109                 lastTransactionSequenceNumberSpeculatedOn = -1;
2110                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2111         }
2112
2113         // We did some speculation
2114         return true;
2115 }
2116
2117 /**
2118  * Create the pending transaction speculative table from transactions
2119  * that are still in the pending transaction buffer
2120  */
2121 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2122         if (pendingTransactionQueue->size() == 0) {
2123                 // There is nothing to speculate on
2124                 return;
2125         }
2126
2127         if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2128                 // need to reset on the pending speculation
2129                 lastPendingTransactionSpeculatedOn = NULL;
2130                 firstPendingTransaction = pendingTransactionQueue->get(0);
2131                 pendingTransactionSpeculatedKeyValueTable->clear();
2132         }
2133
2134         // Find where to start arbitration from
2135         int startIndex = pendingTransactionQueue->indexOf(firstPendingTransaction) + 1;
2136
2137         if (startIndex >= pendingTransactionQueue->size()) {
2138                 // Make sure we are not out of bounds
2139                 return;
2140         }
2141
2142         for (int i = startIndex; i < pendingTransactionQueue->size(); i++) {
2143                 Transaction *transaction = pendingTransactionQueue->get(i);
2144
2145                 lastPendingTransactionSpeculatedOn = transaction;
2146
2147                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2148                         // Guard evaluated to true so update the speculative table
2149                         for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
2150                                 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2151                         }
2152                 }
2153         }
2154 }
2155
2156 /**
2157  * Set dead and remove from the live transaction tables the
2158  * transactions that are dead
2159  */
2160 void Table::updateLiveTransactionsAndStatus() {
2161
2162         // Go through each of the transactions
2163         for (Iterator<Map->Entry<int64_t, Transaction> > *iter = liveTransactionBySequenceNumberTable->entrySet()->iterator(); iter->hasNext();) {
2164                 Transaction *transaction = iter->next()->getValue();
2165
2166                 // Check if the transaction is dead
2167                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator());
2168                 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= transaction->getSequenceNumber())) {
2169
2170                         // Set dead the transaction
2171                         transaction->setDead();
2172
2173                         // Remove the transaction from the live table
2174                         iter->remove();
2175                         liveTransactionByTransactionIdTable->remove(transaction->getId());
2176                 }
2177         }
2178
2179         // Go through each of the transactions
2180         for (Iterator<Map->Entry<int64_t, TransactionStatus *> > *iter = outstandingTransactionStatus->entrySet()->iterator(); iter->hasNext();) {
2181                 TransactionStatus *status = iter->next()->getValue();
2182
2183                 // Check if the transaction is dead
2184                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator());
2185                 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= status->getTransactionSequenceNumber())) {
2186
2187                         // Set committed
2188                         status->setStatus(TransactionStatus_StatusCommitted);
2189
2190                         // Remove
2191                         iter->remove();
2192                 }
2193         }
2194 }
2195
2196 /**
2197  * Process this slot, entry by entry->  Also update the latest message sent by slot
2198  */
2199 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2200
2201         // Update the last message seen
2202         updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2203
2204         // Process each entry in the slot
2205         for (Entry *entry : slot->getEntries()) {
2206                 switch (entry->getType()) {
2207                 case TypeCommitPart:
2208                         processEntry((CommitPart *)entry);
2209                         break;
2210                 case TypeAbort:
2211                         processEntry((Abort *)entry);
2212                         break;
2213                 case TypeTransactionPart:
2214                         processEntry((TransactionPart *)entry);
2215                         break;
2216                 case TypeNewKey:
2217                         processEntry((NewKey *)entry);
2218                         break;
2219                 case TypeLastMessage:
2220                         processEntry((LastMessage *)entry, machineSet);
2221                         break;
2222                 case TypeRejectedMessage:
2223                         processEntry((RejectedMessage *)entry, indexer);
2224                         break;
2225                 case TypeTableStatus:
2226                         processEntry((TableStatus *)entry, slot->getSequenceNumber());
2227                         break;
2228                 default:
2229                         throw new Error("Unrecognized type: " + entry->getType());
2230                 }
2231         }
2232 }
2233
2234 /**
2235  * Update the last message that was sent for a machine Id
2236  */
2237 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2238         // Update what the last message received by a machine was
2239         updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2240 }
2241
2242 /**
2243  * Add the new key to the arbitrators table and update the set of live
2244  * new keys (in case of a rescued new key message)
2245  */
2246 void Table::processEntry(NewKey *entry) {
2247         // Update the arbitrator table with the new key information
2248         arbitratorTable->put(entry->getKey(), entry->getMachineID());
2249
2250         // Update what the latest live new key is
2251         NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2252         if (oldNewKey != NULL) {
2253                 // Delete the old new key messages
2254                 oldNewKey->setDead();
2255         }
2256 }
2257
2258 /**
2259  * Process new table status entries and set dead the old ones as new
2260  * ones come in-> keeps track of the largest and smallest table status
2261  * seen in this current round of updating the local copy of the block
2262  * chain
2263  */
2264 void Table::processEntry(TableStatus entry, int64_t seq) {
2265         int newNumSlots = entry->getMaxSlots();
2266         updateCurrMaxSize(newNumSlots);
2267         initExpectedSize(seq, newNumSlots);
2268
2269         if (liveTableStatus != NULL) {
2270                 // We have a larger table status so the old table status is no
2271                 // int64_ter alive
2272                 liveTableStatus->setDead();
2273         }
2274
2275         // Make this new table status the latest alive table status
2276         liveTableStatus = entry;
2277 }
2278
2279 /**
2280  * Check old messages to see if there is a block chain violation->
2281  * Also
2282  */
2283 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
2284         int64_t oldSeqNum = entry->getOldSeqNum();
2285         int64_t newSeqNum = entry->getNewSeqNum();
2286         bool isequal = entry->getEqual();
2287         int64_t machineId = entry->getMachineID();
2288         int64_t seq = entry->getSequenceNumber();
2289
2290         // Check if we have messages that were supposed to be rejected in
2291         // our local block chain
2292         for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2293                 // Get the slot
2294                 Slot *slot = indexer->getSlot(seqNum);
2295
2296                 if (slot != NULL) {
2297                         // If we have this slot make sure that it was not supposed to be
2298                         // a rejected slot
2299                         int64_t slotMachineId = slot->getMachineID();
2300                         if (isequal != (slotMachineId == machineId)) {
2301                                 throw new Error("Server Error: Trying to insert rejected message for slot " + seqNum);
2302                         }
2303                 }
2304         }
2305
2306         // Create a list of clients to watch until they see this rejected
2307         // message entry->
2308         Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2309         for (Map->Entry<int64_t, Pair<int64_t, Liveness *> > *lastMessageEntry : lastMessageTable->entrySet()) {
2310                 // Machine ID for the last message entry
2311                 int64_t lastMessageEntryMachineId = lastMessageEntry->getKey();
2312
2313                 // We've seen it, don't need to continue to watch->  Our next
2314                 // message will implicitly acknowledge it->
2315                 if (lastMessageEntryMachineId == localMachineId) {
2316                         continue;
2317                 }
2318
2319                 Pair<int64_t, Liveness *> lastMessageValue = lastMessageEntry->getValue();
2320                 int64_t entrySequenceNumber = lastMessageValue->getFirst();
2321
2322                 if (entrySequenceNumber < seq) {
2323                         // Add this rejected message to the set of messages that this
2324                         // machine ID did not see yet
2325                         addWatchVector(lastMessageEntryMachineId, entry);
2326                         // This client did not see this rejected message yet so add it
2327                         // to the watch set to monitor
2328                         deviceWatchSet->add(lastMessageEntryMachineId);
2329                 }
2330         }
2331         if (deviceWatchSet->isEmpty()) {
2332                 // This rejected message has been seen by all the clients so
2333                 entry->setDead();
2334         } else {
2335                 // We need to watch this rejected message
2336                 entry->setWatchSet(deviceWatchSet);
2337         }
2338 }
2339
2340 /**
2341  * Check if this abort is live, if not then save it so we can kill it
2342  * later-> update the last transaction number that was arbitrated on->
2343  */
2344 void Table::processEntry(Abort *entry) {
2345         if (entry->getTransactionSequenceNumber() != -1) {
2346                 // update the transaction status if it was sent to the server
2347                 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2348                 if (status != NULL) {
2349                         status->setStatus(TransactionStatus_StatusAborted);
2350                 }
2351         }
2352
2353         // Abort has not been seen by the client it is for yet so we need to
2354         // keep track of it
2355         Abort *previouslySeenAbort = liveAbortTable->put(entry->getAbortId(), entry);
2356         if (previouslySeenAbort != NULL) {
2357                 previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version
2358         }
2359
2360         if (entry->getTransactionArbitrator() == localMachineId) {
2361                 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
2362         }
2363
2364         if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) {
2365                 // The machine already saw this so it is dead
2366                 entry->setDead();
2367                 liveAbortTable->remove(entry->getAbortId());
2368
2369                 if (entry->getTransactionArbitrator() == localMachineId) {
2370                         liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2371                 }
2372                 return;
2373         }
2374
2375         // Update the last arbitration data that we have seen so far
2376         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator()) != NULL) {
2377                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2378                 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2379                         // Is larger
2380                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2381                 }
2382         } else {
2383                 // Never seen any data from this arbitrator so record the first one
2384                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2385         }
2386
2387         // Set dead a transaction if we can
2388         Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber()));
2389         if (transactionToSetDead != NULL) {
2390                 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2391         }
2392
2393         // Update the last transaction sequence number that the arbitrator
2394         // arbitrated on
2395         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator());
2396         if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2397                 // Is a valid one
2398                 if (entry->getTransactionSequenceNumber() != -1) {
2399                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2400                 }
2401         }
2402 }
2403
2404 /**
2405  * Set dead the transaction part if that transaction is dead and keep
2406  * track of all new parts
2407  */
2408 void Table::processEntry(TransactionPart *entry) {
2409         // Check if we have already seen this transaction and set it dead OR
2410         // if it is not alive
2411         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId());
2412         if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= entry->getSequenceNumber())) {
2413                 // This transaction is dead, it was already committed or aborted
2414                 entry->setDead();
2415                 return;
2416         }
2417
2418         // This part is still alive
2419         Hashtable<Pair<int64_t, int32_t>, TransactionPart *> *transactionPart = newTransactionParts->get(entry->getMachineId());
2420
2421         if (transactionPart == NULL) {
2422                 // Dont have a table for this machine Id yet so make one
2423                 transactionPart = new Hashtable<Pair<int64_t, int32_t>, TransactionPart *>();
2424                 newTransactionParts->put(entry->getMachineId(), transactionPart);
2425         }
2426
2427         // Update the part and set dead ones we have already seen (got a
2428         // rescued version)
2429         TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry);
2430         if (previouslySeenPart != NULL) {
2431                 previouslySeenPart->setDead();
2432         }
2433 }
2434
2435 /**
2436  * Process new commit entries and save them for future use->  Delete duplicates
2437  */
2438 void Table::processEntry(CommitPart *entry) {
2439         // Update the last transaction that was updated if we can
2440         if (entry->getTransactionSequenceNumber() != -1) {
2441                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId());
2442                 // Update the last transaction sequence number that the arbitrator
2443                 // arbitrated on
2444                 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2445                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2446                 }
2447         }
2448
2449         Hashtable<Pair<int64_t, int32_t>, CommitPart *> *commitPart = newCommitParts->get(entry->getMachineId());
2450         if (commitPart == NULL) {
2451                 // Don't have a table for this machine Id yet so make one
2452                 commitPart = new Hashtable<Pair<int64_t, int32_t>, CommitPart *>();
2453                 newCommitParts->put(entry->getMachineId(), commitPart);
2454         }
2455         // Update the part and set dead ones we have already seen (got a
2456         // rescued version)
2457         CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry);
2458         if (previouslySeenPart != NULL) {
2459                 previouslySeenPart->setDead();
2460         }
2461 }
2462
/**
 * Update the last message seen table. Update and set dead the
 * appropriate RejectedMessages as clients see them. Updates the live
 * aborts, removes those that are dead and sets them dead. Check that
 * the last message seen is correct and that there is no mismatch of
 * our own last message or that other clients have not had a rollback
 * on the last message.
 */
2471 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<intr64_t> *machineSet) {
2472         // We have seen this machine ID
2473         machineSet->remove(machineId);
2474
2475         // Get the set of rejected messages that this machine Id is has not seen yet
2476         Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2477         // If there is a rejected message that this machine Id has not seen yet
2478         if (watchset != NULL) {
2479                 // Go through each rejected message that this machine Id has not
2480                 // seen yet
2481                 for (Iterator<RejectedMessage *> *rmit = watchset->iterator(); rmit->hasNext(); ) {
2482                         RejectedMessage *rm = rmit->next();
2483                         // If this machine Id has seen this rejected message->->->
2484                         if (rm->getSequenceNumber() <= seqNum) {
2485                                 // Remove it from our watchlist
2486                                 rmit->remove();
2487                                 // Decrement machines that need to see this notification
2488                                 rm->removeWatcher(machineId);
2489                         }
2490                 }
2491         }
2492
2493         // Set dead the abort
2494         for (Iterator<Map->Entry<Pair<int64_t, int64_t>, Abort *> > i = liveAbortTable->entrySet()->iterator(); i->hasNext();) {
2495                 Abort *abort = i->next()->getValue();
2496                 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2497                         abort->setDead();
2498                         i->remove();
2499                         if (abort->getTransactionArbitrator() == localMachineId) {
2500                                 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2501                         }
2502                 }
2503         }
2504         if (machineId == localMachineId) {
2505                 // Our own messages are immediately dead->
2506                 if (liveness instanceof LastMessage) {
2507                         ((LastMessage *)liveness)->setDead();
2508                 } else if (liveness instanceof Slot) {
2509                         ((Slot *)liveness)->setDead();
2510                 } else {
2511                         throw new Error("Unrecognized type");
2512                 }
2513         }
2514         // Get the old last message for this device
2515         Pair<int64_t, Liveness *> lastMessageEntry = lastMessageTable->put(machineId, Pair<int64_t, Liveness *>(seqNum, liveness));
2516         if (lastMessageEntry == NULL) {
2517                 // If no last message then there is nothing else to process
2518                 return;
2519         }
2520
2521         int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
2522         Liveness *lastEntry = lastMessageEntry->getSecond();
2523
2524         // If it is not our machine Id since we already set ours to dead
2525         if (machineId != localMachineId) {
2526                 if (lastEntry instanceof LastMessage) {
2527                         ((LastMessage *)lastEntry)->setDead();
2528                 } else if (lastEntry instanceof Slot) {
2529                         ((Slot *)lastEntry)->setDead();
2530                 } else {
2531                         throw new Error("Unrecognized type");
2532                 }
2533         }
2534         // Make sure the server is not playing any games
2535         if (machineId == localMachineId) {
2536                 if (hadPartialSendToServer) {
2537                         // We were not making any updates and we had a machine mismatch
2538                         if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2539                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: " +  lastMessageSeqNum  + " got: " + seqNum);
2540                         }
2541                 } else {
2542                         // We were not making any updates and we had a machine mismatch
2543                         if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2544                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed: " +  lastMessageSeqNum + " got: " + seqNum);
2545                         }
2546                 }
2547         } else {
2548                 if (lastMessageSeqNum > seqNum) {
2549                         throw new Error("Server Error: Rollback on remote machine sequence number");
2550                 }
2551         }
2552 }
2553
2554 /**
2555  * Add a rejected message entry to the watch set to keep track of
2556  * which clients have seen that rejected message entry and which have
2557  * not.
2558  */
2559 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
2560         Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2561         if (entries == NULL) {
2562                 // There is no set for this machine ID yet so create one
2563                 entries = new Hashset<RejectedMessage *>();
2564                 rejectedMessageWatchVectorTable->put(machineId, entries);
2565         }
2566         entries->add(entry);
2567 }
2568
2569 /**
2570  * Check if the HMAC chain is not violated
2571  */
2572 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
2573         for (int i = 0; i < newSlots->length(); i++) {
2574                 Slot *currSlot = newSlots->get(i);
2575                 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
2576                 if (prevSlot != NULL &&
2577                                 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2578                         throw new Error("Server Error: Invalid HMAC Chain");
2579         }
2580 }