edits
[iotcloud.git] / version2 / src / C / Table.cc
1 #include "Table.h"
2 #include "CloudComm.h"
3 #include "SlotBuffer.h"
4 #include "NewKey.h"
5 #include "Slot.h"
6 #include "KeyValue.h"
7 #include "Error.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "Random.h"
13
14 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
15         buffer(NULL),
16         cloud(new CloudComm(this, baseurl, password, listeningPort)),
17         random(NULL),
18         liveTableStatus(NULL),
19         pendingTransactionBuilder(NULL),
20         lastPendingTransactionSpeculatedOn(NULL),
21         firstPendingTransaction(NULL),
22         numberOfSlots(0),
23         bufferResizeThreshold(0),
24         liveSlotCount(0),
25         oldestLiveSlotSequenceNumver(1),
26         localMachineId(_localMachineId),
27         sequenceNumber(0),
28         localTransactionSequenceNumber(0),
29         lastTransactionSequenceNumberSpeculatedOn(0),
30         oldestTransactionSequenceNumberSpeculatedOn(0),
31         localArbitrationSequenceNumber(0),
32         hadPartialSendToServer(false),
33         attemptedToSendToServer(false),
34         expectedsize(0),
35         didFindTableStatus(false),
36         currMaxSize(0),
37         lastSlotAttemptedToSend(NULL),
38         lastIsNewKey(false),
39         lastNewSize(0),
40         lastTransactionPartsSent(NULL),
41         lastPendingSendArbitrationEntriesToDelete(NULL),
42         lastNewKey(NULL),
43         committedKeyValueTable(NULL),
44         speculatedKeyValueTable(NULL),
45         pendingTransactionSpeculatedKeyValueTable(NULL),
46         liveNewKeyTable(NULL),
47         lastMessageTable(NULL),
48         rejectedMessageWatchVectorTable(NULL),
49         arbitratorTable(NULL),
50         liveAbortTable(NULL),
51         newTransactionParts(NULL),
52         newCommitParts(NULL),
53         lastArbitratedTransactionNumberByArbitratorTable(NULL),
54         liveTransactionBySequenceNumberTable(NULL),
55         liveTransactionByTransactionIdTable(NULL),
56         liveCommitsTable(NULL),
57         liveCommitsByKeyTable(NULL),
58         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
59         rejectedSlotVector(NULL),
60         pendingTransactionQueue(NULL),
61         pendingSendArbitrationRounds(NULL),
62         pendingSendArbitrationEntriesToDelete(NULL),
63         transactionPartsSent(NULL),
64         outstandingTransactionStatus(NULL),
65         liveAbortsGeneratedByLocal(NULL),
66         offlineTransactionsCommittedAndAtServer(NULL),
67         localCommunicationTable(NULL),
68         lastTransactionSeenFromMachineFromServer(NULL),
69         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
70         lastInsertedNewKey(false),
71         lastSeqNumArbOn(0)
72 {
73         init();
74 }
75
76 Table::Table(CloudComm * _cloud, int64_t _localMachineId) :
77         buffer(NULL),
78         cloud(_cloud),
79         random(NULL),
80         liveTableStatus(NULL),
81         pendingTransactionBuilder(NULL),
82         lastPendingTransactionSpeculatedOn(NULL),
83         firstPendingTransaction(NULL),
84         numberOfSlots(0),
85         bufferResizeThreshold(0),
86         liveSlotCount(0),
87         oldestLiveSlotSequenceNumver(1),
88         localMachineId(_localMachineId),
89         sequenceNumber(0),
90         localTransactionSequenceNumber(0),
91         lastTransactionSequenceNumberSpeculatedOn(0),
92         oldestTransactionSequenceNumberSpeculatedOn(0),
93         localArbitrationSequenceNumber(0),
94         hadPartialSendToServer(false),
95         attemptedToSendToServer(false),
96         expectedsize(0),
97         didFindTableStatus(false),
98         currMaxSize(0),
99         lastSlotAttemptedToSend(NULL),
100         lastIsNewKey(false),
101         lastNewSize(0),
102         lastTransactionPartsSent(NULL),
103         lastPendingSendArbitrationEntriesToDelete(NULL),
104         lastNewKey(NULL),
105         committedKeyValueTable(NULL),
106         speculatedKeyValueTable(NULL),
107         pendingTransactionSpeculatedKeyValueTable(NULL),
108         liveNewKeyTable(NULL),
109         lastMessageTable(NULL),
110         rejectedMessageWatchVectorTable(NULL),
111         arbitratorTable(NULL),
112         liveAbortTable(NULL),
113         newTransactionParts(NULL),
114         newCommitParts(NULL),
115         lastArbitratedTransactionNumberByArbitratorTable(NULL),
116         liveTransactionBySequenceNumberTable(NULL),
117         liveTransactionByTransactionIdTable(NULL),
118         liveCommitsTable(NULL),
119         liveCommitsByKeyTable(NULL),
120         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
121         rejectedSlotVector(NULL),
122         pendingTransactionQueue(NULL),
123         pendingSendArbitrationRounds(NULL),
124         pendingSendArbitrationEntriesToDelete(NULL),
125         transactionPartsSent(NULL),
126         outstandingTransactionStatus(NULL),
127         liveAbortsGeneratedByLocal(NULL),
128         offlineTransactionsCommittedAndAtServer(NULL),
129         localCommunicationTable(NULL),
130         lastTransactionSeenFromMachineFromServer(NULL),
131         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
132         lastInsertedNewKey(false),
133         lastSeqNumArbOn(0)
134 {
135         init();
136 }
137
138 /**
139  * Init all the stuff needed for for table usage
140  */
141 void Table::init() {
142         // Init helper objects
143         random = new Random();
144         buffer = new SlotBuffer();
145
146         // init data structs
147         committedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
148         speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
149         pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
150         liveNewKeyTable = new Hashtable<IoTString *, NewKey*>();
151         lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness*> *>();
152         rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage*>* >();
153         arbitratorTable = new Hashtable<IoTString *, int64_t>();
154         liveAbortTable = new Hashtable<Pair<int64_t, int64_t>*, Abort*>();
155         newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>*, TransactionPart*> *>();
156         newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>*, CommitPart*> *>();
157         lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
158         liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction*>();
159         liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t>*, Transaction*>();
160         liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit*> >();
161         liveCommitsByKeyTable = new Hashtable<IoTString *, Commit*>();
162         lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
163         rejectedSlotVector = new Vector<int64_t>();
164         pendingTransactionQueue = new Vector<Transaction*>();
165         pendingSendArbitrationEntriesToDelete = new Vector<Entry*>();
166         transactionPartsSent = new Hashtable<Transaction*, Vector<int32_t> *>();
167         outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus*>();
168         liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort*>();
169         offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t>*>();
170         localCommunicationTable = new Hashtable<int64_t, Pair<IoTString*, int32_t>*>();
171         lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
172         pendingSendArbitrationRounds = new Vector<ArbitrationRound*>();
173         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
174
175
176         // Other init stuff
177         numberOfSlots = buffer->capacity();
178         setResizeThreshold();
179 }
180
181 /**
182  * Initialize the table by inserting a table status as the first entry into the table status
183  * also initialize the crypto stuff.
184  */
185  void Table::initTable() {
186         cloud->initSecurity();
187
188         // Create the first insertion into the block chain which is the table status
189         Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
190         localSequenceNumber++;
191         TableStatus *status = new TableStatus(s, numberOfSlots);
192         s->addEntry(status);
193         Array<Slot*> *array = cloud->putSlot(s, numberOfSlots);
194
195         if (array == NULL) {
196                 array = new Array<Slot*>(1);
197                 array->set(0, s);
198                 // update local block chain
199                 validateAndUpdate(array, true);
200         } else if (array->length() == 1) {
201                 // in case we did push the slot BUT we failed to init it
202                 validateAndUpdate(array, true);
203         } else {
204                 throw new Error("Error on initialization");
205         }
206 }
207
208 /**
209  * Rebuild the table from scratch by pulling the latest block chain from the server.
210  */
211  void Table::rebuild() {
212         // Just pull the latest slots from the server
213         Array<Slot*> *newslots = cloud->getSlots(sequenceNumber + 1);
214         validateAndUpdate(newslots, true);
215         sendToServer(NULL);
216         updateLiveTransactionsAndStatus();
217 }
218
219  void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
220         localCommunicationTable->put(arbitrator, new Pair<IoTString*, int32_t>(hostName, portNumber));
221 }
222
223  int64_t Table::getArbitrator(IoTString *key) {
224         return arbitratorTable->get(key);
225 }
226
227  void Table::close() {
228         cloud->close();
229 }
230
231  IoTString * Table::getCommitted(IoTString *key)  {
232         KeyValue *kv = committedKeyValueTable->get(key);
233
234         if (kv != NULL) {
235                 return kv->getValue();
236         } else {
237                 return NULL;
238         }
239 }
240
241  IoTString * Table::getSpeculative(IoTString *key) {
242         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
243
244         if (kv == NULL) {
245                 kv = speculatedKeyValueTable->get(key);
246         }
247
248         if (kv == NULL) {
249                 kv = committedKeyValueTable->get(key);
250         }
251
252         if (kv != NULL) {
253                 return kv->getValue();
254         } else {
255                 return NULL;
256         }
257 }
258
259  IoTString * Table::getCommittedAtomic(IoTString *key) {
260         KeyValue *kv = committedKeyValueTable->get(key);
261
262         if (arbitratorTable->get(key) == NULL) {
263                 throw new Error("Key not Found.");
264         }
265
266         // Make sure new key value pair matches the current arbitrator
267         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
268                 // TODO: Maybe not throw en error
269                 throw new Error("Not all Key Values Match Arbitrator.");
270         }
271
272         if (kv != NULL) {
273                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
274                 return kv->getValue();
275         } else {
276                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
277                 return NULL;
278         }
279 }
280
281  IoTString * Table::getSpeculativeAtomic(IoTString *key) {
282         if (arbitratorTable->get(key) == NULL) {
283                 throw new Error("Key not Found.");
284         }
285
286         // Make sure new key value pair matches the current arbitrator
287         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
288                 // TODO: Maybe not throw en error
289                 throw new Error("Not all Key Values Match Arbitrator.");
290         }
291
292         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
293
294         if (kv == NULL) {
295                 kv = speculatedKeyValueTable->get(key);
296         }
297
298         if (kv == NULL) {
299                 kv = committedKeyValueTable->get(key);
300         }
301
302         if (kv != NULL) {
303                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
304                 return kv->getValue();
305         } else {
306                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
307                 return NULL;
308         }
309 }
310
311  bool Table::update()  {
312         try {
313                 Array<Slot*> *newSlots = cloud->getSlots(sequenceNumber + 1);
314                 validateAndUpdate(newSlots, false);
315                 sendToServer(NULL);
316
317
318                 updateLiveTransactionsAndStatus();
319
320                 return true;
321         } catch (Exception *e) {
322                 // e->printStackTrace();
323
324                 for (int64_t m : localCommunicationTable->keySet()) {
325                         updateFromLocal(m);
326                 }
327         }
328
329         return false;
330 }
331
332  bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
333         while (true) {
334                 if (arbitratorTable->get(keyName) != NULL) {
335                         // There is already an arbitrator
336                         return false;
337                 }
338
339                 NewKey * newKey = new NewKey(NULL, keyName, machineId);
340
341                 if (sendToServer(newKey)) {
342                         // If successfully inserted
343                         return true;
344                 }
345         }
346 }
347
348  void Table::startTransaction() {
349         // Create a new transaction, invalidates any old pending transactions.
350         pendingTransactionBuilder = new PendingTransaction(localMachineId);
351 }
352
353  void Table::addKV(IoTString *key, IoTString *value) {
354
355         // Make sure it is a valid key
356         if (arbitratorTable->get(key) == NULL) {
357                 throw new Error("Key not Found.");
358         }
359
360         // Make sure new key value pair matches the current arbitrator
361         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
362                 // TODO: Maybe not throw en error
363                 throw new Error("Not all Key Values Match Arbitrator.");
364         }
365
366         // Add the key value to this transaction
367         KeyValue *kv = new KeyValue(key, value);
368         pendingTransactionBuilder->addKV(kv);
369 }
370
/**
 * Turn the pending transaction builder into a Transaction and try to get it
 * arbitrated.
 *
 * Steps, in order:
 *  - A builder with no key/value updates yields a StatusNoEffect status
 *    (arbitrator -1) and nothing else happens.
 *  - The builder is stamped with localTransactionSequenceNumber, which is then
 *    incremented.
 *  - A StatusPending status is created and attached to the new Transaction.
 *  - If the arbitrator is another machine the transaction is queued in
 *    pendingTransactionQueue; otherwise it is arbitrated locally and the live
 *    state is refreshed.
 *  - pendingTransactionBuilder is replaced by a fresh PendingTransaction.
 *  - sendToServer(NULL) is attempted; on ServerException every queued
 *    transaction is offered to sendTransactionToLocal() instead.
 *  - The live state is refreshed again and the status object is returned.
 *
 * pendingTransactionBuilder is dereferenced without a NULL check, so
 * startTransaction() has to run before this is called.
 *
 * NOTE(review): the declared return type is TransactionStatus by value, yet
 * the body returns and stores the result of `new TransactionStatus(...)`;
 * this looks like residue from the Java original -- confirm the intended
 * signature against Table.h.
 */
