d598f15cfda3939f9d65c9f6fbf6876530c18622
[iotcloud.git] / version2 / src / C / Table.cc
1 #include "Table.h"
2 #include "CloudComm.h"
3 #include "SlotBuffer.h"
4 #include "NewKey.h"
5 #include "Slot.h"
6 #include "KeyValue.h"
7 #include "Error.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
13 #include "Random.h"
14
15
16 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
17         buffer(NULL),
18         cloud(new CloudComm(this, baseurl, password, listeningPort)),
19         random(NULL),
20         liveTableStatus(NULL),
21         pendingTransactionBuilder(NULL),
22         lastPendingTransactionSpeculatedOn(NULL),
23         firstPendingTransaction(NULL),
24         numberOfSlots(0),
25         bufferResizeThreshold(0),
26         liveSlotCount(0),
27         oldestLiveSlotSequenceNumver(1),
28         localMachineId(_localMachineId),
29         sequenceNumber(0),
30         localTransactionSequenceNumber(0),
31         lastTransactionSequenceNumberSpeculatedOn(0),
32         oldestTransactionSequenceNumberSpeculatedOn(0),
33         localArbitrationSequenceNumber(0),
34         hadPartialSendToServer(false),
35         attemptedToSendToServer(false),
36         expectedsize(0),
37         didFindTableStatus(false),
38         currMaxSize(0),
39         lastSlotAttemptedToSend(NULL),
40         lastIsNewKey(false),
41         lastNewSize(0),
42         lastTransactionPartsSent(NULL),
43         lastPendingSendArbitrationEntriesToDelete(NULL),
44         lastNewKey(NULL),
45         committedKeyValueTable(NULL),
46         speculatedKeyValueTable(NULL),
47         pendingTransactionSpeculatedKeyValueTable(NULL),
48         liveNewKeyTable(NULL),
49         lastMessageTable(NULL),
50         rejectedMessageWatchVectorTable(NULL),
51         arbitratorTable(NULL),
52         liveAbortTable(NULL),
53         newTransactionParts(NULL),
54         newCommitParts(NULL),
55         lastArbitratedTransactionNumberByArbitratorTable(NULL),
56         liveTransactionBySequenceNumberTable(NULL),
57         liveTransactionByTransactionIdTable(NULL),
58         liveCommitsTable(NULL),
59         liveCommitsByKeyTable(NULL),
60         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
61         rejectedSlotVector(NULL),
62         pendingTransactionQueue(NULL),
63         pendingSendArbitrationRounds(NULL),
64         pendingSendArbitrationEntriesToDelete(NULL),
65         transactionPartsSent(NULL),
66         outstandingTransactionStatus(NULL),
67         liveAbortsGeneratedByLocal(NULL),
68         offlineTransactionsCommittedAndAtServer(NULL),
69         localCommunicationTable(NULL),
70         lastTransactionSeenFromMachineFromServer(NULL),
71         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
72         lastInsertedNewKey(false),
73         lastSeqNumArbOn(0)
74 {
75         init();
76 }
77
78 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
79         buffer(NULL),
80         cloud(_cloud),
81         random(NULL),
82         liveTableStatus(NULL),
83         pendingTransactionBuilder(NULL),
84         lastPendingTransactionSpeculatedOn(NULL),
85         firstPendingTransaction(NULL),
86         numberOfSlots(0),
87         bufferResizeThreshold(0),
88         liveSlotCount(0),
89         oldestLiveSlotSequenceNumver(1),
90         localMachineId(_localMachineId),
91         sequenceNumber(0),
92         localTransactionSequenceNumber(0),
93         lastTransactionSequenceNumberSpeculatedOn(0),
94         oldestTransactionSequenceNumberSpeculatedOn(0),
95         localArbitrationSequenceNumber(0),
96         hadPartialSendToServer(false),
97         attemptedToSendToServer(false),
98         expectedsize(0),
99         didFindTableStatus(false),
100         currMaxSize(0),
101         lastSlotAttemptedToSend(NULL),
102         lastIsNewKey(false),
103         lastNewSize(0),
104         lastTransactionPartsSent(NULL),
105         lastPendingSendArbitrationEntriesToDelete(NULL),
106         lastNewKey(NULL),
107         committedKeyValueTable(NULL),
108         speculatedKeyValueTable(NULL),
109         pendingTransactionSpeculatedKeyValueTable(NULL),
110         liveNewKeyTable(NULL),
111         lastMessageTable(NULL),
112         rejectedMessageWatchVectorTable(NULL),
113         arbitratorTable(NULL),
114         liveAbortTable(NULL),
115         newTransactionParts(NULL),
116         newCommitParts(NULL),
117         lastArbitratedTransactionNumberByArbitratorTable(NULL),
118         liveTransactionBySequenceNumberTable(NULL),
119         liveTransactionByTransactionIdTable(NULL),
120         liveCommitsTable(NULL),
121         liveCommitsByKeyTable(NULL),
122         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
123         rejectedSlotVector(NULL),
124         pendingTransactionQueue(NULL),
125         pendingSendArbitrationRounds(NULL),
126         pendingSendArbitrationEntriesToDelete(NULL),
127         transactionPartsSent(NULL),
128         outstandingTransactionStatus(NULL),
129         liveAbortsGeneratedByLocal(NULL),
130         offlineTransactionsCommittedAndAtServer(NULL),
131         localCommunicationTable(NULL),
132         lastTransactionSeenFromMachineFromServer(NULL),
133         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
134         lastInsertedNewKey(false),
135         lastSeqNumArbOn(0)
136 {
137         init();
138 }
139
140 /**
141  * Init all the stuff needed for for table usage
142  */
143 void Table::init() {
144         // Init helper objects
145         random = new Random();
146         buffer = new SlotBuffer();
147
148         // init data structs
149         committedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
150         speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
151         pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
152         liveNewKeyTable = new Hashtable<IoTString *, NewKey *>();
153         lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> >();
154         rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
155         arbitratorTable = new Hashtable<IoTString *, int64_t>();
156         liveAbortTable = new Hashtable<Pair<int64_t, int64_t>, Abort *>();
157         newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>, TransactionPart *> *>();
158         newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>, CommitPart *> *>();
159         lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
160         liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
161         liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t>, Transaction *>();
162         liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> >();
163         liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *>();
164         lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
165         rejectedSlotVector = new Vector<int64_t>();
166         pendingTransactionQueue = new Vector<Transaction *>();
167         pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
168         transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
169         outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
170         liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
171         offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> >();
172         localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> >();
173         lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
174         pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
175         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
176
177
178         // Other init stuff
179         numberOfSlots = buffer->capacity();
180         setResizeThreshold();
181 }
182
183 /**
184  * Initialize the table by inserting a table status as the first entry
185  * into the table status also initialize the crypto stuff.
186  */
187 void Table::initTable() {
188         cloud->initSecurity();
189
190         // Create the first insertion into the block chain which is the table status
191         Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
192         localSequenceNumber++;
193         TableStatus *status = new TableStatus(s, numberOfSlots);
194         s->addEntry(status);
195         Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
196
197         if (array == NULL) {
198                 array = new Array<Slot *>(1);
199                 array->set(0, s);
200                 // update local block chain
201                 validateAndUpdate(array, true);
202         } else if (array->length() == 1) {
203                 // in case we did push the slot BUT we failed to init it
204                 validateAndUpdate(array, true);
205         } else {
206                 throw new Error("Error on initialization");
207         }
208 }
209
210 /**
211  * Rebuild the table from scratch by pulling the latest block chain
212  * from the server.
213  */
214 void Table::rebuild() {
215         // Just pull the latest slots from the server
216         Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
217         validateAndUpdate(newslots, true);
218         sendToServer(NULL);
219         updateLiveTransactionsAndStatus();
220 }
221
222 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
223         localCommunicationTable->put(arbitrator, Pair<IoTString *, int32_t>(hostName, portNumber));
224 }
225
226 int64_t Table::getArbitrator(IoTString *key) {
227         return arbitratorTable->get(key);
228 }
229
230 void Table::close() {
231         cloud->close();
232 }
233
234 IoTString *Table::getCommitted(IoTString *key)  {
235         KeyValue *kv = committedKeyValueTable->get(key);
236
237         if (kv != NULL) {
238                 return kv->getValue();
239         } else {
240                 return NULL;
241         }
242 }
243
244 IoTString *Table::getSpeculative(IoTString *key) {
245         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
246
247         if (kv == NULL) {
248                 kv = speculatedKeyValueTable->get(key);
249         }
250
251         if (kv == NULL) {
252                 kv = committedKeyValueTable->get(key);
253         }
254
255         if (kv != NULL) {
256                 return kv->getValue();
257         } else {
258                 return NULL;
259         }
260 }
261
262 IoTString *Table::getCommittedAtomic(IoTString *key) {
263         KeyValue *kv = committedKeyValueTable->get(key);
264
265         if (!arbitratorTable->contains(key)) {
266                 throw new Error("Key not Found.");
267         }
268
269         // Make sure new key value pair matches the current arbitrator
270         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
271                 // TODO: Maybe not throw en error
272                 throw new Error("Not all Key Values Match Arbitrator.");
273         }
274
275         if (kv != NULL) {
276                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
277                 return kv->getValue();
278         } else {
279                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
280                 return NULL;
281         }
282 }
283
284 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
285         if (!arbitratorTable->contains(key)) {
286                 throw new Error("Key not Found.");
287         }
288
289         // Make sure new key value pair matches the current arbitrator
290         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
291                 // TODO: Maybe not throw en error
292                 throw new Error("Not all Key Values Match Arbitrator.");
293         }
294
295         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
296
297         if (kv == NULL) {
298                 kv = speculatedKeyValueTable->get(key);
299         }
300
301         if (kv == NULL) {
302                 kv = committedKeyValueTable->get(key);
303         }
304
305         if (kv != NULL) {
306                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
307                 return kv->getValue();
308         } else {
309                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
310                 return NULL;
311         }
312 }
313
314 bool Table::update()  {
315         try {
316                 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
317                 validateAndUpdate(newSlots, false);
318                 sendToServer(NULL);
319
320
321                 updateLiveTransactionsAndStatus();
322
323                 return true;
324         } catch (Exception *e) {
325                 for (int64_t m : localCommunicationTable->keySet()) {
326                         updateFromLocal(m);
327                 }
328         }
329
330         return false;
331 }
332
333 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
334         while (true) {
335                 if (!arbitratorTable->contains(keyName)) {
336                         // There is already an arbitrator
337                         return false;
338                 }
339
340                 NewKey *newKey = new NewKey(NULL, keyName, machineId);
341
342                 if (sendToServer(newKey)) {
343                         // If successfully inserted
344                         return true;
345                 }
346         }
347 }
348
349 void Table::startTransaction() {
350         // Create a new transaction, invalidates any old pending transactions.
351         pendingTransactionBuilder = new PendingTransaction(localMachineId);
352 }
353
354 void Table::addKV(IoTString *key, IoTString *value) {
355
356         // Make sure it is a valid key
357         if (!arbitratorTable->contains(key)) {
358                 throw new Error("Key not Found.");
359         }
360
361         // Make sure new key value pair matches the current arbitrator
362         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
363                 // TODO: Maybe not throw en error
364                 throw new Error("Not all Key Values Match Arbitrator.");
365         }
366
367         // Add the key value to this transaction
368         KeyValue *kv = new KeyValue(key, value);
369         pendingTransactionBuilder->addKV(kv);
370 }
371
372 TransactionStatus *Table::commitTransaction() {
373
374         if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
375                 // transaction with no updates will have no effect on the system
376                 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
377         }
378
379         // Set the local transaction sequence number and increment
380         pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
381         localTransactionSequenceNumber++;
382
383         // Create the transaction status
384         TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
385
386         // Create the new transaction
387         Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
388         newTransaction->setTransactionStatus(transactionStatus);
389
390         if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
391                 // Add it to the queue and invalidate the builder for safety
392                 pendingTransactionQueue->add(newTransaction);
393         } else {
394                 arbitrateOnLocalTransaction(newTransaction);
395                 updateLiveStateFromLocal();
396         }
397
398         pendingTransactionBuilder = new PendingTransaction(localMachineId);
399
400         try {
401                 sendToServer(NULL);
402         } catch (ServerException *e) {
403
404                 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
405                 for (Iterator<Transaction *> *iter = pendingTransactionQueue->iterator(); iter->hasNext(); ) {
406                         Transaction *transaction = iter->next();
407
408                         if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
409                                 // Already contacted this client so ignore all attempts to contact this client
410                                 // to preserve ordering for arbitrator
411                                 continue;
412                         }
413
414                         Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
415
416                         if (sendReturn->getFirst()) {
417                                 // Failed to contact over local
418                                 arbitratorTriedAndFailed->add(transaction->getArbitrator());
419                         } else {
420                                 // Successful contact or should not contact
421
422                                 if (sendReturn->getSecond()) {
423                                         // did arbitrate
424                                         iter->remove();
425                                 }
426                         }
427                 }
428         }
429
430         updateLiveStateFromLocal();
431
432         return transactionStatus;
433 }
434
435 /**
436  * Recalculate the new resize threshold
437  */
438 void Table::setResizeThreshold() {
439         int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
440         bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
441 }
442
443 int64_t Table::getLocalSequenceNumber() {
444         return localSequenceNumber;
445 }
446
447
448 bool lastInsertedNewKey = false;
449
450 bool Table::sendToServer(NewKey *newKey) {
451
452         bool fromRetry = false;
453
454         try {
455                 if (hadPartialSendToServer) {
456                         Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
457                         if (newSlots->length() == 0) {
458                                 fromRetry = true;
459                                 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
460
461                                 if (sendSlotsReturn->getFirst()) {
462                                         if (newKey != NULL) {
463                                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
464                                                         newKey = NULL;
465                                                 }
466                                         }
467
468                                         for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
469                                                 transaction->resetServerFailure();
470
471                                                 // Update which transactions parts still need to be sent
472                                                 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
473
474                                                 // Add the transaction status to the outstanding list
475                                                 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
476
477                                                 // Update the transaction status
478                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
479
480                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
481                                                 if (transaction->didSendAllParts()) {
482                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
483                                                         pendingTransactionQueue->remove(transaction);
484                                                 }
485                                         }
486                                 } else {
487
488                                         newSlots = sendSlotsReturn->getThird();
489
490                                         bool isInserted = false;
491                                         for (uint si = 0; si < newSlots->length(); si++) {
492                                                 Slot *s = newSlots->get(si);
493                                                 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
494                                                         isInserted = true;
495                                                         break;
496                                                 }
497                                         }
498
499                                         for (uint si = 0; si < newSlots->length(); si++) {
500                                                 Slot *s = newSlots->get(si);
501                                                 if (isInserted) {
502                                                         break;
503                                                 }
504
505                                                 // Process each entry in the slot
506                                                 for (Entry *entry : s->getEntries()) {
507                                                         if (entry->getType() == TypeLastMessage) {
508                                                                 LastMessage *lastMessage = (LastMessage *)entry;
509                                                                 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
510                                                                         isInserted = true;
511                                                                         break;
512                                                                 }
513                                                         }
514                                                 }
515                                         }
516
517                                         if (isInserted) {
518                                                 if (newKey != NULL) {
519                                                         if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
520                                                                 newKey = NULL;
521                                                         }
522                                                 }
523
524                                                 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
525                                                         transaction->resetServerFailure();
526
527                                                         // Update which transactions parts still need to be sent
528                                                         transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
529
530                                                         // Add the transaction status to the outstanding list
531                                                         outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
532
533                                                         // Update the transaction status
534                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
535
536                                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
537                                                         if (transaction->didSendAllParts()) {
538                                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
539                                                                 pendingTransactionQueue->remove(transaction);
540                                                         } else {
541                                                                 transaction->resetServerFailure();
542                                                                 // Set the transaction sequence number back to nothing
543                                                                 if (!transaction->didSendAPartToServer()) {
544                                                                         transaction->setSequenceNumber(-1);
545                                                                 }
546                                                         }
547                                                 }
548                                         }
549                                 }
550
551                                 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
552                                         transaction->resetServerFailure();
553                                         // Set the transaction sequence number back to nothing
554                                         if (!transaction->didSendAPartToServer()) {
555                                                 transaction->setSequenceNumber(-1);
556                                         }
557                                 }
558
559                                 if (sendSlotsReturn->getThird()->length() != 0) {
560                                         // insert into the local block chain
561                                         validateAndUpdate(sendSlotsReturn->getThird(), true);
562                                 }
563                                 // continue;
564                         } else {
565                                 bool isInserted = false;
566                                 for (uint si = 0; si < newSlots->length(); si++) {
567                                         Slot *s = newSlots->get(si);
568                                         if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
569                                                 isInserted = true;
570                                                 break;
571                                         }
572                                 }
573
574                                 for (uint si = 0; si < newSlots->length(); si++) {
575                                         Slot *s = newSlots->get(si);
576                                         if (isInserted) {
577                                                 break;
578                                         }
579
580                                         // Process each entry in the slot
581                                         for (Entry *entry : s->getEntries()) {
582
583                                                 if (entry->getType() == TypeLastMessage) {
584                                                         LastMessage *lastMessage = (LastMessage *)entry;
585                                                         if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
586                                                                 isInserted = true;
587                                                                 break;
588                                                         }
589                                                 }
590                                         }
591                                 }
592
593                                 if (isInserted) {
594                                         if (newKey != NULL) {
595                                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
596                                                         newKey = NULL;
597                                                 }
598                                         }
599
600                                         for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
601                                                 transaction->resetServerFailure();
602
603                                                 // Update which transactions parts still need to be sent
604                                                 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
605
606                                                 // Add the transaction status to the outstanding list
607                                                 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
608
609                                                 // Update the transaction status
610                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
611
612                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
613                                                 if (transaction->didSendAllParts()) {
614                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
615                                                         pendingTransactionQueue->remove(transaction);
616                                                 } else {
617                                                         transaction->resetServerFailure();
618                                                         // Set the transaction sequence number back to nothing
619                                                         if (!transaction->didSendAPartToServer()) {
620                                                                 transaction->setSequenceNumber(-1);
621                                                         }
622                                                 }
623                                         }
624                                 } else {
625                                         for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
626                                                 transaction->resetServerFailure();
627                                                 // Set the transaction sequence number back to nothing
628                                                 if (!transaction->didSendAPartToServer()) {
629                                                         transaction->setSequenceNumber(-1);
630                                                 }
631                                         }
632                                 }
633
634                                 // insert into the local block chain
635                                 validateAndUpdate(newSlots, true);
636                         }
637                 }
638         } catch (ServerException *e) {
639                 throw e;
640         }
641
642
643
644         try {
645                 // While we have stuff that needs inserting into the block chain
646                 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
647
648                         fromRetry = false;
649
650                         if (hadPartialSendToServer) {
651                                 throw new Error("Should Be error free");
652                         }
653
654
655
656                         // If there is a new key with same name then end
657                         if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
658                                 return false;
659                         }
660
661                         // Create the slot
662                         Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber);
663                         localSequenceNumber++;
664
665                         // Try to fill the slot with data
666                         ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
667                         bool needsResize = fillSlotsReturn->getFirst();
668                         int newSize = fillSlotsReturn->getSecond();
669                         bool insertedNewKey = fillSlotsReturn->getThird();
670
671                         if (needsResize) {
672                                 // Reset which transaction to send
673                                 for (Transaction *transaction : transactionPartsSent->keySet()) {
674                                         transaction->resetNextPartToSend();
675
676                                         // Set the transaction sequence number back to nothing
677                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
678                                                 transaction->setSequenceNumber(-1);
679                                         }
680                                 }
681
682                                 // Clear the sent data since we are trying again
683                                 pendingSendArbitrationEntriesToDelete->clear();
684                                 transactionPartsSent->clear();
685
686                                 // We needed a resize so try again
687                                 fillSlot(slot, true, newKey);
688                         }
689
690                         lastSlotAttemptedToSend = slot;
691                         lastIsNewKey = (newKey != NULL);
692                         lastInsertedNewKey = insertedNewKey;
693                         lastNewSize = newSize;
694                         lastNewKey = newKey;
695                         lastTransactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> * >(transactionPartsSent);
696                         lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
697
698
699                         ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
700
701                         if (sendSlotsReturn->getFirst()) {
702
703                                 // Did insert into the block chain
704
705                                 if (insertedNewKey) {
706                                         // This slot was what was inserted not a previous slot
707
708                                         // New Key was successfully inserted into the block chain so dont want to insert it again
709                                         newKey = NULL;
710                                 }
711
712                                 // Remove the aborts and commit parts that were sent from the pending to send queue
713                                 for (Iterator<ArbitrationRound *> *iter = pendingSendArbitrationRounds->iterator(); iter->hasNext(); ) {
714                                         ArbitrationRound *round = iter->next();
715                                         round->removeParts(pendingSendArbitrationEntriesToDelete);
716
717                                         if (round->isDoneSending()) {
718                                                 // Sent all the parts
719                                                 iter->remove();
720                                         }
721                                 }
722
723                                 for (Transaction *transaction : transactionPartsSent->keySet()) {
724                                         transaction->resetServerFailure();
725
726                                         // Update which transactions parts still need to be sent
727                                         transaction->removeSentParts(transactionPartsSent->get(transaction));
728
729                                         // Add the transaction status to the outstanding list
730                                         outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
731
732                                         // Update the transaction status
733                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
734
735                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
736                                         if (transaction->didSendAllParts()) {
737                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
738                                                 pendingTransactionQueue->remove(transaction);
739                                         }
740                                 }
741                         } else {
742                                 // Reset which transaction to send
743                                 for (Transaction *transaction : transactionPartsSent->keySet()) {
744                                         transaction->resetNextPartToSend();
745
746                                         // Set the transaction sequence number back to nothing
747                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
748                                                 transaction->setSequenceNumber(-1);
749                                         }
750                                 }
751                         }
752
753                         // Clear the sent data in preparation for next send
754                         pendingSendArbitrationEntriesToDelete->clear();
755                         transactionPartsSent->clear();
756
757                         if (sendSlotsReturn->getThird()->length() != 0) {
758                                 // insert into the local block chain
759                                 validateAndUpdate(sendSlotsReturn->getThird(), true);
760                         }
761                 }
762
763         } catch (ServerException *e) {
764
765                 if (e->getType() != ServerException->TypeInputTimeout) {
766                         // Nothing was able to be sent to the server so just clear these data structures
767                         for (Transaction *transaction : transactionPartsSent->keySet()) {
768                                 transaction->resetNextPartToSend();
769
770                                 // Set the transaction sequence number back to nothing
771                                 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
772                                         transaction->setSequenceNumber(-1);
773                                 }
774                         }
775                 } else {
776                         // There was a partial send to the server
777                         hadPartialSendToServer = true;
778
779                         // Nothing was able to be sent to the server so just clear these data structures
780                         for (Transaction *transaction : transactionPartsSent->keySet()) {
781                                 transaction->resetNextPartToSend();
782                                 transaction->setServerFailure();
783                         }
784                 }
785
786                 pendingSendArbitrationEntriesToDelete->clear();
787                 transactionPartsSent->clear();
788
789                 throw e;
790         }
791
792         return newKey == NULL;
793 }
794
795 bool Table::updateFromLocal(int64_t machineId) {
796         Pair<IoTString *, int32_t> localCommunicationInformation = localCommunicationTable->get(machineId);
797         if (localCommunicationInformation == NULL) {
798                 // Cant talk to that device locally so do nothing
799                 return false;
800         }
801
802         // Get the size of the send data
803         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
804
805         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
806         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId) != NULL) {
807                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
808         }
809
810         Array<char> *sendData = new Array<char>(sendDataSize);
811         ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
812
813         // Encode the data
814         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
815         bbEncode->putInt(0);
816
817         // Send by local
818         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
819         localSequenceNumber++;
820
821         if (returnData == NULL) {
822                 // Could not contact server
823                 return false;
824         }
825
826         // Decode the data
827         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
828         int numberOfEntries = bbDecode->getInt();
829
830         for (int i = 0; i < numberOfEntries; i++) {
831                 char type = bbDecode->get();
832                 if (type == TypeAbort) {
833                         Abort *abort = (Abort)Abort_decode(NULL, bbDecode);
834                         processEntry(abort);
835                 } else if (type == TypeCommitPart) {
836                         CommitPart *commitPart = (CommitPart)CommitPart_decode(NULL, bbDecode);
837                         processEntry(commitPart);
838                 }
839         }
840
841         updateLiveStateFromLocal();
842
843         return true;
844 }
845
846 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
847
848         // Get the devices local communications
849         Pair<IoTString *, int32_t> localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
850
851         if (localCommunicationInformation == NULL) {
852                 // Cant talk to that device locally so do nothing
853                 return Pair<bool, bool>(true, false);
854         }
855
856         // Get the size of the send data
857         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
858         for (TransactionPart *part : transaction->getParts()->values()) {
859                 sendDataSize += part->getSize();
860         }
861
862         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
863         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator()) != NULL) {
864                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
865         }
866
867         // Make the send data size
868         Array<char> *sendData = new Array<char>(sendDataSize);
869         ByteBuffer *bbEncode = ByteBuffer.wrap(sendData);
870
871         // Encode the data
872         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
873         bbEncode->putInt(transaction->getParts()->size());
874         for (TransactionPart *part : transaction->getParts()->values()) {
875                 part->encode(bbEncode);
876         }
877
878
879         // Send by local
880         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
881         localSequenceNumber++;
882
883         if (returnData == NULL) {
884                 // Could not contact server
885                 return Pair<bool, bool>(true, false);
886         }
887
888         // Decode the data
889         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
890         bool didCommit = bbDecode->get() == 1;
891         bool couldArbitrate = bbDecode->get() == 1;
892         int numberOfEntries = bbDecode->getInt();
893         bool foundAbort = false;
894
895         for (int i = 0; i < numberOfEntries; i++) {
896                 char type = bbDecode->get();
897                 if (type == TypeAbort) {
898                         Abort *abort = (Abort)Abort_decode(NULL, bbDecode);
899
900                         if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
901                                 foundAbort = true;
902                         }
903
904                         processEntry(abort);
905                 } else if (type == TypeCommitPart) {
906                         CommitPart *commitPart = (CommitPart)CommitPart_decode(NULL, bbDecode);
907                         processEntry(commitPart);
908                 }
909         }
910
911         updateLiveStateFromLocal();
912
913         if (couldArbitrate) {
914                 TransactionStatus status =  transaction->getTransactionStatus();
915                 if (didCommit) {
916                         status->setStatus(TransactionStatus_StatusCommitted);
917                 } else {
918                         status->setStatus(TransactionStatus_StatusAborted);
919                 }
920         } else {
921                 TransactionStatus status =  transaction->getTransactionStatus();
922                 if (foundAbort) {
923                         status->setStatus(TransactionStatus_StatusAborted);
924                 } else {
925                         status->setStatus(TransactionStatus_StatusCommitted);
926                 }
927         }
928
929         return Pair<bool, bool>(false, true);
930 }
931
932 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
933
934         // Decode the data
935         ByteBuffer *bbDecode = ByteBuffer_wrap(data);
936         int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
937         int numberOfParts = bbDecode->getInt();
938
939         // If we did commit a transaction or not
940         bool didCommit = false;
941         bool couldArbitrate = false;
942
943         if (numberOfParts != 0) {
944
945                 // decode the transaction
946                 Transaction *transaction = new Transaction();
947                 for (int i = 0; i < numberOfParts; i++) {
948                         bbDecode->get();
949                         TransactionPart *newPart = (TransactionPart)TransactionPart.decode(NULL, bbDecode);
950                         transaction->addPartDecode(newPart);
951                 }
952
953                 // Arbitrate on transaction and pull relevant return data
954                 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
955                 couldArbitrate = localArbitrateReturn->getFirst();
956                 didCommit = localArbitrateReturn->getSecond();
957
958                 updateLiveStateFromLocal();
959
960                 // Transaction was sent to the server so keep track of it to prevent double commit
961                 if (transaction->getSequenceNumber() != -1) {
962                         offlineTransactionsCommittedAndAtServer->add(transaction->getId());
963                 }
964         }
965
966         // The data to send back
967         int returnDataSize = 0;
968         Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
969
970         // Get the aborts to send back
971         Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>(liveAbortsGeneratedByLocal->keySet());
972         Collections->sort(abortLocalSequenceNumbers);
973         for (int64_t localSequenceNumber : abortLocalSequenceNumbers) {
974                 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
975                         continue;
976                 }
977
978                 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
979                 unseenArbitrations->add(abort);
980                 returnDataSize += abort->getSize();
981         }
982
983         // Get the commits to send back
984         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
985         if (commitForClientTable != NULL) {
986                 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
987                 Collections->sort(commitLocalSequenceNumbers);
988
989                 for (int64_t localSequenceNumber : commitLocalSequenceNumbers) {
990                         Commit *commit = commitForClientTable->get(localSequenceNumber);
991
992                         if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
993                                 continue;
994                         }
995
996                         unseenArbitrations->addAll(commit->getParts()->values());
997
998                         for (CommitPart commitPart : commit->getParts()->values()) {
999                                 returnDataSize += commitPart->getSize();
1000                         }
1001                 }
1002         }
1003
1004         // Number of arbitration entries to decode
1005         returnDataSize += 2 * sizeof(int32_t);
1006
1007         // bool of did commit or not
1008         if (numberOfParts != 0) {
1009                 returnDataSize += sizeof(char);
1010         }
1011
1012         // Data to send Back
1013         Array<char> *returnData = new Array<char>(returnDataSize);
1014         ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
1015
1016         if (numberOfParts != 0) {
1017                 if (didCommit) {
1018                         bbEncode->put((char)1);
1019                 } else {
1020                         bbEncode->put((char)0);
1021                 }
1022                 if (couldArbitrate) {
1023                         bbEncode->put((char)1);
1024                 } else {
1025                         bbEncode->put((char)0);
1026                 }
1027         }
1028
1029         bbEncode->putInt(unseenArbitrations->size());
1030         for (Entry *entry : unseenArbitrations) {
1031                 entry->encode(bbEncode);
1032         }
1033
1034
1035         localSequenceNumber++;
1036         return returnData;
1037 }
1038
1039 ThreeTuple<bool, bool, Array<Slot *> *> Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey) {
1040         bool attemptedToSendToServerTmp = attemptedToSendToServer;
1041         attemptedToSendToServer = true;
1042
1043         bool inserted = false;
1044         bool lastTryInserted = false;
1045
1046         Array<Slot *> *array = cloud->putSlot(slot, newSize);
1047         if (array == NULL) {
1048                 array = new Array<Slot *>();
1049                 array->set(0, slot);
1050                 rejectedSlotVector->clear();
1051                 inserted = true;
1052         } else {
1053                 if (array->length() == 0) {
1054                         throw new Error("Server Error: Did not send any slots");
1055                 }
1056
1057                 // if (attemptedToSendToServerTmp) {
1058                 if (hadPartialSendToServer) {
1059
1060                         bool isInserted = false;
1061                         for (Slot *s : array) {
1062                                 if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1063                                         isInserted = true;
1064                                         break;
1065                                 }
1066                         }
1067
1068                         for (Slot *s : array) {
1069                                 if (isInserted) {
1070                                         break;
1071                                 }
1072
1073                                 // Process each entry in the slot
1074                                 for (Entry *entry : s->getEntries()) {
1075
1076                                         if (entry->getType() == TypeLastMessage) {
1077                                                 LastMessage *lastMessage = (LastMessage *)entry;
1078
1079                                                 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) {
1080                                                         isInserted = true;
1081                                                         break;
1082                                                 }
1083                                         }
1084                                 }
1085                         }
1086
1087                         if (!isInserted) {
1088                                 rejectedSlotVector->add(slot->getSequenceNumber());
1089                                 lastTryInserted = false;
1090                         } else {
1091                                 lastTryInserted = true;
1092                         }
1093                 } else {
1094                         rejectedSlotVector->add(slot->getSequenceNumber());
1095                         lastTryInserted = false;
1096                 }
1097         }
1098
1099         return ThreeTuple<bool, bool, Array<Slot *> *>(inserted, lastTryInserted, array);
1100 }
1101
1102 /**
1103  * Returns false if a resize was needed
1104  */
1105 ThreeTuple<bool, int32_t, bool> Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry) {
1106
1107
1108         int newSize = 0;
1109         if (liveSlotCount > bufferResizeThreshold) {
1110                 resize = true;  //Resize is forced
1111
1112         }
1113
1114         if (resize) {
1115                 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1116                 TableStatus *status = new TableStatus(slot, newSize);
1117                 slot->addEntry(status);
1118         }
1119
1120         // Fill with rejected slots first before doing anything else
1121         doRejectedMessages(slot);
1122
1123         // Do mandatory rescue of entries
1124         ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1125
1126         // Extract working variables
1127         bool needsResize = mandatoryRescueReturn->getFirst();
1128         bool seenLiveSlot = mandatoryRescueReturn->getSecond();
1129         int64_t currentRescueSequenceNumber = mandatoryRescueReturn->getThird();
1130
1131         if (needsResize && !resize) {
1132                 // We need to resize but we are not resizing so return false
1133                 return ThreeTuple<bool, int32_t, bool>(true, NULL, NULL);
1134         }
1135
1136         bool inserted = false;
1137         if (newKeyEntry != NULL) {
1138                 newKeyEntry->setSlot(slot);
1139                 if (slot->hasSpace(newKeyEntry)) {
1140
1141                         slot->addEntry(newKeyEntry);
1142                         inserted = true;
1143                 }
1144         }
1145
1146         // Clear the transactions, aborts and commits that were sent previously
1147         transactionPartsSent->clear();
1148         pendingSendArbitrationEntriesToDelete->clear();
1149
1150         for (ArbitrationRound *round : pendingSendArbitrationRounds) {
1151                 bool isFull = false;
1152                 round->generateParts();
1153                 Vector<Entry *> *parts = round->getParts();
1154
1155                 // Insert pending arbitration data
1156                 for (Entry *arbitrationData : parts) {
1157
1158                         // If it is an abort then we need to set some information
1159                         if (arbitrationData instanceof Abort) {
1160                                 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1161                         }
1162
1163                         if (!slot->hasSpace(arbitrationData)) {
1164                                 // No space so cant do anything else with these data entries
1165                                 isFull = true;
1166                                 break;
1167                         }
1168
1169                         // Add to this current slot and add it to entries to delete
1170                         slot->addEntry(arbitrationData);
1171                         pendingSendArbitrationEntriesToDelete->add(arbitrationData);
1172                 }
1173
1174                 if (isFull) {
1175                         break;
1176                 }
1177         }
1178
1179         if (pendingTransactionQueue->size() > 0) {
1180
1181                 Transaction *transaction = pendingTransactionQueue->get(0);
1182
1183                 // Set the transaction sequence number if it has yet to be inserted into the block chain
1184                 // if ((!transaction->didSendAPartToServer() && !transaction->getServerFailure()) || (transaction->getSequenceNumber() == -1)) {
1185                 //  transaction->setSequenceNumber(slot->getSequenceNumber());
1186                 // }
1187
1188                 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1189                         transaction->setSequenceNumber(slot->getSequenceNumber());
1190                 }
1191
1192
1193                 while (true) {
1194                         TransactionPart *part = transaction->getNextPartToSend();
1195
1196                         if (part == NULL) {
1197                                 // Ran out of parts to send for this transaction so move on
1198                                 break;
1199                         }
1200
1201                         if (slot->hasSpace(part)) {
1202                                 slot->addEntry(part);
1203                                 Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1204                                 if (partsSent == NULL) {
1205                                         partsSent = new Vector<int32_t>();
1206                                         transactionPartsSent->put(transaction, partsSent);
1207                                 }
1208                                 partsSent->add(part->getPartNumber());
1209                                 transactionPartsSent->put(transaction, partsSent);
1210                         } else {
1211                                 break;
1212                         }
1213                 }
1214         }
1215
1216         // Fill the remainder of the slot with rescue data
1217         doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1218
1219         return ThreeTuple<bool, int32_t, bool>(false, newSize, inserted);
1220 }
1221
1222 void Table::doRejectedMessages(Slot *s) {
1223         if (!rejectedSlotVector->isEmpty()) {
1224                 /* TODO: We should avoid generating a rejected message entry if
1225                  * there is already a sufficient entry in the queue (e->g->,
1226                  * equalsto value of true and same sequence number)->  */
1227
1228                 int64_t old_seqn = rejectedSlotVector->firstElement();
1229                 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1230                         int64_t new_seqn = rejectedSlotVector->lastElement();
1231                         RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1232                         s->addEntry(rm);
1233                 } else {
1234                         int64_t prev_seqn = -1;
1235                         int i = 0;
1236                         /* Go through list of missing messages */
1237                         for (; i < rejectedSlotVector->size(); i++) {
1238                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1239                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1240                                 if (s_msg != NULL)
1241                                         break;
1242                                 prev_seqn = curr_seqn;
1243                         }
1244                         /* Generate rejected message entry for missing messages */
1245                         if (prev_seqn != -1) {
1246                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1247                                 s->addEntry(rm);
1248                         }
1249                         /* Generate rejected message entries for present messages */
1250                         for (; i < rejectedSlotVector->size(); i++) {
1251                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1252                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1253                                 int64_t machineid = s_msg->getMachineID();
1254                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1255                                 s->addEntry(rm);
1256                         }
1257                 }
1258         }
1259 }
1260
1261 ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize) {
1262         int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1263         int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
1264         if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1265                 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1266         }
1267
1268         int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1269         bool seenLiveSlot = false;
1270         int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots;         // smallest seq number in the buffer if it is full
1271         int64_t threshold = firstIfFull + Table_FREE_SLOTS;             // we want the buffer to be clear of live entries up to this point
1272
1273
1274         // Mandatory Rescue
1275         for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1276                 Slot previousSlot = buffer->getSlot(currentSequenceNumber);
1277                 // Push slot number forward
1278                 if (!seenLiveSlot) {
1279                         oldestLiveSlotSequenceNumver = currentSequenceNumber;
1280                 }
1281
1282                 if (!previousSlot->isLive()) {
1283                         continue;
1284                 }
1285
1286                 // We have seen a live slot
1287                 seenLiveSlot = true;
1288
1289                 // Get all the live entries for a slot
1290                 Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1291
1292                 // Iterate over all the live entries and try to rescue them
1293                 for (Entry *liveEntry : liveEntries) {
1294                         if (slot->hasSpace(liveEntry)) {
1295
1296                                 // Enough space to rescue the entry
1297                                 slot->addEntry(liveEntry);
1298                         } else if (currentSequenceNumber == firstIfFull) {
1299                                 //if there's no space but the entry is about to fall off the queue
1300                                 System->out->println("B");      //?
1301                                 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
1302
1303                         }
1304                 }
1305         }
1306
1307         // Did not resize
1308         return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
1309 }
1310
1311 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1312         /* now go through live entries from least to greatest sequence number until
1313          * either all live slots added, or the slot doesn't have enough room
1314          * for SKIP_THRESHOLD consecutive entries*/
1315         int skipcount = 0;
1316         int64_t newestseqnum = buffer->getNewestSeqNum();
1317 search:
1318         for (; seqn <= newestseqnum; seqn++) {
1319                 Slot *prevslot = buffer->getSlot(seqn);
1320                 //Push slot number forward
1321                 if (!seenliveslot)
1322                         oldestLiveSlotSequenceNumver = seqn;
1323
1324                 if (!prevslot->isLive())
1325                         continue;
1326                 seenliveslot = true;
1327                 Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
1328                 for (Entry *liveentry : liveentries) {
1329                         if (s->hasSpace(liveentry))
1330                                 s->addEntry(liveentry);
1331                         else {
1332                                 skipcount++;
1333                                 if (skipcount > Table_SKIP_THRESHOLD)
1334                                         break search;
1335                         }
1336                 }
1337         }
1338 }
1339
1340 /**
1341  * Checks for malicious activity and updates the local copy of the block chain->
1342  */
1343 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1344
1345         // The cloud communication layer has checked slot HMACs already before decoding
1346         if (newSlots->length() == 0) {
1347                 return;
1348         }
1349
1350         // Make sure all slots are newer than the last largest slot this client has seen
1351         int64_t firstSeqNum = newSlots[0]->getSequenceNumber();
1352         if (firstSeqNum <= sequenceNumber) {
1353                 throw new Error("Server Error: Sent older slots!");
1354         }
1355
1356         // Create an object that can access both new slots and slots in our local chain
1357         // without committing slots to our local chain
1358         SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1359
1360         // Check that the HMAC chain is not broken
1361         checkHMACChain(indexer, newSlots);
1362
1363         // Set to keep track of messages from clients
1364         Hashset<int64_t> *machineSet = new Hashset<int64_t>(lastMessageTable->keySet());
1365
1366         // Process each slots data
1367         for (Slot *slot : newSlots) {
1368                 processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1369
1370                 updateExpectedSize();
1371         }
1372
1373         // If there is a gap, check to see if the server sent us everything->
1374         if (firstSeqNum != (sequenceNumber + 1)) {
1375
1376                 // Check the size of the slots that were sent down by the server->
1377                 // Can only check the size if there was a gap
1378                 checkNumSlots(newSlots->length);
1379
1380                 // Since there was a gap every machine must have pushed a slot or must have
1381                 // a last message message->  If not then the server is hiding slots
1382                 if (!machineSet->isEmpty()) {
1383                         throw new Error("Missing record for machines: " + machineSet);
1384                 }
1385         }
1386
1387         // Update the size of our local block chain->
1388         commitNewMaxSize();
1389
1390         // Commit new to slots to the local block chain->
1391         for (Slot *slot : newSlots) {
1392
1393                 // Insert this slot into our local block chain copy->
1394                 buffer->putSlot(slot);
1395
1396                 // Keep track of how many slots are currently live (have live data in them)->
1397                 liveSlotCount++;
1398         }
1399
1400         // Get the sequence number of the latest slot in the system
1401         sequenceNumber = newSlots[newSlots->length() - 1]->getSequenceNumber();
1402
1403         updateLiveStateFromServer();
1404
1405         // No Need to remember after we pulled from the server
1406         offlineTransactionsCommittedAndAtServer->clear();
1407
1408         // This is invalidated now
1409         hadPartialSendToServer = false;
1410 }
1411
1412 void Table::updateLiveStateFromServer() {
1413         // Process the new transaction parts
1414         processNewTransactionParts();
1415
1416         // Do arbitration on new transactions that were received
1417         arbitrateFromServer();
1418
1419         // Update all the committed keys
1420         bool didCommitOrSpeculate = updateCommittedTable();
1421
1422         // Delete the transactions that are now dead
1423         updateLiveTransactionsAndStatus();
1424
1425         // Do speculations
1426         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1427         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1428 }
1429
1430 void Table::updateLiveStateFromLocal() {
1431         // Update all the committed keys
1432         bool didCommitOrSpeculate = updateCommittedTable();
1433
1434         // Delete the transactions that are now dead
1435         updateLiveTransactionsAndStatus();
1436
1437         // Do speculations
1438         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1439         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1440 }
1441
1442 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1443         // if (didFindTableStatus) {
1444         // return;
1445         // }
1446         int64_t prevslots = firstSequenceNumber;
1447
1448
1449         if (didFindTableStatus) {
1450                 // expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : expectedsize;
1451                 // System->out->println("Here2: " + expectedsize + "    " + numberOfSlots + "   " + prevslots);
1452
1453         } else {
1454                 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1455                 // System->out->println("Here: " + expectedsize);
1456         }
1457
1458         // System->out->println(numberOfSlots);
1459
1460         didFindTableStatus = true;
1461         currMaxSize = numberOfSlots;
1462 }
1463
1464 void Table::updateExpectedSize() {
1465         expectedsize++;
1466
1467         if (expectedsize > currMaxSize) {
1468                 expectedsize = currMaxSize;
1469         }
1470 }
1471
1472
1473 /**
1474  * Check the size of the block chain to make sure there are enough slots sent back by the server->
1475  * This is only called when we have a gap between the slots that we have locally and the slots
1476  * sent by the server therefore in the slots sent by the server there will be at least 1 Table
1477  * status message
1478  */
1479 void Table::checkNumSlots(int numberOfSlots) {
1480         if (numberOfSlots != expectedsize) {
1481                 throw new Error("Server Error: Server did not send all slots->  Expected: " + expectedsize + " Received:" + numberOfSlots);
1482         }
1483 }
1484
1485 void Table::updateCurrMaxSize(int newmaxsize) {
1486         currMaxSize = newmaxsize;
1487 }
1488
1489
1490 /**
1491  * Update the size of of the local buffer if it is needed->
1492  */
1493 void Table::commitNewMaxSize() {
1494         didFindTableStatus = false;
1495
1496         // Resize the local slot buffer
1497         if (numberOfSlots != currMaxSize) {
1498                 buffer->resize((int32_t)currMaxSize);
1499         }
1500
1501         // Change the number of local slots to the new size
1502         numberOfSlots = (int32_t)currMaxSize;
1503
1504
1505         // Recalculate the resize threshold since the size of the local buffer has changed
1506         setResizeThreshold();
1507 }
1508
1509 /**
1510  * Process the new transaction parts from this latest round of slots received from the server
1511  */
1512 void Table::processNewTransactionParts() {
1513
1514         if (newTransactionParts->size() == 0) {
1515                 // Nothing new to process
1516                 return;
1517         }
1518
1519         // Iterate through all the machine Ids that we received new parts for
1520         for (int64_t machineId : newTransactionParts->keySet()) {
1521                 Hashtable<Pair<int64_t int32_t>, TransactionPart *> *parts = newTransactionParts->get(machineId);
1522
1523                 // Iterate through all the parts for that machine Id
1524                 for (Pair<int64_t, int32_t> partId : parts->keySet()) {
1525                         TransactionPart *part = parts->get(partId);
1526
1527                         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1528                         if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= part->getSequenceNumber())) {
1529                                 // Set dead the transaction part
1530                                 part->setDead();
1531                                 continue;
1532                         }
1533
1534                         // Get the transaction object for that sequence number
1535                         Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1536
1537                         if (transaction == NULL) {
1538                                 // This is a new transaction that we dont have so make a new one
1539                                 transaction = new Transaction();
1540
1541                                 // Insert this new transaction into the live tables
1542                                 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1543                                 liveTransactionByTransactionIdTable->put(part->getTransactionId(), transaction);
1544                         }
1545
1546                         // Add that part to the transaction
1547                         transaction->addPartDecode(part);
1548                 }
1549         }
1550
1551         // Clear all the new transaction parts in preparation for the next time the server sends slots
1552         newTransactionParts->clear();
1553 }
1554
1555 void Table::arbitrateFromServer() {
1556
1557         if (liveTransactionBySequenceNumberTable->size() == 0) {
1558                 // Nothing to arbitrate on so move on
1559                 return;
1560         }
1561
1562         // Get the transaction sequence numbers and sort from oldest to newest
1563         Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
1564         Collections->sort(transactionSequenceNumbers);
1565
1566         // Collection of key value pairs that are
1567         Hashtable<IoTString *, KeyValue *> speculativeTableTmp = new Hashtable<IoTString *, KeyValue *>();
1568
1569         // The last transaction arbitrated on
1570         int64_t lastTransactionCommitted = -1;
1571         Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1572
1573         for (int64_t transactionSequenceNumber : transactionSequenceNumbers) {
1574                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1575
1576
1577
1578                 // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
1579                 if (transaction->getArbitrator() != localMachineId) {
1580                         continue;
1581                 }
1582
1583                 if (transactionSequenceNumber < lastSeqNumArbOn) {
1584                         continue;
1585                 }
1586
1587                 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1588                         // We have seen this already locally so dont commit again
1589                         continue;
1590                 }
1591
1592
1593                 if (!transaction->isComplete()) {
1594                         // Will arbitrate in incorrect order if we continue so just break
1595                         // Most likely this
1596                         break;
1597                 }
1598
1599
1600                 // update the largest transaction seen by arbitrator from server
1601                 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) == NULL) {
1602                         lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1603                 } else {
1604                         int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1605                         if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1606                                 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1607                         }
1608                 }
1609
1610                 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1611                         // Guard evaluated as true
1612
1613                         // Update the local changes so we can make the commit
1614                         for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
1615                                 speculativeTableTmp->put(kv->getKey(), kv);
1616                         }
1617
1618                         // Update what the last transaction committed was for use in batch commit
1619                         lastTransactionCommitted = transactionSequenceNumber;
1620                 } else {
1621                         // Guard evaluated was false so create abort
1622
1623                         // Create the abort
1624                         Abort *newAbort = new Abort(NULL,
1625                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1626                                                                                                                                         transaction->getSequenceNumber(),
1627                                                                                                                                         transaction->getMachineId(),
1628                                                                                                                                         transaction->getArbitrator(),
1629                                                                                                                                         localArbitrationSequenceNumber);
1630                         localArbitrationSequenceNumber++;
1631
1632                         generatedAborts->add(newAbort);
1633
1634                         // Insert the abort so we can process
1635                         processEntry(newAbort);
1636                 }
1637
1638                 lastSeqNumArbOn = transactionSequenceNumber;
1639
1640                 // liveTransactionBySequenceNumberTable->remove(transactionSequenceNumber);
1641         }
1642
1643         Commit *newCommit = NULL;
1644
1645         // If there is something to commit
1646         if (speculativeTableTmp->size() != 0) {
1647
1648                 // Create the commit and increment the commit sequence number
1649                 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1650                 localArbitrationSequenceNumber++;
1651
1652                 // Add all the new keys to the commit
1653                 for (KeyValue *kv : speculativeTableTmp->values()) {
1654                         newCommit->addKV(kv);
1655                 }
1656
1657                 // create the commit parts
1658                 newCommit->createCommitParts();
1659
1660                 // Append all the commit parts to the end of the pending queue waiting for sending to the server
1661
1662                 // Insert the commit so we can process it
1663                 for (CommitPart *commitPart : newCommit->getParts()->values()) {
1664                         processEntry(commitPart);
1665                 }
1666         }
1667
1668         if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1669                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1670                 pendingSendArbitrationRounds->add(arbitrationRound);
1671
1672                 if (compactArbitrationData()) {
1673                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1674                         if (newArbitrationRound->getCommit() != NULL) {
1675                                 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1676                                         processEntry(commitPart);
1677                                 }
1678                         }
1679                 }
1680         }
1681 }
1682
1683 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1684
1685         // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
1686         if (transaction->getArbitrator() != localMachineId) {
1687                 return Pair<bool, bool>(false, false);
1688         }
1689
1690         if (!transaction->isComplete()) {
1691                 // Will arbitrate in incorrect order if we continue so just break
1692                 // Most likely this
1693                 return Pair<bool, bool>(false, false);
1694         }
1695
1696         if (transaction->getMachineId() != localMachineId) {
1697                 // dont do this check for local transactions
1698                 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) != NULL) {
1699                         if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1700                                 // We've have already seen this from the server
1701                                 return Pair<bool, bool>(false, false);
1702                         }
1703                 }
1704         }
1705
1706         if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1707                 // Guard evaluated as true
1708
1709                 // Create the commit and increment the commit sequence number
1710                 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1711                 localArbitrationSequenceNumber++;
1712
1713                 // Update the local changes so we can make the commit
1714                 for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
1715                         newCommit->addKV(kv);
1716                 }
1717
1718                 // create the commit parts
1719                 newCommit->createCommitParts();
1720
1721                 // Append all the commit parts to the end of the pending queue waiting for sending to the server
1722                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1723                 pendingSendArbitrationRounds->add(arbitrationRound);
1724
1725                 if (compactArbitrationData()) {
1726                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1727                         for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1728                                 processEntry(commitPart);
1729                         }
1730                 } else {
1731                         // Insert the commit so we can process it
1732                         for (CommitPart *commitPart : newCommit->getParts()->values()) {
1733                                 processEntry(commitPart);
1734                         }
1735                 }
1736
1737                 if (transaction->getMachineId() == localMachineId) {
1738                         TransactionStatus *status = transaction->getTransactionStatus();
1739                         if (status != NULL) {
1740                                 status->setStatus(TransactionStatus_StatusCommitted);
1741                         }
1742                 }
1743
1744                 updateLiveStateFromLocal();
1745                 return Pair<bool, bool>(true, true);
1746         } else {
1747
1748                 if (transaction->getMachineId() == localMachineId) {
1749                         // For locally created messages update the status
1750
1751                         // Guard evaluated was false so create abort
1752                         TransactionStatus status = transaction->getTransactionStatus();
1753                         if (status != NULL) {
1754                                 status->setStatus(TransactionStatus_StatusAborted);
1755                         }
1756                 } else {
1757                         Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
1758
1759
1760                         // Create the abort
1761                         Abort *newAbort = new Abort(NULL,
1762                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1763                                                                                                                                         -1,
1764                                                                                                                                         transaction->getMachineId(),
1765                                                                                                                                         transaction->getArbitrator(),
1766                                                                                                                                         localArbitrationSequenceNumber);
1767                         localArbitrationSequenceNumber++;
1768
1769                         addAbortSet->add(newAbort);
1770
1771
1772                         // Append all the commit parts to the end of the pending queue waiting for sending to the server
1773                         ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1774                         pendingSendArbitrationRounds->add(arbitrationRound);
1775
1776                         if (compactArbitrationData()) {
1777                                 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1778                                 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1779                                         processEntry(commitPart);
1780                                 }
1781                         }
1782                 }
1783
1784                 updateLiveStateFromLocal();
1785                 return Pair<bool, bool>(true, false);
1786         }
1787 }
1788
1789 /**
1790  * Compacts the arbitration data my merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates
1791  */
1792 bool Table::compactArbitrationData() {
1793
1794         if (pendingSendArbitrationRounds->size() < 2) {
1795                 // Nothing to compact so do nothing
1796                 return false;
1797         }
1798
1799         ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1800         if (lastRound->didSendPart()) {
1801                 return false;
1802         }
1803
1804         bool hadCommit = (lastRound->getCommit() == NULL);
1805         bool gotNewCommit = false;
1806
1807         int numberToDelete = 1;
1808         while (numberToDelete < pendingSendArbitrationRounds->size()) {
1809                 ArbitrationRound *xs round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1810
1811                 if (round->isFull() || round->didSendPart()) {
1812                         // Stop since there is a part that cannot be compacted and we need to compact in order
1813                         break;
1814                 }
1815
1816                 if (round->getCommit() == NULL) {
1817
1818                         // Try compacting aborts only
1819                         int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
1820                         if (newSize > ArbitrationRound->MAX_PARTS) {
1821                                 // Cant compact since it would be too large
1822                                 break;
1823                         }
1824                         lastRound->addAborts(round->getAborts());
1825                 } else {
1826
1827                         // Create a new larger commit
1828                         Commit newCommit = Commit->merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
1829                         localArbitrationSequenceNumber++;
1830
1831                         // Create the commit parts so that we can count them
1832                         newCommit->createCommitParts();
1833
1834                         // Calculate the new size of the parts
1835                         int newSize = newCommit->getNumberOfParts();
1836                         newSize += lastRound->getAbortsCount();
1837                         newSize += round->getAbortsCount();
1838
1839                         if (newSize > ArbitrationRound->MAX_PARTS) {
1840                                 // Cant compact since it would be too large
1841                                 break;
1842                         }
1843
1844                         // Set the new compacted part
1845                         lastRound->setCommit(newCommit);
1846                         lastRound->addAborts(round->getAborts());
1847                         gotNewCommit = true;
1848                 }
1849
1850                 numberToDelete++;
1851         }
1852
1853         if (numberToDelete != 1) {
1854                 // If there is a compaction
1855
1856                 // Delete the previous pieces that are now in the new compacted piece
1857                 if (numberToDelete == pendingSendArbitrationRounds->size()) {
1858                         pendingSendArbitrationRounds->clear();
1859                 } else {
1860                         for (int i = 0; i < numberToDelete; i++) {
1861                                 pendingSendArbitrationRounds->remove(pendingSendArbitrationRounds->size() - 1);
1862                         }
1863                 }
1864
1865                 // Add the new compacted into the pending to send list
1866                 pendingSendArbitrationRounds->add(lastRound);
1867
1868                 // Should reinsert into the commit processor
1869                 if (hadCommit && gotNewCommit) {
1870                         return true;
1871                 }
1872         }
1873
1874         return false;
1875 }
1876 // bool compactArbitrationData() {
1877 //  return false;
1878 // }
1879
1880 /**
1881  * Update all the commits and the committed tables, sets dead the dead transactions
1882  */
1883 bool Table::updateCommittedTable() {
1884
1885         if (newCommitParts->size() == 0) {
1886                 // Nothing new to process
1887                 return false;
1888         }
1889
1890         // Iterate through all the machine Ids that we received new parts for
1891         for (int64_t machineId : newCommitParts->keySet()) {
1892                 Hashtable<Pair<int64_t, int32_t>, CommitPart *> *parts = newCommitParts->get(machineId);
1893
1894                 // Iterate through all the parts for that machine Id
1895                 for (Pair<int64_t, int32_t> partId : parts->keySet()) {
1896                         CommitPart *part = parts->get(partId);
1897
1898                         // Get the transaction object for that sequence number
1899                         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
1900
1901                         if (commitForClientTable == NULL) {
1902                                 // This is the first commit from this device
1903                                 commitForClientTable = new Hashtable<int64_t, Commit *>();
1904                                 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
1905                         }
1906
1907                         Commit *commit = commitForClientTable->get(part->getSequenceNumber());
1908
1909                         if (commit == NULL) {
1910                                 // This is a new commit that we dont have so make a new one
1911                                 commit = new Commit();
1912
1913                                 // Insert this new commit into the live tables
1914                                 commitForClientTable->put(part->getSequenceNumber(), commit);
1915                         }
1916
1917                         // Add that part to the commit
1918                         commit->addPartDecode(part);
1919                 }
1920         }
1921
1922         // Clear all the new commits parts in preparation for the next time the server sends slots
1923         newCommitParts->clear();
1924
1925         // If we process a new commit keep track of it for future use
1926         bool didProcessANewCommit = false;
1927
1928         // Process the commits one by one
1929         for (int64_T arbitratorId : liveCommitsTable->keySet()) {
1930
1931                 // Get all the commits for a specific arbitrator
1932                 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
1933
1934                 // Sort the commits in order
1935                 Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
1936                 Collections->sort(commitSequenceNumbers);
1937
1938                 // Get the last commit seen from this arbitrator
1939                 int64_t lastCommitSeenSequenceNumber = -1;
1940                 if (lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId) != NULL) {
1941                         lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
1942                 }
1943
1944                 // Go through each new commit one by one
1945                 for (int i = 0; i < commitSequenceNumbers->size(); i++) {
1946                         int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
1947                         Commit *commit = commitForClientTable->get(commitSequenceNumber);
1948
1949                         // Special processing if a commit is not complete
1950                         if (!commit->isComplete()) {
1951                                 if (i == (commitSequenceNumbers->size() - 1)) {
1952                                         // If there is an incomplete commit and this commit is the latest one seen then this commit cannot be processed and there are no other commits
1953                                         break;
1954                                 } else {
1955                                         // This is a commit that was already dead but parts of it are still in the block chain (not flushed out yet)->
1956                                         // Delete it and move on
1957                                         commit->setDead();
1958                                         commitForClientTable->remove(commit->getSequenceNumber());
1959                                         continue;
1960                                 }
1961                         }
1962
1963                         // Update the last transaction that was updated if we can
1964                         if (commit->getTransactionSequenceNumber() != -1) {
1965                                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
1966
1967                                 // Update the last transaction sequence number that the arbitrator arbitrated on
1968                                 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
1969                                         lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
1970                                 }
1971                         }
1972
1973                         // Update the last arbitration data that we have seen so far
1974                         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId()) != NULL) {
1975
1976                                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
1977                                 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
1978                                         // Is larger
1979                                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
1980                                 }
1981                         } else {
1982                                 // Never seen any data from this arbitrator so record the first one
1983                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
1984                         }
1985
1986                         // We have already seen this commit before so need to do the full processing on this commit
1987                         if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
1988
1989                                 // Update the last transaction that was updated if we can
1990                                 if (commit->getTransactionSequenceNumber() != -1) {
1991                                         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
1992
1993                                         // Update the last transaction sequence number that the arbitrator arbitrated on
1994                                         if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
1995                                                 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
1996                                         }
1997                                 }
1998
1999                                 continue;
2000                         }
2001
2002                         // If we got here then this is a brand new commit and needs full processing
2003
2004                         // Get what commits should be edited, these are the commits that have live values for their keys
2005                         Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
2006                         for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
2007                                 commitsToEdit->add(liveCommitsByKeyTable->get(kv->getKey()));
2008                         }
2009                         commitsToEdit->remove(NULL);            // remove NULL since it could be in this set
2010
2011                         // Update each previous commit that needs to be updated
2012                         for (Commit *previousCommit : commitsToEdit) {
2013
2014                                 // Only bother with live commits (TODO: Maybe remove this check)
2015                                 if (previousCommit->isLive()) {
2016
2017                                         // Update which keys in the old commits are still live
2018                                         for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
2019                                                 previousCommit->invalidateKey(kv->getKey());
2020                                         }
2021
2022                                         // if the commit is now dead then remove it
2023                                         if (!previousCommit->isLive()) {
2024                                                 commitForClientTable->remove(previousCommit);
2025                                         }
2026                                 }
2027                         }
2028
2029                         // Update the last seen sequence number from this arbitrator
2030                         if (lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId()) != NULL) {
2031                                 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2032                                         lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2033                                 }
2034                         } else {
2035                                 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2036                         }
2037
2038                         // We processed a new commit that we havent seen before
2039                         didProcessANewCommit = true;
2040
2041                         // Update the committed table of keys and which commit is using which key
2042                         for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
2043                                 committedKeyValueTable->put(kv->getKey(), kv);
2044                                 liveCommitsByKeyTable->put(kv->getKey(), commit);
2045                         }
2046                 }
2047         }
2048
2049         return didProcessANewCommit;
2050 }
2051
2052 /**
2053  * Create the speculative table from transactions that are still live and have come from the cloud
2054  */
2055 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2056         if (liveTransactionBySequenceNumberTable->keySet()->size() == 0) {
2057                 // There is nothing to speculate on
2058                 return false;
2059         }
2060
2061         // Create a list of the transaction sequence numbers and sort them from oldest to newest
2062         Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
2063         Collections->sort(transactionSequenceNumbersSorted);
2064
2065         bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2066
2067
2068         if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2069                 // If there is a gap in the transaction sequence numbers then there was a commit or an abort of a transaction
2070                 // OR there was a new commit (Could be from offline commit) so a redo the speculation from scratch
2071
2072                 // Start from scratch
2073                 speculatedKeyValueTable->clear();
2074                 lastTransactionSequenceNumberSpeculatedOn = -1;
2075                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2076
2077         }
2078
2079         // Remember the front of the transaction list
2080         oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2081
2082         // Find where to start arbitration from
2083         int startIndex = transactionSequenceNumbersSorted->indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1;
2084
2085         if (startIndex >= transactionSequenceNumbersSorted->size()) {
2086                 // Make sure we are not out of bounds
2087                 return false;           // did not speculate
2088         }
2089
2090         Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
2091         bool didSkip = true;
2092
2093         for (int i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2094                 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2095                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2096
2097                 if (!transaction->isComplete()) {
2098                         // If there is an incomplete transaction then there is nothing we can do
2099                         // add this transactions arbitrator to the list of arbitrators we should ignore
2100                         incompleteTransactionArbitrator->add(transaction->getArbitrator());
2101                         didSkip = true;
2102                         continue;
2103                 }
2104
2105                 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2106                         continue;
2107                 }
2108
2109                 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2110
2111                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2112                         // Guard evaluated to true so update the speculative table
2113                         for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
2114                                 speculatedKeyValueTable->put(kv->getKey(), kv);
2115                         }
2116                 }
2117         }
2118
2119         if (didSkip) {
2120                 // Since there was a skip we need to redo the speculation next time around
2121                 lastTransactionSequenceNumberSpeculatedOn = -1;
2122                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2123         }
2124
2125         // We did some speculation
2126         return true;
2127 }
2128
2129 /**
2130  * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer
2131  */
2132 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2133         if (pendingTransactionQueue->size() == 0) {
2134                 // There is nothing to speculate on
2135                 return;
2136         }
2137
2138         if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2139                 // need to reset on the pending speculation
2140                 lastPendingTransactionSpeculatedOn = NULL;
2141                 firstPendingTransaction = pendingTransactionQueue->get(0);
2142                 pendingTransactionSpeculatedKeyValueTable->clear();
2143         }
2144
2145         // Find where to start arbitration from
2146         int startIndex = pendingTransactionQueue->indexOf(firstPendingTransaction) + 1;
2147
2148         if (startIndex >= pendingTransactionQueue->size()) {
2149                 // Make sure we are not out of bounds
2150                 return;
2151         }
2152
2153         for (int i = startIndex; i < pendingTransactionQueue->size(); i++) {
2154                 Transaction *transaction = pendingTransactionQueue->get(i);
2155
2156                 lastPendingTransactionSpeculatedOn = transaction;
2157
2158                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2159                         // Guard evaluated to true so update the speculative table
2160                         for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
2161                                 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2162                         }
2163                 }
2164         }
2165 }
2166
2167 /**
2168  * Set dead and remove from the live transaction tables the transactions that are dead
2169  */
2170 void Table::updateLiveTransactionsAndStatus() {
2171
2172         // Go through each of the transactions
2173         for (Iterator<Map->Entry<int64_t, Transaction> > *iter = liveTransactionBySequenceNumberTable->entrySet()->iterator(); iter->hasNext();) {
2174                 Transaction *transaction = iter->next()->getValue();
2175
2176                 // Check if the transaction is dead
2177                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator());
2178                 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= transaction->getSequenceNumber())) {
2179
2180                         // Set dead the transaction
2181                         transaction->setDead();
2182
2183                         // Remove the transaction from the live table
2184                         iter->remove();
2185                         liveTransactionByTransactionIdTable->remove(transaction->getId());
2186                 }
2187         }
2188
2189         // Go through each of the transactions
2190         for (Iterator<Map->Entry<int64_t, TransactionStatus *> > *iter = outstandingTransactionStatus->entrySet()->iterator(); iter->hasNext();) {
2191                 TransactionStatus *status = iter->next()->getValue();
2192
2193                 // Check if the transaction is dead
2194                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator());
2195                 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= status->getTransactionSequenceNumber())) {
2196
2197                         // Set committed
2198                         status->setStatus(TransactionStatus_StatusCommitted);
2199
2200                         // Remove
2201                         iter->remove();
2202                 }
2203         }
2204 }
2205
2206 /**
2207  * Process this slot, entry by entry->  Also update the latest message sent by slot
2208  */
2209 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2210
2211         // Update the last message seen
2212         updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2213
2214         // Process each entry in the slot
2215         for (Entry *entry : slot->getEntries()) {
2216                 switch (entry->getType()) {
2217
2218                 case TypeCommitPart:
2219                         processEntry((CommitPart *)entry);
2220                         break;
2221
2222                 case TypeAbort:
2223                         processEntry((Abort *)entry);
2224                         break;
2225
2226                 case TypeTransactionPart:
2227                         processEntry((TransactionPart *)entry);
2228                         break;
2229
2230                 case TypeNewKey:
2231                         processEntry((NewKey *)entry);
2232                         break;
2233
2234                 case TypeLastMessage:
2235                         processEntry((LastMessage *)entry, machineSet);
2236                         break;
2237
2238                 case TypeRejectedMessage:
2239                         processEntry((RejectedMessage *)entry, indexer);
2240                         break;
2241
2242                 case TypeTableStatus:
2243                         processEntry((TableStatus *)entry, slot->getSequenceNumber());
2244                         break;
2245
2246                 default:
2247                         throw new Error("Unrecognized type: " + entry->getType());
2248                 }
2249         }
2250 }
2251
2252 /**
2253  * Update the last message that was sent for a machine Id
2254  */
2255 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2256         // Update what the last message received by a machine was
2257         updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2258 }
2259
2260 /**
2261  * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message)
2262  */
2263 void Table::processEntry(NewKey *entry) {
2264
2265         // Update the arbitrator table with the new key information
2266         arbitratorTable->put(entry->getKey(), entry->getMachineID());
2267
2268         // Update what the latest live new key is
2269         NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2270         if (oldNewKey != NULL) {
2271                 // Delete the old new key messages
2272                 oldNewKey->setDead();
2273         }
2274 }
2275
2276 /**
2277  * Process new table status entries and set dead the old ones as new ones come in->
2278  * keeps track of the largest and smallest table status seen in this current round
2279  * of updating the local copy of the block chain
2280  */
2281 void Table::processEntry(TableStatus entry, int64_t seq) {
2282         int newNumSlots = entry->getMaxSlots();
2283         updateCurrMaxSize(newNumSlots);
2284
2285         initExpectedSize(seq, newNumSlots);
2286
2287         if (liveTableStatus != NULL) {
2288                 // We have a larger table status so the old table status is no int64_ter alive
2289                 liveTableStatus->setDead();
2290         }
2291
2292         // Make this new table status the latest alive table status
2293         liveTableStatus = entry;
2294 }
2295
2296 /**
2297  * Check old messages to see if there is a block chain violation-> Also
2298  */
2299 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
2300         int64_t oldSeqNum = entry->getOldSeqNum();
2301         int64_t newSeqNum = entry->getNewSeqNum();
2302         bool isequal = entry->getEqual();
2303         int64_t machineId = entry->getMachineID();
2304         int64_t seq = entry->getSequenceNumber();
2305
2306
2307         // Check if we have messages that were supposed to be rejected in our local block chain
2308         for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2309
2310                 // Get the slot
2311                 Slot *slot = indexer->getSlot(seqNum);
2312
2313                 if (slot != NULL) {
2314                         // If we have this slot make sure that it was not supposed to be a rejected slot
2315
2316                         int64_t slotMachineId = slot->getMachineID();
2317                         if (isequal != (slotMachineId == machineId)) {
2318                                 throw new Error("Server Error: Trying to insert rejected message for slot " + seqNum);
2319                         }
2320                 }
2321         }
2322
2323
2324         // Create a list of clients to watch until they see this rejected message entry->
2325         Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2326         for (Map->Entry<int64_t, Pair<int64_t, Liveness *> > *lastMessageEntry : lastMessageTable->entrySet()) {
2327
2328                 // Machine ID for the last message entry
2329                 int64_t lastMessageEntryMachineId = lastMessageEntry->getKey();
2330
2331                 // We've seen it, don't need to continue to watch->  Our next
2332                 // message will implicitly acknowledge it->
2333                 if (lastMessageEntryMachineId == localMachineId) {
2334                         continue;
2335                 }
2336
2337                 Pair<int64_t, Liveness *> lastMessageValue = lastMessageEntry->getValue();
2338                 int64_t entrySequenceNumber = lastMessageValue->getFirst();
2339
2340                 if (entrySequenceNumber < seq) {
2341
2342                         // Add this rejected message to the set of messages that this machine ID did not see yet
2343                         addWatchVector(lastMessageEntryMachineId, entry);
2344
2345                         // This client did not see this rejected message yet so add it to the watch set to monitor
2346                         deviceWatchSet->add(lastMessageEntryMachineId);
2347                 }
2348         }
2349
2350         if (deviceWatchSet->isEmpty()) {
2351                 // This rejected message has been seen by all the clients so
2352                 entry->setDead();
2353         } else {
2354                 // We need to watch this rejected message
2355                 entry->setWatchSet(deviceWatchSet);
2356         }
2357 }
2358
2359 /**
2360  * Check if this abort is live, if not then save it so we can kill it later->
2361  * update the last transaction number that was arbitrated on->
2362  */
2363 void Table::processEntry(Abort *entry) {
2364
2365
2366         if (entry->getTransactionSequenceNumber() != -1) {
2367                 // update the transaction status if it was sent to the server
2368                 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2369                 if (status != NULL) {
2370                         status->setStatus(TransactionStatus_StatusAborted);
2371                 }
2372         }
2373
2374         // Abort has not been seen by the client it is for yet so we need to keep track of it
2375         Abort *previouslySeenAbort = liveAbortTable->put(entry->getAbortId(), entry);
2376         if (previouslySeenAbort != NULL) {
2377                 previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version
2378         }
2379
2380         if (entry->getTransactionArbitrator() == localMachineId) {
2381                 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
2382         }
2383
2384         if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) {
2385
2386                 // The machine already saw this so it is dead
2387                 entry->setDead();
2388                 liveAbortTable->remove(entry->getAbortId());
2389
2390                 if (entry->getTransactionArbitrator() == localMachineId) {
2391                         liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2392                 }
2393
2394                 return;
2395         }
2396
2397         // Update the last arbitration data that we have seen so far
2398         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator()) != NULL) {
2399
2400                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2401                 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2402                         // Is larger
2403                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2404                 }
2405         } else {
2406                 // Never seen any data from this arbitrator so record the first one
2407                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2408         }
2409
2410
2411         // Set dead a transaction if we can
2412         Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber()));
2413         if (transactionToSetDead != NULL) {
2414                 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2415         }
2416
2417         // Update the last transaction sequence number that the arbitrator arbitrated on
2418         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator());
2419         if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2420
2421                 // Is a valid one
2422                 if (entry->getTransactionSequenceNumber() != -1) {
2423                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2424                 }
2425         }
2426 }
2427
2428 /**
2429  * Set dead the transaction part if that transaction is dead and keep track of all new parts
2430  */
2431 void Table::processEntry(TransactionPart *entry) {
2432         // Check if we have already seen this transaction and set it dead OR if it is not alive
2433         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId());
2434         if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= entry->getSequenceNumber())) {
2435                 // This transaction is dead, it was already committed or aborted
2436                 entry->setDead();
2437                 return;
2438         }
2439
2440         // This part is still alive
2441         Hashtable<Pair<int64_t, int32_t>, TransactionPart *> *transactionPart = newTransactionParts->get(entry->getMachineId());
2442
2443         if (transactionPart == NULL) {
2444                 // Dont have a table for this machine Id yet so make one
2445                 transactionPart = new Hashtable<Pair<int64_t, int32_t>, TransactionPart *>();
2446                 newTransactionParts->put(entry->getMachineId(), transactionPart);
2447         }
2448
2449         // Update the part and set dead ones we have already seen (got a rescued version)
2450         TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry);
2451         if (previouslySeenPart != NULL) {
2452                 previouslySeenPart->setDead();
2453         }
2454 }
2455
2456 /**
2457  * Process new commit entries and save them for future use->  Delete duplicates
2458  */
2459 void Table::processEntry(CommitPart *entry) {
2460         // Update the last transaction that was updated if we can
2461         if (entry->getTransactionSequenceNumber() != -1) {
2462                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId());
2463
2464                 // Update the last transaction sequence number that the arbitrator arbitrated on
2465                 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2466                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2467                 }
2468         }
2469
2470
2471
2472
2473         Hashtable<Pair<int64_t, int32_t>, CommitPart *> *commitPart = newCommitParts->get(entry->getMachineId());
2474
2475         if (commitPart == NULL) {
2476                 // Don't have a table for this machine Id yet so make one
2477                 commitPart = new Hashtable<Pair<int64_t, int32_t>, CommitPart *>();
2478                 newCommitParts->put(entry->getMachineId(), commitPart);
2479         }
2480
2481         // Update the part and set dead ones we have already seen (got a rescued version)
2482         CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry);
2483         if (previouslySeenPart != NULL) {
2484                 previouslySeenPart->setDead();
2485         }
2486 }
2487
2488 /**
2489  * Update the last message seen table->  Update and set dead the appropriate RejectedMessages as clients see them->
2490  * Updates the live aborts, removes those that are dead and sets them dead->
2491  * Check that the last message seen is correct and that there is no mismatch of our own last message or that
2492  * other clients have not had a rollback on the last message->
2493  */
2494 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<intr64_t> *machineSet) {
2495
2496         // We have seen this machine ID
2497         machineSet->remove(machineId);
2498
2499         // Get the set of rejected messages that this machine Id is has not seen yet
2500         Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2501
2502         // If there is a rejected message that this machine Id has not seen yet
2503         if (watchset != NULL) {
2504
2505                 // Go through each rejected message that this machine Id has not seen yet
2506                 for (Iterator<RejectedMessage *> *rmit = watchset->iterator(); rmit->hasNext(); ) {
2507
2508                         RejectedMessage *rm = rmit->next();
2509
2510                         // If this machine Id has seen this rejected message->->->
2511                         if (rm->getSequenceNumber() <= seqNum) {
2512
2513                                 // Remove it from our watchlist
2514                                 rmit->remove();
2515
2516                                 // Decrement machines that need to see this notification
2517                                 rm->removeWatcher(machineId);
2518                         }
2519                 }
2520         }
2521
2522         // Set dead the abort
2523         for (Iterator<Map->Entry<Pair<int64_t, int64_t>, Abort *> > i = liveAbortTable->entrySet()->iterator(); i->hasNext();) {
2524                 Abort *abort = i->next()->getValue();
2525
2526                 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2527                         abort->setDead();
2528                         i->remove();
2529
2530                         if (abort->getTransactionArbitrator() == localMachineId) {
2531                                 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2532                         }
2533                 }
2534         }
2535
2536
2537
2538         if (machineId == localMachineId) {
2539                 // Our own messages are immediately dead->
2540                 if (liveness instanceof LastMessage) {
2541                         ((LastMessage *)liveness)->setDead();
2542                 } else if (liveness instanceof Slot) {
2543                         ((Slot *)liveness)->setDead();
2544                 } else {
2545                         throw new Error("Unrecognized type");
2546                 }
2547         }
2548
2549         // Get the old last message for this device
2550         Pair<int64_t, Liveness *> lastMessageEntry = lastMessageTable->put(machineId, Pair<int64_t, Liveness *>(seqNum, liveness));
2551         if (lastMessageEntry == NULL) {
2552                 // If no last message then there is nothing else to process
2553                 return;
2554         }
2555
2556         int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
2557         Liveness *lastEntry = lastMessageEntry->getSecond();
2558
2559         // If it is not our machine Id since we already set ours to dead
2560         if (machineId != localMachineId) {
2561                 if (lastEntry instanceof LastMessage) {
2562                         ((LastMessage *)lastEntry)->setDead();
2563                 } else if (lastEntry instanceof Slot) {
2564                         ((Slot *)lastEntry)->setDead();
2565                 } else {
2566                         throw new Error("Unrecognized type");
2567                 }
2568         }
2569
2570         // Make sure the server is not playing any games
2571         if (machineId == localMachineId) {
2572
2573                 if (hadPartialSendToServer) {
2574                         // We were not making any updates and we had a machine mismatch
2575                         if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2576                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: " +  lastMessageSeqNum  + " got: " + seqNum);
2577                         }
2578
2579                 } else {
2580                         // We were not making any updates and we had a machine mismatch
2581                         if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2582                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed: " +  lastMessageSeqNum + " got: " + seqNum);
2583                         }
2584                 }
2585         } else {
2586                 if (lastMessageSeqNum > seqNum) {
2587                         throw new Error("Server Error: Rollback on remote machine sequence number");
2588                 }
2589         }
2590 }
2591
2592 /**
2593  * Add a rejected message entry to the watch set to keep track of which clients have seen that
2594  * rejected message entry and which have not->
2595  */
2596 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
2597         Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2598         if (entries == NULL) {
2599                 // There is no set for this machine ID yet so create one
2600                 entries = new Hashset<RejectedMessage *>();
2601                 rejectedMessageWatchVectorTable->put(machineId, entries);
2602         }
2603         entries->add(entry);
2604 }
2605
2606 /**
2607  * Check if the HMAC chain is not violated
2608  */
2609 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
2610         for (int i = 0; i < newSlots->length; i++) {
2611                 Slot *currSlot = newSlots[i];
2612                 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
2613                 if (prevSlot != NULL &&
2614                                 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2615                         throw new Error("Server Error: Invalid HMAC Chain");
2616         }
2617 }
2618