371  TransactionStatus Table::commitTransaction() {
372
373         if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
374                 // transaction with no updates will have no effect on the system
375                 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
376         }
377
378         // Set the local transaction sequence number and increment
379         pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
380         localTransactionSequenceNumber++;
381
382         // Create the transaction status
383         TransactionStatus transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
384
385         // Create the new transaction
386         Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
387         newTransaction->setTransactionStatus(transactionStatus);
388
389         if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
390                 // Add it to the queue and invalidate the builder for safety
391                 pendingTransactionQueue->add(newTransaction);
392         } else {
// This machine is the arbitrator: decide right away and refresh the live state.
393                 arbitrateOnLocalTransaction(newTransaction);
394                 updateLiveStateFromLocal();
395         }
396
// Start over with an empty builder for whatever the caller does next.
397         pendingTransactionBuilder = new PendingTransaction(localMachineId);
398
399         try {
400                 sendToServer(NULL);
401         } catch (ServerException *e) {
402
// Server send failed: offer each queued transaction to sendTransactionToLocal().
403                 Hashset<int64_t>* arbitratorTriedAndFailed = new Hashset<int64_t>();
404                 for (Iterator<Transaction*> *iter = pendingTransactionQueue->iterator(); iter->hasNext(); ) {
405                         Transaction * transaction = iter->next();
406
407                         if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
408                                 // Already contacted this client so ignore all attempts to contact this client
409                                 // to preserve ordering for arbitrator
410                                 continue;
411                         }
412
// Per the branch comments below: first == true means the local contact failed,
// second == true means the transaction was arbitrated.
413                         Pair<bool, bool> * sendReturn = sendTransactionToLocal(transaction);
414
415                         if (sendReturn->getFirst()) {
416                                 // Failed to contact over local
417                                 arbitratorTriedAndFailed->add(transaction->getArbitrator());
418                         } else {
419                                 // Successful contact or should not contact
420
421                                 if (sendReturn->getSecond()) {
422                                         // did arbitrate
423                                         iter->remove();
424                                 }
425                         }
426                 }
427         }
428
429         updateLiveStateFromLocal();
430
431         return transactionStatus;
432 }
433
434 /**
435  * Recalculate the new resize threshold
436  */
437 void Table::setResizeThreshold() {
438         int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
439         bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
440 }
441
442 int64_t Table::getLocalSequenceNumber() {
443         return localSequenceNumber;
444 }
445
446
// NOTE(review): file-scope variable with the same name as the Table member
// that both constructors initialize (lastInsertedNewKey(false)). Inside Table
// member functions the unqualified name resolves to the member, not to this
// global, so this definition looks like leftover from the Java field
// declaration -- confirm whether anything still needs it.
447 bool lastInsertedNewKey = false;
448
449 bool Table::sendToServer(NewKey* newKey) {
450
451         bool fromRetry = false;
452
453         try {
454                 if (hadPartialSendToServer) {
455                         Array<Slot*> *newSlots = cloud->getSlots(sequenceNumber + 1);
456                         if (newSlots->length() == 0) {
457                                 fromRetry = true;
458                                 ThreeTuple<bool, bool, Array<Slot*> *> *sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
459
460                                 if (sendSlotsReturn->getFirst()) {
461                                         if (newKey != NULL) {
462                                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
463                                                         newKey = NULL;
464                                                 }
465                                         }
466
467                                         for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
468                                                 transaction->resetServerFailure();
469
470                                                 // Update which transactions parts still need to be sent
471                                                 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
472
473                                                 // Add the transaction status to the outstanding list
474                                                 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
475
476                                                 // Update the transaction status
477                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
478
479                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
480                                                 if (transaction->didSendAllParts()) {
481                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
482                                                         pendingTransactionQueue->remove(transaction);
483                                                 }
484                                         }
485                                 } else {
486
487                                         newSlots = sendSlotsReturn->getThird();
488
489                                         bool isInserted = false;
490                                         for (Slot *s : newSlots) {
491                                                 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
492                                                         isInserted = true;
493                                                         break;
494                                                 }
495                                         }
496
497                                         for (Slot *s : newSlots) {
498                                                 if (isInserted) {
499                                                         break;
500                                                 }
501
502                                                 // Process each entry in the slot
503                                                 for (Entry *entry : s->getEntries()) {
504
505                                                         if (entry->getType() == TypeLastMessage) {
506                                                                 LastMessage lastMessage = (LastMessage)entry;
507                                                                 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
508                                                                         isInserted = true;
509                                                                         break;
510                                                                 }
511                                                         }
512                                                 }
513                                         }
514
515                                         if (isInserted) {
516                                                 if (newKey != NULL) {
517                                                         if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
518                                                                 newKey = NULL;
519                                                         }
520                                                 }
521
522                                                 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
523                                                         transaction->resetServerFailure();
524
525                                                         // Update which transactions parts still need to be sent
526                                                         transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
527
528                                                         // Add the transaction status to the outstanding list
529                                                         outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
530
531                                                         // Update the transaction status
532                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
533
534                                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
535                                                         if (transaction->didSendAllParts()) {
536                                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
537                                                                 pendingTransactionQueue->remove(transaction);
538                                                         } else {
539                                                                 transaction->resetServerFailure();
540                                                                 // Set the transaction sequence number back to nothing
541                                                                 if (!transaction->didSendAPartToServer()) {
542                                                                         transaction->setSequenceNumber(-1);
543                                                                 }
544                                                         }
545                                                 }
546                                         }
547                                 }
548
549                                 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
550                                         transaction->resetServerFailure();
551                                         // Set the transaction sequence number back to nothing
552                                         if (!transaction->didSendAPartToServer()) {
553                                                 transaction->setSequenceNumber(-1);
554                                         }
555                                 }
556
557                                 if (sendSlotsReturn->getThird()->length() != 0) {
558                                         // insert into the local block chain
559                                         validateAndUpdate(sendSlotsReturn->getThird(), true);
560                                 }
561                                 // continue;
562                         } else {
563                                 bool isInserted = false;
564                                 for (Slot *s : newSlots) {
565                                         if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
566                                                 isInserted = true;
567                                                 break;
568                                         }
569                                 }
570
571                                 for (Slot *s : newSlots) {
572                                         if (isInserted) {
573                                                 break;
574                                         }
575
576                                         // Process each entry in the slot
577                                         for (Entry *entry : s->getEntries()) {
578
579                                                 if (entry->getType() == TypeLastMessage) {
580                                                         LastMessage lastMessage = (LastMessage)entry;
581                                                         if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
582                                                                 isInserted = true;
583                                                                 break;
584                                                         }
585                                                 }
586                                         }
587                                 }
588
589                                 if (isInserted) {
590                                         if (newKey != NULL) {
591                                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
592                                                         newKey = NULL;
593                                                 }
594                                         }
595
596                                         for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
597                                                 transaction->resetServerFailure();
598
599                                                 // Update which transactions parts still need to be sent
600                                                 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
601
602                                                 // Add the transaction status to the outstanding list
603                                                 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
604
605                                                 // Update the transaction status
606                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
607
608                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
609                                                 if (transaction->didSendAllParts()) {
610                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
611                                                         pendingTransactionQueue->remove(transaction);
612                                                 } else {
613                                                         transaction->resetServerFailure();
614                                                         // Set the transaction sequence number back to nothing
615                                                         if (!transaction->didSendAPartToServer()) {
616                                                                 transaction->setSequenceNumber(-1);
617                                                         }
618                                                 }
619                                         }
620                                 } else {
621                                         for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
622                                                 transaction->resetServerFailure();
623                                                 // Set the transaction sequence number back to nothing
624                                                 if (!transaction->didSendAPartToServer()) {
625                                                         transaction->setSequenceNumber(-1);
626                                                 }
627                                         }
628                                 }
629
630                                 // insert into the local block chain
631                                 validateAndUpdate(newSlots, true);
632                         }
633                 }
634         } catch (ServerException *e) {
635                 throw e;
636         }
637
638
639
640         try {
641                 // While we have stuff that needs inserting into the block chain
642                 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
643
644                         fromRetry = false;
645
646                         if (hadPartialSendToServer) {
647                                 throw new Error("Should Be error free");
648                         }
649
650
651
652                         // If there is a new key with same name then end
653                         if ((newKey != NULL) && (arbitratorTable->get(newKey->getKey()) != NULL)) {
654                                 return false;
655                         }
656
657                         // Create the slot
658                         Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber);
659                         localSequenceNumber++;
660
661                         // Try to fill the slot with data
662                         ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
663                         bool needsResize = fillSlotsReturn->getFirst();
664                         int newSize = fillSlotsReturn->getSecond();
665                         bool insertedNewKey = fillSlotsReturn->getThird();
666
667                         if (needsResize) {
668                                 // Reset which transaction to send
669                                 for (Transaction *transaction : transactionPartsSent->keySet()) {
670                                         transaction->resetNextPartToSend();
671
672                                         // Set the transaction sequence number back to nothing
673                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
674                                                 transaction->setSequenceNumber(-1);
675                                         }
676                                 }
677
678                                 // Clear the sent data since we are trying again
679                                 pendingSendArbitrationEntriesToDelete->clear();
680                                 transactionPartsSent->clear();
681
682                                 // We needed a resize so try again
683                                 fillSlot(slot, true, newKey);
684                         }
685
686                         lastSlotAttemptedToSend = slot;
687                         lastIsNewKey = (newKey != NULL);
688                         lastInsertedNewKey = insertedNewKey;
689                         lastNewSize = newSize;
690                         lastNewKey = newKey;
691                         lastTransactionPartsSent = new Hashtable<Transaction*, Vector<int32_t>* >(transactionPartsSent);
692                         lastPendingSendArbitrationEntriesToDelete = new Vector<Entry*>(pendingSendArbitrationEntriesToDelete);
693
694
695                         ThreeTuple<bool, bool, Array<Slot*> *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
696
697                         if (sendSlotsReturn->getFirst()) {
698
699                                 // Did insert into the block chain
700
701                                 if (insertedNewKey) {
702                                         // This slot was what was inserted not a previous slot
703
704                                         // New Key was successfully inserted into the block chain so dont want to insert it again
705                                         newKey = NULL;
706                                 }
707
708                                 // Remove the aborts and commit parts that were sent from the pending to send queue
709                                 for (Iterator<ArbitrationRound> iter = pendingSendArbitrationRounds->iterator(); iter->hasNext(); ) {
710                                         ArbitrationRound round = iter->next();
711                                         round->removeParts(pendingSendArbitrationEntriesToDelete);
712
713                                         if (round->isDoneSending()) {
714                                                 // Sent all the parts
715                                                 iter->remove();
716                                         }
717                                 }
718
719                                 for (Transaction *transaction : transactionPartsSent->keySet()) {
720                                         transaction->resetServerFailure();
721
722                                         // Update which transactions parts still need to be sent
723                                         transaction->removeSentParts(transactionPartsSent->get(transaction));
724
725                                         // Add the transaction status to the outstanding list
726                                         outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
727
728                                         // Update the transaction status
729                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
730
731                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
732                                         if (transaction->didSendAllParts()) {
733                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
734                                                 pendingTransactionQueue->remove(transaction);
735                                         }
736                                 }
737                         } else {
738                                 // Reset which transaction to send
739                                 for (Transaction *transaction : transactionPartsSent->keySet()) {
740                                         transaction->resetNextPartToSend();
741
742                                         // Set the transaction sequence number back to nothing
743                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
744                                                 transaction->setSequenceNumber(-1);
745                                         }
746                                 }
747                         }
748
749                         // Clear the sent data in preparation for next send
750                         pendingSendArbitrationEntriesToDelete->clear();
751                         transactionPartsSent->clear();
752
753                         if (sendSlotsReturn->getThird()->length() != 0) {
754                                 // insert into the local block chain
755                                 validateAndUpdate(sendSlotsReturn->getThird(), true);
756                         }
757                 }
758
759         } catch (ServerException *e) {
760
761                 if (e->getType() != ServerException->TypeInputTimeout) {
762                         // Nothing was able to be sent to the server so just clear these data structures
763                         for (Transaction *transaction : transactionPartsSent->keySet()) {
764                                 transaction->resetNextPartToSend();
765
766                                 // Set the transaction sequence number back to nothing
767                                 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
768                                         transaction->setSequenceNumber(-1);
769                                 }
770                         }
771                 } else {
772                         // There was a partial send to the server
773                         hadPartialSendToServer = true;
774
775                         // Nothing was able to be sent to the server so just clear these data structures
776                         for (Transaction *transaction : transactionPartsSent->keySet()) {
777                                 transaction->resetNextPartToSend();
778                                 transaction->setServerFailure();
779                         }
780                 }
781
782                 pendingSendArbitrationEntriesToDelete->clear();
783                 transactionPartsSent->clear();
784
785                 throw e;
786         }
787
788         return newKey == NULL;
789 }
790
791  bool Table::updateFromLocal(int64_t machineId) {
792         Pair<IoTString*, int32_t> localCommunicationInformation = localCommunicationTable->get(machineId);
793         if (localCommunicationInformation == NULL) {
794                 // Cant talk to that device locally so do nothing
795                 return false;
796         }
797
798         // Get the size of the send data
799         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
800
801         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
802         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId) != NULL) {
803                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
804         }
805
806         Array<char> *sendData = new Array<char>(sendDataSize);
807         ByteBuffer * bbEncode = ByteBuffer_wrap(sendData);
808
809         // Encode the data
810         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
811         bbEncode->putInt(0);
812
813         // Send by local
814         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
815         localSequenceNumber++;
816
817         if (returnData == NULL) {
818                 // Could not contact server
819                 return false;
820         }
821
822         // Decode the data
823         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
824         int numberOfEntries = bbDecode->getInt();
825
826         for (int i = 0; i < numberOfEntries; i++) {
827                 char type = bbDecode->get();
828                 if (type == TypeAbort) {
829                         Abort *abort = (Abort)Abort_decode(NULL, bbDecode);
830                         processEntry(abort);
831                 } else if (type == TypeCommitPart) {
832                         CommitPart *commitPart = (CommitPart)CommitPart_decode(NULL, bbDecode);
833                         processEntry(commitPart);
834                 }
835         }
836
837         updateLiveStateFromLocal();
838
839         return true;
840 }
841
842 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
843
844         // Get the devices local communications
845         Pair<IoTString*, int32_t> localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
846
847         if (localCommunicationInformation == NULL) {
848                 // Cant talk to that device locally so do nothing
849                 return new Pair<bool, bool>(true, false);
850         }
851
852         // Get the size of the send data
853         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
854         for (TransactionPart *part : transaction->getParts()->values()) {
855                 sendDataSize += part->getSize();
856         }
857
858         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
859         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator()) != NULL) {
860                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
861         }
862
863         // Make the send data size
864         Array<char> *sendData = new Array<char>(sendDataSize);
865         ByteBuffer *bbEncode = ByteBuffer.wrap(sendData);
866
867         // Encode the data
868         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
869         bbEncode->putInt(transaction->getParts()->size());
870         for (TransactionPart *part : transaction->getParts()->values()) {
871                 part->encode(bbEncode);
872         }
873
874
875         // Send by local
876         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
877         localSequenceNumber++;
878
879         if (returnData == NULL) {
880                 // Could not contact server
881                 return new Pair<bool, bool>(true, false);
882         }
883
884         // Decode the data
885         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
886         bool didCommit = bbDecode->get() == 1;
887         bool couldArbitrate = bbDecode->get() == 1;
888         int numberOfEntries = bbDecode->getInt();
889         bool foundAbort = false;
890
891         for (int i = 0; i < numberOfEntries; i++) {
892                 char type = bbDecode->get();
893                 if (type == TypeAbort) {
894                         Abort abort = (Abort)Abort_decode(NULL, bbDecode);
895
896                         if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
897                                 foundAbort = true;
898                         }
899
900                         processEntry(abort);
901                 } else if (type == TypeCommitPart) {
902                         CommitPart *commitPart = (CommitPart)CommitPart_decode(NULL, bbDecode);
903                         processEntry(commitPart);
904                 }
905         }
906
907         updateLiveStateFromLocal();
908
909         if (couldArbitrate) {
910                 TransactionStatus status =  transaction->getTransactionStatus();
911                 if (didCommit) {
912                         status->setStatus(TransactionStatus_StatusCommitted);
913                 } else {
914                         status->setStatus(TransactionStatus_StatusAborted);
915                 }
916         } else {
917                 TransactionStatus status =  transaction->getTransactionStatus();
918                 if (foundAbort) {
919                         status->setStatus(TransactionStatus_StatusAborted);
920                 } else {
921                         status->setStatus(TransactionStatus_StatusCommitted);
922                 }
923         }
924
925         return new Pair<bool, bool>(false, true);
926 }
927
928  Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
929
930         // Decode the data
931         ByteBuffer *bbDecode = ByteBuffer_wrap(data);
932         int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
933         int numberOfParts = bbDecode->getInt();
934
935         // If we did commit a transaction or not
936         bool didCommit = false;
937         bool couldArbitrate = false;
938
939         if (numberOfParts != 0) {
940
941                 // decode the transaction
942                 Transaction *transaction = new Transaction();
943                 for (int i = 0; i < numberOfParts; i++) {
944                         bbDecode->get();
945                         TransactionPart newPart = (TransactionPart)TransactionPart.decode(NULL, bbDecode);
946                         transaction->addPartDecode(newPart);
947                 }
948
949                 // Arbitrate on transaction and pull relevant return data
950                 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
951                 couldArbitrate = localArbitrateReturn->getFirst();
952                 didCommit = localArbitrateReturn->getSecond();
953
954                 updateLiveStateFromLocal();
955
956                 // Transaction was sent to the server so keep track of it to prevent double commit
957                 if (transaction->getSequenceNumber() != -1) {
958                         offlineTransactionsCommittedAndAtServer->add(transaction->getId());
959                 }
960         }
961
962         // The data to send back
963         int returnDataSize = 0;
964         Vector<Entry*> *unseenArbitrations = new Vector<Entry*>();
965
966         // Get the aborts to send back
967         Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>(liveAbortsGeneratedByLocal->keySet());
968         Collections->sort(abortLocalSequenceNumbers);
969         for (int64_t localSequenceNumber : abortLocalSequenceNumbers) {
970                 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
971                         continue;
972                 }
973
974                 Abort abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
975                 unseenArbitrations->add(abort);
976                 returnDataSize += abort->getSize();
977         }
978
979         // Get the commits to send back
980         Hashtable<int64_t, Commit*>* commitForClientTable = liveCommitsTable->get(localMachineId);
981         if (commitForClientTable != NULL) {
982                 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
983                 Collections->sort(commitLocalSequenceNumbers);
984
985                 for (int64_t localSequenceNumber : commitLocalSequenceNumbers) {
986                         Commit commit = commitForClientTable->get(localSequenceNumber);
987
988                         if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
989                                 continue;
990                         }
991
992                         unseenArbitrations->addAll(commit->getParts()->values());
993
994                         for (CommitPart commitPart : commit->getParts()->values()) {
995                                 returnDataSize += commitPart->getSize();
996                         }
997                 }
998         }
999
1000         // Number of arbitration entries to decode
1001         returnDataSize += 2 * sizeof(int32_t);
1002
1003         // bool of did commit or not
1004         if (numberOfParts != 0) {
1005                 returnDataSize += sizeof(char);
1006         }
1007
1008         // Data to send Back
1009         Array<char> *returnData = new Array<char>(returnDataSize);
1010         ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
1011
1012         if (numberOfParts != 0) {
1013                 if (didCommit) {
1014                         bbEncode->put((char)1);
1015                 } else {
1016                         bbEncode->put((char)0);
1017                 }
1018                 if (couldArbitrate) {
1019                         bbEncode->put((char)1);
1020                 } else {
1021                         bbEncode->put((char)0);
1022                 }
1023         }
1024
1025         bbEncode->putInt(unseenArbitrations->size());
1026         for (Entry *entry : unseenArbitrations) {
1027                 entry->encode(bbEncode);
1028         }
1029
1030
1031         localSequenceNumber++;
1032         return returnData;
1033 }
1034
1035 ThreeTuple<bool, bool, Array<Slot*> *> * Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey) {
1036         bool attemptedToSendToServerTmp = attemptedToSendToServer;
1037         attemptedToSendToServer = true;
1038
1039         bool inserted = false;
1040         bool lastTryInserted = false;
1041
1042         Array<Slot*> *array = cloud->putSlot(slot, newSize);
1043         if (array == NULL) {
1044                 array = new Array<Slot*>();
1045                 array->set(0, slot);
1046                 rejectedSlotVector->clear();
1047                 inserted = true;
1048         } else {
1049                 if (array->length() == 0) {
1050                         throw new Error("Server Error: Did not send any slots");
1051                 }
1052
1053                 // if (attemptedToSendToServerTmp) {
1054                 if (hadPartialSendToServer) {
1055
1056                         bool isInserted = false;
1057                         for (Slot *s : array) {
1058                                 if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1059                                         isInserted = true;
1060                                         break;
1061                                 }
1062                         }
1063
1064                         for (Slot *s : array) {
1065                                 if (isInserted) {
1066                                         break;
1067                                 }
1068
1069                                 // Process each entry in the slot
1070                                 for (Entry *entry : s->getEntries()) {
1071
1072                                         if (entry->getType() == TypeLastMessage) {
1073                                                 LastMessage lastMessage = (LastMessage)entry;
1074
1075                                                 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) {
1076                                                         isInserted = true;
1077                                                         break;
1078                                                 }
1079                                         }
1080                                 }
1081                         }
1082
1083                         if (!isInserted) {
1084                                 rejectedSlotVector->add(slot->getSequenceNumber());
1085                                 lastTryInserted = false;
1086                         } else {
1087                                 lastTryInserted = true;
1088                         }
1089                 } else {
1090                         rejectedSlotVector->add(slot->getSequenceNumber());
1091                         lastTryInserted = false;
1092                 }
1093         }
1094
1095         return new ThreeTuple<bool, bool, Array<Slot*> *>(inserted, lastTryInserted, array);
1096 }
1097
1098 /**
1099  * Returns false if a resize was needed
1100  */
1101 ThreeTuple<bool, int32_t, bool> *Table::fillSlot(Slot *slot, bool resize, NewKey * newKeyEntry) {
1102
1103
1104         int newSize = 0;
1105         if (liveSlotCount > bufferResizeThreshold) {
1106                 resize = true;  //Resize is forced
1107
1108         }
1109
1110         if (resize) {
1111                 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1112                 TableStatus *status = new TableStatus(slot, newSize);
1113                 slot->addEntry(status);
1114         }
1115
1116         // Fill with rejected slots first before doing anything else
1117         doRejectedMessages(slot);
1118
1119         // Do mandatory rescue of entries
1120         ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1121
1122         // Extract working variables
1123         bool needsResize = mandatoryRescueReturn->getFirst();
1124         bool seenLiveSlot = mandatoryRescueReturn->getSecond();
1125         int64_t currentRescueSequenceNumber = mandatoryRescueReturn->getThird();
1126
1127         if (needsResize && !resize) {
1128                 // We need to resize but we are not resizing so return false
1129                 return new ThreeTuple<bool, int32_t, bool>(true, NULL, NULL);
1130         }
1131
1132         bool inserted = false;
1133         if (newKeyEntry != NULL) {
1134                 newKeyEntry->setSlot(slot);
1135                 if (slot->hasSpace(newKeyEntry)) {
1136
1137                         slot->addEntry(newKeyEntry);
1138                         inserted = true;
1139                 }
1140         }
1141
1142         // Clear the transactions, aborts and commits that were sent previously
1143         transactionPartsSent->clear();
1144         pendingSendArbitrationEntriesToDelete->clear();
1145
1146         for (ArbitrationRound round : pendingSendArbitrationRounds) {
1147                 bool isFull = false;
1148                 round->generateParts();
1149                 Vector<Entry*>* parts = round->getParts();
1150
1151                 // Insert pending arbitration data
1152                 for (Entry arbitrationData : parts) {
1153
1154                         // If it is an abort then we need to set some information
1155                         if (arbitrationData instanceof Abort) {
1156                                 ((Abort)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1157                         }
1158
1159                         if (!slot->hasSpace(arbitrationData)) {
1160                                 // No space so cant do anything else with these data entries
1161                                 isFull = true;
1162                                 break;
1163                         }
1164
1165                         // Add to this current slot and add it to entries to delete
1166                         slot->addEntry(arbitrationData);
1167                         pendingSendArbitrationEntriesToDelete->add(arbitrationData);
1168                 }
1169
1170                 if (isFull) {
1171                         break;
1172                 }
1173         }
1174
1175         if (pendingTransactionQueue->size() > 0) {
1176
1177                 Transaction *transaction = pendingTransactionQueue->get(0);
1178
1179                 // Set the transaction sequence number if it has yet to be inserted into the block chain
1180                 // if ((!transaction->didSendAPartToServer() && !transaction->getServerFailure()) || (transaction->getSequenceNumber() == -1)) {
1181                 //  transaction->setSequenceNumber(slot->getSequenceNumber());
1182                 // }
1183
1184                 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1185                         transaction->setSequenceNumber(slot->getSequenceNumber());
1186                 }
1187
1188
1189                 while (true) {
1190                         TransactionPart *part = transaction->getNextPartToSend();
1191
1192                         if (part == NULL) {
1193                                 // Ran out of parts to send for this transaction so move on
1194                                 break;
1195                         }
1196
1197                         if (slot->hasSpace(part)) {
1198                                 slot->addEntry(part);
1199                                 Vector<int32_t>* partsSent = transactionPartsSent->get(transaction);
1200                                 if (partsSent == NULL) {
1201                                         partsSent = new Vector<int32_t>();
1202                                         transactionPartsSent->put(transaction, partsSent);
1203                                 }
1204                                 partsSent->add(part->getPartNumber());
1205                                 transactionPartsSent->put(transaction, partsSent);
1206                         } else {
1207                                 break;
1208                         }
1209                 }
1210         }
1211
1212         // Fill the remainder of the slot with rescue data
1213         doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1214
1215         return new ThreeTuple<bool, int32_t, bool>(false, newSize, inserted);
1216 }
1217
1218 void Table::doRejectedMessages(Slot *s) {
1219         if (!rejectedSlotVector->isEmpty()) {
1220                 /* TODO: We should avoid generating a rejected message entry if
1221                  * there is already a sufficient entry in the queue (e->g->,
1222                  * equalsto value of true and same sequence number)->  */
1223
1224                 int64_t old_seqn = rejectedSlotVector->firstElement();
1225                 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1226                         int64_t new_seqn = rejectedSlotVector->lastElement();
1227                         RejectedMessage rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1228                         s->addEntry(rm);
1229                 } else {
1230                         int64_t prev_seqn = -1;
1231                         int i = 0;
1232                         /* Go through list of missing messages */
1233                         for (; i < rejectedSlotVector->size(); i++) {
1234                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1235                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1236                                 if (s_msg != NULL)
1237                                         break;
1238                                 prev_seqn = curr_seqn;
1239                         }
1240                         /* Generate rejected message entry for missing messages */
1241                         if (prev_seqn != -1) {
1242                                 RejectedMessage rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1243                                 s->addEntry(rm);
1244                         }
1245                         /* Generate rejected message entries for present messages */
1246                         for (; i < rejectedSlotVector->size(); i++) {
1247                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1248                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1249                                 int64_t machineid = s_msg->getMachineID();
1250                                 RejectedMessage rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1251                                 s->addEntry(rm);
1252                         }
1253                 }
1254         }
1255 }
1256
1257 ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize) {
1258         int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1259         int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
1260         if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1261                 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1262         }
1263
1264         int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1265         bool seenLiveSlot = false;
1266         int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots;         // smallest seq number in the buffer if it is full
1267         int64_t threshold = firstIfFull + Table_FREE_SLOTS;             // we want the buffer to be clear of live entries up to this point
1268
1269
1270         // Mandatory Rescue
1271         for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1272                 Slot previousSlot = buffer->getSlot(currentSequenceNumber);
1273                 // Push slot number forward
1274                 if (!seenLiveSlot) {
1275                         oldestLiveSlotSequenceNumver = currentSequenceNumber;
1276                 }
1277
1278                 if (!previousSlot->isLive()) {
1279                         continue;
1280                 }
1281
1282                 // We have seen a live slot
1283                 seenLiveSlot = true;
1284
1285                 // Get all the live entries for a slot
1286                 Vector<Entry*>* liveEntries = previousSlot->getLiveEntries(resize);
1287
1288                 // Iterate over all the live entries and try to rescue them
1289                 for (Entry liveEntry : liveEntries) {
1290                         if (slot->hasSpace(liveEntry)) {
1291
1292                                 // Enough space to rescue the entry
1293                                 slot->addEntry(liveEntry);
1294                         } else if (currentSequenceNumber == firstIfFull) {
1295                                 //if there's no space but the entry is about to fall off the queue
1296                                 System->out->println("B");      //?
1297                                 return new ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
1298
1299                         }
1300                 }
1301         }
1302
1303         // Did not resize
1304         return new ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
1305 }
1306
1307 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1308         /* now go through live entries from least to greatest sequence number until
1309          * either all live slots added, or the slot doesn't have enough room
1310          * for SKIP_THRESHOLD consecutive entries*/
1311         int skipcount = 0;
1312         int64_t newestseqnum = buffer->getNewestSeqNum();
1313 search:
1314         for (; seqn <= newestseqnum; seqn++) {
1315                 Slot prevslot = buffer->getSlot(seqn);
1316                 //Push slot number forward
1317                 if (!seenliveslot)
1318                         oldestLiveSlotSequenceNumver = seqn;
1319
1320                 if (!prevslot->isLive())
1321                         continue;
1322                 seenliveslot = true;
1323                 Vector<Entry*>* liveentries = prevslot->getLiveEntries(resize);
1324                 for (Entry *liveentry : liveentries) {
1325                         if (s->hasSpace(liveentry))
1326                                 s->addEntry(liveentry);
1327                         else {
1328                                 skipcount++;
1329                                 if (skipcount > Table_SKIP_THRESHOLD)
1330                                         break search;
1331                         }
1332                 }
1333         }
1334 }
1335
1336 /**
1337  * Checks for malicious activity and updates the local copy of the block chain->
1338  */
1339 void Table::validateAndUpdate(Array<Slot*> *newSlots, bool acceptUpdatesToLocal) {
1340
1341         // The cloud communication layer has checked slot HMACs already before decoding
1342         if (newSlots->length() == 0) {
1343                 return;
1344         }
1345
1346         // Make sure all slots are newer than the last largest slot this client has seen
1347         int64_t firstSeqNum = newSlots[0]->getSequenceNumber();
1348         if (firstSeqNum <= sequenceNumber) {
1349                 throw new Error("Server Error: Sent older slots!");
1350         }
1351
1352         // Create an object that can access both new slots and slots in our local chain
1353         // without committing slots to our local chain
1354         SlotIndexer indexer = new SlotIndexer(newSlots, buffer);
1355
1356         // Check that the HMAC chain is not broken
1357         checkHMACChain(indexer, newSlots);
1358
1359         // Set to keep track of messages from clients
1360         Hashset<int64_t> *machineSet = new Hashset<int64_t>(lastMessageTable->keySet());
1361
1362         // Process each slots data
1363         for (Slot *slot : newSlots) {
1364                 processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1365
1366                 updateExpectedSize();
1367         }
1368
1369         // If there is a gap, check to see if the server sent us everything->
1370         if (firstSeqNum != (sequenceNumber + 1)) {
1371
1372                 // Check the size of the slots that were sent down by the server->
1373                 // Can only check the size if there was a gap
1374                 checkNumSlots(newSlots->length);
1375
1376                 // Since there was a gap every machine must have pushed a slot or must have
1377                 // a last message message->  If not then the server is hiding slots
1378                 if (!machineSet->isEmpty()) {
1379                         throw new Error("Missing record for machines: " + machineSet);
1380                 }
1381         }
1382
1383         // Update the size of our local block chain->
1384         commitNewMaxSize();
1385
1386         // Commit new to slots to the local block chain->
1387         for (Slot *slot : newSlots) {
1388
1389                 // Insert this slot into our local block chain copy->
1390                 buffer->putSlot(slot);
1391
1392                 // Keep track of how many slots are currently live (have live data in them)->
1393                 liveSlotCount++;
1394         }
1395
1396         // Get the sequence number of the latest slot in the system
1397         sequenceNumber = newSlots[newSlots->length() - 1]->getSequenceNumber();
1398
1399         updateLiveStateFromServer();
1400
1401         // No Need to remember after we pulled from the server
1402         offlineTransactionsCommittedAndAtServer->clear();
1403
1404         // This is invalidated now
1405         hadPartialSendToServer = false;
1406 }
1407
1408 void Table::updateLiveStateFromServer() {
1409         // Process the new transaction parts
1410         processNewTransactionParts();
1411
1412         // Do arbitration on new transactions that were received
1413         arbitrateFromServer();
1414
1415         // Update all the committed keys
1416         bool didCommitOrSpeculate = updateCommittedTable();
1417
1418         // Delete the transactions that are now dead
1419         updateLiveTransactionsAndStatus();
1420
1421         // Do speculations
1422         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1423         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1424 }
1425
1426 void Table::updateLiveStateFromLocal() {
1427         // Update all the committed keys
1428         bool didCommitOrSpeculate = updateCommittedTable();
1429
1430         // Delete the transactions that are now dead
1431         updateLiveTransactionsAndStatus();
1432
1433         // Do speculations
1434         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1435         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1436 }
1437
1438 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1439         // if (didFindTableStatus) {
1440         // return;
1441         // }
1442         int64_t prevslots = firstSequenceNumber;
1443
1444
1445         if (didFindTableStatus) {
1446                 // expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : expectedsize;
1447                 // System->out->println("Here2: " + expectedsize + "    " + numberOfSlots + "   " + prevslots);
1448
1449         } else {
1450                 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1451                 // System->out->println("Here: " + expectedsize);
1452         }
1453
1454         // System->out->println(numberOfSlots);
1455
1456         didFindTableStatus = true;
1457         currMaxSize = numberOfSlots;
1458 }
1459
1460 void Table::updateExpectedSize() {
1461         expectedsize++;
1462
1463         if (expectedsize > currMaxSize) {
1464                 expectedsize = currMaxSize;
1465         }
1466 }
1467
1468
1469 /**
1470  * Check the size of the block chain to make sure there are enough slots sent back by the server->
1471  * This is only called when we have a gap between the slots that we have locally and the slots
1472  * sent by the server therefore in the slots sent by the server there will be at least 1 Table
1473  * status message
1474  */
1475 void Table::checkNumSlots(int numberOfSlots) {
1476         if (numberOfSlots != expectedsize) {
1477                 throw new Error("Server Error: Server did not send all slots->  Expected: " + expectedsize + " Received:" + numberOfSlots);
1478         }
1479 }
1480
1481 void Table::updateCurrMaxSize(int newmaxsize) {
1482         currMaxSize = newmaxsize;
1483 }
1484
1485
1486 /**
1487  * Update the size of of the local buffer if it is needed->
1488  */
1489 void Table::commitNewMaxSize() {
1490         didFindTableStatus = false;
1491
1492         // Resize the local slot buffer
1493         if (numberOfSlots != currMaxSize) {
1494                 buffer->resize((int)currMaxSize);
1495         }
1496
1497         // Change the number of local slots to the new size
1498         numberOfSlots = (int)currMaxSize;
1499
1500
1501         // Recalculate the resize threshold since the size of the local buffer has changed
1502         setResizeThreshold();
1503 }
1504
1505 /**
1506  * Process the new transaction parts from this latest round of slots received from the server
1507  */
1508 void Table::processNewTransactionParts() {
1509
1510         if (newTransactionParts->size() == 0) {
1511                 // Nothing new to process
1512                 return;
1513         }
1514
1515         // Iterate through all the machine Ids that we received new parts for
1516         for (int64_t machineId : newTransactionParts->keySet()) {
1517                 Hashtable<Pair<int64_t int32_t>*, TransactionPart*> * parts = newTransactionParts->get(machineId);
1518
1519                 // Iterate through all the parts for that machine Id
1520                 for (Pair<int64_t, int32_t>* partId : parts->keySet()) {
1521                         TransactionPart *part = parts->get(partId);
1522
1523                         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1524                         if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= part->getSequenceNumber())) {
1525                                 // Set dead the transaction part
1526                                 part->setDead();
1527                                 continue;
1528                         }
1529
1530                         // Get the transaction object for that sequence number
1531                         Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1532
1533                         if (transaction == NULL) {
1534                                 // This is a new transaction that we dont have so make a new one
1535                                 transaction = new Transaction();
1536
1537                                 // Insert this new transaction into the live tables
1538                                 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1539                                 liveTransactionByTransactionIdTable->put(part->getTransactionId(), transaction);
1540                         }
1541
1542                         // Add that part to the transaction
1543                         transaction->addPartDecode(part);
1544                 }
1545         }
1546
1547         // Clear all the new transaction parts in preparation for the next time the server sends slots
1548         newTransactionParts->clear();
1549 }
1550
1551 void Table::arbitrateFromServer() {
1552
1553         if (liveTransactionBySequenceNumberTable->size() == 0) {
1554                 // Nothing to arbitrate on so move on
1555                 return;
1556         }
1557
1558         // Get the transaction sequence numbers and sort from oldest to newest
1559         Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
1560         Collections->sort(transactionSequenceNumbers);
1561
1562         // Collection of key value pairs that are
1563         Hashtable<IoTString *, KeyValue*> speculativeTableTmp = new Hashtable<IoTString *, KeyValue*>();
1564
1565         // The last transaction arbitrated on
1566         int64_t lastTransactionCommitted = -1;
1567         Hashset<Abort*>* generatedAborts = new Hashset<Abort*>();
1568
1569         for (int64_t transactionSequenceNumber : transactionSequenceNumbers) {
1570                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1571
1572
1573
1574                 // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
1575                 if (transaction->getArbitrator() != localMachineId) {
1576                         continue;
1577                 }
1578
1579                 if (transactionSequenceNumber < lastSeqNumArbOn) {
1580                         continue;
1581                 }
1582
1583                 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1584                         // We have seen this already locally so dont commit again
1585                         continue;
1586                 }
1587
1588
1589                 if (!transaction->isComplete()) {
1590                         // Will arbitrate in incorrect order if we continue so just break
1591                         // Most likely this
1592                         break;
1593                 }
1594
1595
1596                 // update the largest transaction seen by arbitrator from server
1597                 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) == NULL) {
1598                         lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1599                 } else {
1600                         int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1601                         if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1602                                 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1603                         }
1604                 }
1605
1606                 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1607                         // Guard evaluated as true
1608
1609                         // Update the local changes so we can make the commit
1610                         for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
1611                                 speculativeTableTmp->put(kv->getKey(), kv);
1612                         }
1613
1614                         // Update what the last transaction committed was for use in batch commit
1615                         lastTransactionCommitted = transactionSequenceNumber;
1616                 } else {
1617                         // Guard evaluated was false so create abort
1618
1619                         // Create the abort
1620                         Abort newAbort = new Abort(NULL,
1621                                                                                                                                  transaction->getClientLocalSequenceNumber(),
1622                                                                                                                                  transaction->getSequenceNumber(),
1623                                                                                                                                  transaction->getMachineId(),
1624                                                                                                                                  transaction->getArbitrator(),
1625                                                                                                                                  localArbitrationSequenceNumber);
1626                         localArbitrationSequenceNumber++;
1627
1628                         generatedAborts->add(newAbort);
1629
1630                         // Insert the abort so we can process
1631                         processEntry(newAbort);
1632                 }
1633
1634                 lastSeqNumArbOn = transactionSequenceNumber;
1635
1636                 // liveTransactionBySequenceNumberTable->remove(transactionSequenceNumber);
1637         }
1638
1639         Commit newCommit = NULL;
1640
1641         // If there is something to commit
1642         if (speculativeTableTmp->size() != 0) {
1643
1644                 // Create the commit and increment the commit sequence number
1645                 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1646                 localArbitrationSequenceNumber++;
1647
1648                 // Add all the new keys to the commit
1649                 for (KeyValue *kv : speculativeTableTmp->values()) {
1650                         newCommit->addKV(kv);
1651                 }
1652
1653                 // create the commit parts
1654                 newCommit->createCommitParts();
1655
1656                 // Append all the commit parts to the end of the pending queue waiting for sending to the server
1657
1658                 // Insert the commit so we can process it
1659                 for (CommitPart commitPart : newCommit->getParts()->values()) {
1660                         processEntry(commitPart);
1661                 }
1662         }
1663
1664         if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1665                 ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1666                 pendingSendArbitrationRounds->add(arbitrationRound);
1667
1668                 if (compactArbitrationData()) {
1669                         ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1670                         if (newArbitrationRound->getCommit() != NULL) {
1671                                 for (CommitPart commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1672                                         processEntry(commitPart);
1673                                 }
1674                         }
1675                 }
1676         }
1677 }
1678
1679 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1680
1681         // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
1682         if (transaction->getArbitrator() != localMachineId) {
1683                 return new Pair<bool, bool>(false, false);
1684         }
1685
1686         if (!transaction->isComplete()) {
1687                 // Will arbitrate in incorrect order if we continue so just break
1688                 // Most likely this
1689                 return new Pair<bool, bool>(false, false);
1690         }
1691
1692         if (transaction->getMachineId() != localMachineId) {
1693                 // dont do this check for local transactions
1694                 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) != NULL) {
1695                         if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1696                                 // We've have already seen this from the server
1697                                 return new Pair<bool, bool>(false, false);
1698                         }
1699                 }
1700         }
1701
1702         if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1703                 // Guard evaluated as true
1704
1705                 // Create the commit and increment the commit sequence number
1706                 Commit newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1707                 localArbitrationSequenceNumber++;
1708
1709                 // Update the local changes so we can make the commit
1710                 for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
1711                         newCommit->addKV(kv);
1712                 }
1713
1714                 // create the commit parts
1715                 newCommit->createCommitParts();
1716
1717                 // Append all the commit parts to the end of the pending queue waiting for sending to the server
1718                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1719                 pendingSendArbitrationRounds->add(arbitrationRound);
1720
1721                 if (compactArbitrationData()) {
1722                         ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1723                         for (CommitPart commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1724                                 processEntry(commitPart);
1725                         }
1726                 } else {
1727                         // Insert the commit so we can process it
1728                         for (CommitPart commitPart : newCommit->getParts()->values()) {
1729                                 processEntry(commitPart);
1730                         }
1731                 }
1732
1733                 if (transaction->getMachineId() == localMachineId) {
1734                         TransactionStatus status = transaction->getTransactionStatus();
1735                         if (status != NULL) {
1736                                 status->setStatus(TransactionStatus_StatusCommitted);
1737                         }
1738                 }
1739
1740                 updateLiveStateFromLocal();
1741                 return new Pair<bool, bool>(true, true);
1742         } else {
1743
1744                 if (transaction->getMachineId() == localMachineId) {
1745                         // For locally created messages update the status
1746
1747                         // Guard evaluated was false so create abort
1748                         TransactionStatus status = transaction->getTransactionStatus();
1749                         if (status != NULL) {
1750                                 status->setStatus(TransactionStatus_StatusAborted);
1751                         }
1752                 } else {
1753                         Hashset<Abort *> addAbortSet = new Hashset<Abort * >();
1754
1755
1756                         // Create the abort
1757                         Abort newAbort = new Abort(NULL,
1758                                                                                                                                  transaction->getClientLocalSequenceNumber(),
1759                                                                                                                                  -1,
1760                                                                                                                                  transaction->getMachineId(),
1761                                                                                                                                  transaction->getArbitrator(),
1762                                                                                                                                  localArbitrationSequenceNumber);
1763                         localArbitrationSequenceNumber++;
1764
1765                         addAbortSet->add(newAbort);
1766
1767
1768                         // Append all the commit parts to the end of the pending queue waiting for sending to the server
1769                         ArbitrationRound arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1770                         pendingSendArbitrationRounds->add(arbitrationRound);
1771
1772                         if (compactArbitrationData()) {
1773                                 ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1774                                 for (CommitPart commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1775                                         processEntry(commitPart);
1776                                 }
1777                         }
1778                 }
1779
1780                 updateLiveStateFromLocal();
1781                 return new Pair<bool, bool>(true, false);
1782         }
1783 }
1784
1785 /**
1786  * Compacts the arbitration data my merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates
1787  */
1788 bool Table::compactArbitrationData() {
1789
1790         if (pendingSendArbitrationRounds->size() < 2) {
1791                 // Nothing to compact so do nothing
1792                 return false;
1793         }
1794
1795         ArbitrationRound lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1796         if (lastRound->didSendPart()) {
1797                 return false;
1798         }
1799
1800         bool hadCommit = (lastRound->getCommit() == NULL);
1801         bool gotNewCommit = false;
1802
1803         int numberToDelete = 1;
1804         while (numberToDelete < pendingSendArbitrationRounds->size()) {
1805                 ArbitrationRound round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1806
1807                 if (round->isFull() || round->didSendPart()) {
1808                         // Stop since there is a part that cannot be compacted and we need to compact in order
1809                         break;
1810                 }
1811
1812                 if (round->getCommit() == NULL) {
1813
1814                         // Try compacting aborts only
1815                         int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
1816                         if (newSize > ArbitrationRound->MAX_PARTS) {
1817                                 // Cant compact since it would be too large
1818                                 break;
1819                         }
1820                         lastRound->addAborts(round->getAborts());
1821                 } else {
1822
1823                         // Create a new larger commit
1824                         Commit newCommit = Commit->merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
1825                         localArbitrationSequenceNumber++;
1826
1827                         // Create the commit parts so that we can count them
1828                         newCommit->createCommitParts();
1829
1830                         // Calculate the new size of the parts
1831                         int newSize = newCommit->getNumberOfParts();
1832                         newSize += lastRound->getAbortsCount();
1833                         newSize += round->getAbortsCount();
1834
1835                         if (newSize > ArbitrationRound->MAX_PARTS) {
1836                                 // Cant compact since it would be too large
1837                                 break;
1838                         }
1839
1840                         // Set the new compacted part
1841                         lastRound->setCommit(newCommit);
1842                         lastRound->addAborts(round->getAborts());
1843                         gotNewCommit = true;
1844                 }
1845
1846                 numberToDelete++;
1847         }
1848
1849         if (numberToDelete != 1) {
1850                 // If there is a compaction
1851
1852                 // Delete the previous pieces that are now in the new compacted piece
1853                 if (numberToDelete == pendingSendArbitrationRounds->size()) {
1854                         pendingSendArbitrationRounds->clear();
1855                 } else {
1856                         for (int i = 0; i < numberToDelete; i++) {
1857                                 pendingSendArbitrationRounds->remove(pendingSendArbitrationRounds->size() - 1);
1858                         }
1859                 }
1860
1861                 // Add the new compacted into the pending to send list
1862                 pendingSendArbitrationRounds->add(lastRound);
1863
1864                 // Should reinsert into the commit processor
1865                 if (hadCommit && gotNewCommit) {
1866                         return true;
1867                 }
1868         }
1869
1870         return false;
1871 }
1872 // bool compactArbitrationData() {
1873 //  return false;
1874 // }
1875
1876 /**
1877  * Update all the commits and the committed tables, sets dead the dead transactions
1878  */
1879 bool Table::updateCommittedTable() {
1880
1881         if (newCommitParts->size() == 0) {
1882                 // Nothing new to process
1883                 return false;
1884         }
1885
1886         // Iterate through all the machine Ids that we received new parts for
1887         for (int64_t machineId : newCommitParts->keySet()) {
1888                 Hashtable<Pair<int64_t, int32_t>*, CommitPart*>* parts = newCommitParts->get(machineId);
1889
1890                 // Iterate through all the parts for that machine Id
1891                 for (Pair<int64_t, int32_t>* partId : parts->keySet()) {
1892                         CommitPart part = parts->get(partId);
1893
1894                         // Get the transaction object for that sequence number
1895                         Hashtable<int64_t, Commit*>* commitForClientTable = liveCommitsTable->get(part->getMachineId());
1896
1897                         if (commitForClientTable == NULL) {
1898                                 // This is the first commit from this device
1899                                 commitForClientTable = new Hashtable<int64_t, Commit*>();
1900                                 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
1901                         }
1902
1903                         Commit commit = commitForClientTable->get(part->getSequenceNumber());
1904
1905                         if (commit == NULL) {
1906                                 // This is a new commit that we dont have so make a new one
1907                                 commit = new Commit();
1908
1909                                 // Insert this new commit into the live tables
1910                                 commitForClientTable->put(part->getSequenceNumber(), commit);
1911                         }
1912
1913                         // Add that part to the commit
1914                         commit->addPartDecode(part);
1915                 }
1916         }
1917
1918         // Clear all the new commits parts in preparation for the next time the server sends slots
1919         newCommitParts->clear();
1920
1921         // If we process a new commit keep track of it for future use
1922         bool didProcessANewCommit = false;
1923
1924         // Process the commits one by one
1925         for (int64_T arbitratorId : liveCommitsTable->keySet()) {
1926
1927                 // Get all the commits for a specific arbitrator
1928                 Hashtable<int64_t, Commit*> commitForClientTable = liveCommitsTable->get(arbitratorId);
1929
1930                 // Sort the commits in order
1931                 Vector<int64_t>* commitSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
1932                 Collections->sort(commitSequenceNumbers);
1933
1934                 // Get the last commit seen from this arbitrator
1935                 int64_t lastCommitSeenSequenceNumber = -1;
1936                 if (lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId) != NULL) {
1937                         lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
1938                 }
1939
1940                 // Go through each new commit one by one
1941                 for (int i = 0; i < commitSequenceNumbers->size(); i++) {
1942                         int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
1943                         Commit *commit = commitForClientTable->get(commitSequenceNumber);
1944
1945                         // Special processing if a commit is not complete
1946                         if (!commit->isComplete()) {
1947                                 if (i == (commitSequenceNumbers->size() - 1)) {
1948                                         // If there is an incomplete commit and this commit is the latest one seen then this commit cannot be processed and there are no other commits
1949                                         break;
1950                                 } else {
1951                                         // This is a commit that was already dead but parts of it are still in the block chain (not flushed out yet)->
1952                                         // Delete it and move on
1953                                         commit->setDead();
1954                                         commitForClientTable->remove(commit->getSequenceNumber());
1955                                         continue;
1956                                 }
1957                         }
1958
1959                         // Update the last transaction that was updated if we can
1960                         if (commit->getTransactionSequenceNumber() != -1) {
1961                                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
1962
1963                                 // Update the last transaction sequence number that the arbitrator arbitrated on
1964                                 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
1965                                         lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
1966                                 }
1967                         }
1968
1969                         // Update the last arbitration data that we have seen so far
1970                         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId()) != NULL) {
1971
1972                                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
1973                                 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
1974                                         // Is larger
1975                                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
1976                                 }
1977                         } else {
1978                                 // Never seen any data from this arbitrator so record the first one
1979                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
1980                         }
1981
1982                         // We have already seen this commit before so need to do the full processing on this commit
1983                         if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
1984
1985                                 // Update the last transaction that was updated if we can
1986                                 if (commit->getTransactionSequenceNumber() != -1) {
1987                                         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
1988
1989                                         // Update the last transaction sequence number that the arbitrator arbitrated on
1990                                         if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
1991                                                 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
1992                                         }
1993                                 }
1994
1995                                 continue;
1996                         }
1997
1998                         // If we got here then this is a brand new commit and needs full processing
1999
2000                         // Get what commits should be edited, these are the commits that have live values for their keys
2001                         Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
2002                         for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
2003                                 commitsToEdit->add(liveCommitsByKeyTable->get(kv->getKey()));
2004                         }
2005                         commitsToEdit->remove(NULL);            // remove NULL since it could be in this set
2006
2007                         // Update each previous commit that needs to be updated
2008                         for (Commit * previousCommit : commitsToEdit) {
2009
2010                                 // Only bother with live commits (TODO: Maybe remove this check)
2011                                 if (previousCommit->isLive()) {
2012
2013                                         // Update which keys in the old commits are still live
2014                                         for (KeyValue * kv : commit->getKeyValueUpdateSet()) {
2015                                                 previousCommit->invalidateKey(kv->getKey());
2016                                         }
2017
2018                                         // if the commit is now dead then remove it
2019                                         if (!previousCommit->isLive()) {
2020                                                 commitForClientTable->remove(previousCommit);
2021                                         }
2022                                 }
2023                         }
2024
2025                         // Update the last seen sequence number from this arbitrator
2026                         if (lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId()) != NULL) {
2027                                 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2028                                         lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2029                                 }
2030                         } else {
2031                                 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2032                         }
2033
2034                         // We processed a new commit that we havent seen before
2035                         didProcessANewCommit = true;
2036
2037                         // Update the committed table of keys and which commit is using which key
2038                         for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
2039                                 committedKeyValueTable->put(kv->getKey(), kv);
2040                                 liveCommitsByKeyTable->put(kv->getKey(), commit);
2041                         }
2042                 }
2043         }
2044
2045         return didProcessANewCommit;
2046 }
2047
2048 /**
2049  * Create the speculative table from transactions that are still live and have come from the cloud
2050  */
2051 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2052         if (liveTransactionBySequenceNumberTable->keySet()->size() == 0) {
2053                 // There is nothing to speculate on
2054                 return false;
2055         }
2056
2057         // Create a list of the transaction sequence numbers and sort them from oldest to newest
2058         Vector<int64_t>* transactionSequenceNumbersSorted = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
2059         Collections->sort(transactionSequenceNumbersSorted);
2060
2061         bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2062
2063
2064         if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2065                 // If there is a gap in the transaction sequence numbers then there was a commit or an abort of a transaction
2066                 // OR there was a new commit (Could be from offline commit) so a redo the speculation from scratch
2067
2068                 // Start from scratch
2069                 speculatedKeyValueTable->clear();
2070                 lastTransactionSequenceNumberSpeculatedOn = -1;
2071                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2072
2073         }
2074
2075         // Remember the front of the transaction list
2076         oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2077
2078         // Find where to start arbitration from
2079         int startIndex = transactionSequenceNumbersSorted->indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1;
2080
2081         if (startIndex >= transactionSequenceNumbersSorted->size()) {
2082                 // Make sure we are not out of bounds
2083                 return false;           // did not speculate
2084         }
2085
2086         Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
2087         bool didSkip = true;
2088
2089         for (int i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2090                 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2091                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2092
2093                 if (!transaction->isComplete()) {
2094                         // If there is an incomplete transaction then there is nothing we can do
2095                         // add this transactions arbitrator to the list of arbitrators we should ignore
2096                         incompleteTransactionArbitrator->add(transaction->getArbitrator());
2097                         didSkip = true;
2098                         continue;
2099                 }
2100
2101                 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2102                         continue;
2103                 }
2104
2105                 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2106
2107                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2108                         // Guard evaluated to true so update the speculative table
2109                         for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
2110                                 speculatedKeyValueTable->put(kv->getKey(), kv);
2111                         }
2112                 }
2113         }
2114
2115         if (didSkip) {
2116                 // Since there was a skip we need to redo the speculation next time around
2117                 lastTransactionSequenceNumberSpeculatedOn = -1;
2118                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2119         }
2120
2121         // We did some speculation
2122         return true;
2123 }
2124
2125 /**
2126  * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer
2127  */
2128 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2129         if (pendingTransactionQueue->size() == 0) {
2130                 // There is nothing to speculate on
2131                 return;
2132         }
2133
2134
2135         if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2136                 // need to reset on the pending speculation
2137                 lastPendingTransactionSpeculatedOn = NULL;
2138                 firstPendingTransaction = pendingTransactionQueue->get(0);
2139                 pendingTransactionSpeculatedKeyValueTable->clear();
2140         }
2141
2142         // Find where to start arbitration from
2143         int startIndex = pendingTransactionQueue->indexOf(firstPendingTransaction) + 1;
2144
2145         if (startIndex >= pendingTransactionQueue->size()) {
2146                 // Make sure we are not out of bounds
2147                 return;
2148         }
2149
2150         for (int i = startIndex; i < pendingTransactionQueue->size(); i++) {
2151                 Transaction *transaction = pendingTransactionQueue->get(i);
2152
2153                 lastPendingTransactionSpeculatedOn = transaction;
2154
2155                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2156                         // Guard evaluated to true so update the speculative table
2157                         for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
2158                                 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2159                         }
2160                 }
2161         }
2162 }
2163
2164 /**
2165  * Set dead and remove from the live transaction tables the transactions that are dead
2166  */
2167 void Table::updateLiveTransactionsAndStatus() {
2168
2169         // Go through each of the transactions
2170         for (Iterator<Map->Entry<int64_t, Transaction> >* iter = liveTransactionBySequenceNumberTable->entrySet()->iterator(); iter->hasNext();) {
2171                 Transaction *transaction = iter->next()->getValue();
2172
2173                 // Check if the transaction is dead
2174                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator());
2175                 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= transaction->getSequenceNumber())) {
2176
2177                         // Set dead the transaction
2178                         transaction->setDead();
2179
2180                         // Remove the transaction from the live table
2181                         iter->remove();
2182                         liveTransactionByTransactionIdTable->remove(transaction->getId());
2183                 }
2184         }
2185
2186         // Go through each of the transactions
2187         for (Iterator<Map->Entry<int64_t, TransactionStatus*> >* iter = outstandingTransactionStatus->entrySet()->iterator(); iter->hasNext();) {
2188                 TransactionStatus status = iter->next()->getValue();
2189
2190                 // Check if the transaction is dead
2191                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator());
2192                 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= status->getTransactionSequenceNumber())) {
2193
2194                         // Set committed
2195                         status->setStatus(TransactionStatus_StatusCommitted);
2196
2197                         // Remove
2198                         iter->remove();
2199                 }
2200         }
2201 }
2202
2203 /**
2204  * Process this slot, entry by entry->  Also update the latest message sent by slot
2205  */
2206 void Table::processSlot(SlotIndexer indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2207
2208         // Update the last message seen
2209         updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2210
2211         // Process each entry in the slot
2212         for (Entry *entry : slot->getEntries()) {
2213                 switch (entry->getType()) {
2214
2215                 case TypeCommitPart:
2216                         processEntry((CommitPart)entry);
2217                         break;
2218
2219                 case TypeAbort:
2220                         processEntry((Abort)entry);
2221                         break;
2222
2223                 case TypeTransactionPart:
2224                         processEntry((TransactionPart)entry);
2225                         break;
2226
2227                 case TypeNewKey:
2228                         processEntry((NewKey)entry);
2229                         break;
2230
2231                 case TypeLastMessage:
2232                         processEntry((LastMessage)entry, machineSet);
2233                         break;
2234
2235                 case TypeRejectedMessage:
2236                         processEntry((RejectedMessage)entry, indexer);
2237                         break;
2238
2239                 case TypeTableStatus:
2240                         processEntry((TableStatus)entry, slot->getSequenceNumber());
2241                         break;
2242
2243                 default:
2244                         throw new Error("Unrecognized type: " + entry->getType());
2245                 }
2246         }
2247 }
2248
2249 /**
2250  * Update the last message that was sent for a machine Id
2251  */
2252 void Table::processEntry(LastMessage entry, Hashset<int64_t> *machineSet) {
2253         // Update what the last message received by a machine was
2254         updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2255 }
2256
2257 /**
2258  * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message)
2259  */
2260 void Table::processEntry(NewKey* entry) {
2261
2262         // Update the arbitrator table with the new key information
2263         arbitratorTable->put(entry->getKey(), entry->getMachineID());
2264
2265         // Update what the latest live new key is
2266         NewKey oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2267         if (oldNewKey != NULL) {
2268                 // Delete the old new key messages
2269                 oldNewKey->setDead();
2270         }
2271 }
2272
2273 /**
2274  * Process new table status entries and set dead the old ones as new ones come in->
2275  * keeps track of the largest and smallest table status seen in this current round
2276  * of updating the local copy of the block chain
2277  */
2278 void Table::processEntry(TableStatus entry, int64_t seq) {
2279         int newNumSlots = entry->getMaxSlots();
2280         updateCurrMaxSize(newNumSlots);
2281
2282         initExpectedSize(seq, newNumSlots);
2283
2284         if (liveTableStatus != NULL) {
2285                 // We have a larger table status so the old table status is no int64_ter alive
2286                 liveTableStatus->setDead();
2287         }
2288
2289         // Make this new table status the latest alive table status
2290         liveTableStatus = entry;
2291 }
2292
2293 /**
2294  * Check old messages to see if there is a block chain violation-> Also
2295  */
2296 void Table::processEntry(RejectedMessage entry, SlotIndexer indexer) {
2297         int64_t oldSeqNum = entry->getOldSeqNum();
2298         int64_t newSeqNum = entry->getNewSeqNum();
2299         bool isequal = entry->getEqual();
2300         int64_t machineId = entry->getMachineID();
2301         int64_t seq = entry->getSequenceNumber();
2302
2303
2304         // Check if we have messages that were supposed to be rejected in our local block chain
2305         for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2306
2307                 // Get the slot
2308                 Slot *slot = indexer->getSlot(seqNum);
2309
2310                 if (slot != NULL) {
2311                         // If we have this slot make sure that it was not supposed to be a rejected slot
2312
2313                         int64_t slotMachineId = slot->getMachineID();
2314                         if (isequal != (slotMachineId == machineId)) {
2315                                 throw new Error("Server Error: Trying to insert rejected message for slot " + seqNum);
2316                         }
2317                 }
2318         }
2319
2320
2321         // Create a list of clients to watch until they see this rejected message entry->
2322         Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2323         for (Map->Entry<int64_t, Pair<int64_t, Liveness>*>* lastMessageEntry : lastMessageTable->entrySet()) {
2324
2325                 // Machine ID for the last message entry
2326                 int64_t lastMessageEntryMachineId = lastMessageEntry->getKey();
2327
2328                 // We've seen it, don't need to continue to watch->  Our next
2329                 // message will implicitly acknowledge it->
2330                 if (lastMessageEntryMachineId == localMachineId) {
2331                         continue;
2332                 }
2333
2334                 Pair<int64_t, Liveness> *lastMessageValue = lastMessageEntry->getValue();
2335                 int64_t entrySequenceNumber = lastMessageValue->getFirst();
2336
2337                 if (entrySequenceNumber < seq) {
2338
2339                         // Add this rejected message to the set of messages that this machine ID did not see yet
2340                         addWatchVector(lastMessageEntryMachineId, entry);
2341
2342                         // This client did not see this rejected message yet so add it to the watch set to monitor
2343                         deviceWatchSet->add(lastMessageEntryMachineId);
2344                 }
2345         }
2346
2347         if (deviceWatchSet->isEmpty()) {
2348                 // This rejected message has been seen by all the clients so
2349                 entry->setDead();
2350         } else {
2351                 // We need to watch this rejected message
2352                 entry->setWatchSet(deviceWatchSet);
2353         }
2354 }
2355
2356 /**
2357  * Check if this abort is live, if not then save it so we can kill it later->
2358  * update the last transaction number that was arbitrated on->
2359  */
2360 void Table::processEntry(Abort entry) {
2361
2362
2363         if (entry->getTransactionSequenceNumber() != -1) {
2364                 // update the transaction status if it was sent to the server
2365                 TransactionStatus status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2366                 if (status != NULL) {
2367                         status->setStatus(TransactionStatus_StatusAborted);
2368                 }
2369         }
2370
2371         // Abort has not been seen by the client it is for yet so we need to keep track of it
2372         Abort previouslySeenAbort = liveAbortTable->put(entry->getAbortId(), entry);
2373         if (previouslySeenAbort != NULL) {
2374                 previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version
2375         }
2376
2377         if (entry->getTransactionArbitrator() == localMachineId) {
2378                 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
2379         }
2380
2381         if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) {
2382
2383                 // The machine already saw this so it is dead
2384                 entry->setDead();
2385                 liveAbortTable->remove(entry->getAbortId());
2386
2387                 if (entry->getTransactionArbitrator() == localMachineId) {
2388                         liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2389                 }
2390
2391                 return;
2392         }
2393
2394
2395
2396
2397         // Update the last arbitration data that we have seen so far
2398         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator()) != NULL) {
2399
2400                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2401                 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2402                         // Is larger
2403                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2404                 }
2405         } else {
2406                 // Never seen any data from this arbitrator so record the first one
2407                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2408         }
2409
2410
2411         // Set dead a transaction if we can
2412         Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(new Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber()));
2413         if (transactionToSetDead != NULL) {
2414                 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2415         }
2416
2417         // Update the last transaction sequence number that the arbitrator arbitrated on
2418         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator());
2419         if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2420
2421                 // Is a valid one
2422                 if (entry->getTransactionSequenceNumber() != -1) {
2423                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2424                 }
2425         }
2426 }
2427
2428 /**
2429  * Set dead the transaction part if that transaction is dead and keep track of all new parts
2430  */
2431 void Table::processEntry(TransactionPart entry) {
2432         // Check if we have already seen this transaction and set it dead OR if it is not alive
2433         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId());
2434         if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= entry->getSequenceNumber())) {
2435                 // This transaction is dead, it was already committed or aborted
2436                 entry->setDead();
2437                 return;
2438         }
2439
2440         // This part is still alive
2441         Hashtable<Pair<int64_t, int32_t>*, TransactionPart*>* transactionPart = newTransactionParts->get(entry->getMachineId());
2442
2443         if (transactionPart == NULL) {
2444                 // Dont have a table for this machine Id yet so make one
2445                 transactionPart = new Hashtable<Pair<int64_t, int32_t>*, TransactionPart*>();
2446                 newTransactionParts->put(entry->getMachineId(), transactionPart);
2447         }
2448
2449         // Update the part and set dead ones we have already seen (got a rescued version)
2450         TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry);
2451         if (previouslySeenPart != NULL) {
2452                 previouslySeenPart->setDead();
2453         }
2454 }
2455
2456 /**
2457  * Process new commit entries and save them for future use->  Delete duplicates
2458  */
2459 void Table::processEntry(CommitPart entry) {
2460
2461
2462         // Update the last transaction that was updated if we can
2463         if (entry->getTransactionSequenceNumber() != -1) {
2464                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId());
2465
2466                 // Update the last transaction sequence number that the arbitrator arbitrated on
2467                 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2468                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2469                 }
2470         }
2471
2472
2473
2474
2475         Hashtable<Pair<int64_t, int32_t>*, CommitPart*>* commitPart = newCommitParts->get(entry->getMachineId());
2476
2477         if (commitPart == NULL) {
2478                 // Don't have a table for this machine Id yet so make one
2479                 commitPart = new Hashtable<Pair<int64_t, int32_t>*, CommitPart*>();
2480                 newCommitParts->put(entry->getMachineId(), commitPart);
2481         }
2482
2483         // Update the part and set dead ones we have already seen (got a rescued version)
2484         CommitPart previouslySeenPart = commitPart->put(entry->getPartId(), entry);
2485         if (previouslySeenPart != NULL) {
2486                 previouslySeenPart->setDead();
2487         }
2488 }
2489
/**
 * Update the last message seen table.  Update and set dead the appropriate RejectedMessages as clients see them.
 * Updates the live aborts, removes those that are dead and sets them dead.
 * Check that the last message seen is correct and that there is no mismatch of our own last message or that
 * other clients have not had a rollback on the last message.
 */
2496 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness liveness, bool acceptUpdatesToLocal, Hashset<intr64_t> *machineSet) {
2497
2498         // We have seen this machine ID
2499         machineSet->remove(machineId);
2500
2501         // Get the set of rejected messages that this machine Id is has not seen yet
2502         Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2503
2504         // If there is a rejected message that this machine Id has not seen yet
2505         if (watchset != NULL) {
2506
2507                 // Go through each rejected message that this machine Id has not seen yet
2508                 for (Iterator<RejectedMessage> rmit = watchset->iterator(); rmit->hasNext(); ) {
2509
2510                         RejectedMessage rm = rmit->next();
2511
2512                         // If this machine Id has seen this rejected message->->->
2513                         if (rm->getSequenceNumber() <= seqNum) {
2514
2515                                 // Remove it from our watchlist
2516                                 rmit->remove();
2517
2518                                 // Decrement machines that need to see this notification
2519                                 rm->removeWatcher(machineId);
2520                         }
2521                 }
2522         }
2523
2524         // Set dead the abort
2525         for (Iterator<Map->Entry<Pair<int64_t, int64_t>*, Abort*> > i = liveAbortTable->entrySet()->iterator(); i->hasNext();) {
2526                 Abort abort = i->next()->getValue();
2527
2528                 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2529                         abort->setDead();
2530                         i->remove();
2531
2532                         if (abort->getTransactionArbitrator() == localMachineId) {
2533                                 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2534                         }
2535                 }
2536         }
2537
2538
2539
2540         if (machineId == localMachineId) {
2541                 // Our own messages are immediately dead->
2542                 if (liveness instanceof LastMessage) {
2543                         ((LastMessage)liveness)->setDead();
2544                 } else if (liveness instanceof Slot) {
2545                         ((Slot)liveness)->setDead();
2546                 } else {
2547                         throw new Error("Unrecognized type");
2548                 }
2549         }
2550
2551         // Get the old last message for this device
2552         Pair<int64_t, Liveness*> lastMessageEntry = lastMessageTable->put(machineId, new Pair<int64_t, Liveness*>(seqNum, liveness));
2553         if (lastMessageEntry == NULL) {
2554                 // If no last message then there is nothing else to process
2555                 return;
2556         }
2557
2558         int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
2559         Liveness lastEntry = lastMessageEntry->getSecond();
2560
2561         // If it is not our machine Id since we already set ours to dead
2562         if (machineId != localMachineId) {
2563                 if (lastEntry instanceof LastMessage) {
2564                         ((LastMessage)lastEntry)->setDead();
2565                 } else if (lastEntry instanceof Slot) {
2566                         ((Slot)lastEntry)->setDead();
2567                 } else {
2568                         throw new Error("Unrecognized type");
2569                 }
2570         }
2571
2572         // Make sure the server is not playing any games
2573         if (machineId == localMachineId) {
2574
2575                 if (hadPartialSendToServer) {
2576                         // We were not making any updates and we had a machine mismatch
2577                         if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2578                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: " +  lastMessageSeqNum  + " got: " + seqNum);
2579                         }
2580
2581                 } else {
2582                         // We were not making any updates and we had a machine mismatch
2583                         if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2584                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed: " +  lastMessageSeqNum + " got: " + seqNum);
2585                         }
2586                 }
2587         } else {
2588                 if (lastMessageSeqNum > seqNum) {
2589                         throw new Error("Server Error: Rollback on remote machine sequence number");
2590                 }
2591         }
2592 }
2593
2594 /**
2595  * Add a rejected message entry to the watch set stored for machineId in rejectedMessageWatchVectorTable,
2596  * creating that set on first use; the set keeps track of which clients have seen the entry and which have not.
2597  */
2598 void Table::addWatchVector(int64_t machineId, RejectedMessage entry) {
2599         Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId); // NULL when this machine ID has no watch set yet
2600         if (entries == NULL) {
2601                 // There is no set for this machine ID yet so create one and register it in the table
2602                 entries = new Hashset<RejectedMessage *>();
2603                 rejectedMessageWatchVectorTable->put(machineId, entries);
2604         }
2605         entries->add(entry); // NOTE(review): entry is declared by value but the set holds RejectedMessage *; presumably a pointer parameter was intended -- confirm against Table.h
2606 }
2607
2608 /**
2609  * Check that the HMAC chain is not violated: for each slot in newSlots whose predecessor (sequence number - 1) is returned by the indexer, the predecessor's HMAC must equal the slot's previous HMAC, otherwise an Error is thrown.
2610  */
2611 void Table::checkHMACChain(SlotIndexer indexer, Array<Slot*> *newSlots) {
2612         for (int i = 0; i < newSlots->length; i++) {
2613                 Slot currSlot = newSlots[i]; // NOTE(review): newSlots is a pointer, so [i] indexes the pointer rather than the Array's elements; looks like an unported Java array access -- confirm
2614                 Slot prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1); // Slot immediately preceding currSlot in sequence order
2615                 if (prevSlot != NULL && // A predecessor the indexer cannot supply is skipped, not treated as an error
2616                                 !Arrays->equals(prevSlot->getHMAC(), currSlot->getPrevHMAC())) // NOTE(review): Arrays->equals appears to be Java's Arrays.equals carried over verbatim; needs a C++ equivalent -- confirm
2617                         throw new Error("Server Error: Invalid HMAC Chain" + currSlot + " " + prevSlot); // NOTE(review): Java-style concatenation of a literal with objects; unlikely to build the intended message in C++ -- confirm
2618         }
2619 }
2620