edits
[iotcloud.git] / version2 / src / C / Table.cc
1 #include "Table.h"
2 #include "CloudComm.h"
3 #include "SlotBuffer.h"
4 #include "NewKey.h"
5 #include "Slot.h"
6 #include "KeyValue.h"
7 #include "Error.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
13 #include "Random.h"
14 #include "ByteBuffer.h"
15 #include "Abort.h"
16 #include "CommitPart.h"
17
18
19 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
20         buffer(NULL),
21         cloud(new CloudComm(this, baseurl, password, listeningPort)),
22         random(NULL),
23         liveTableStatus(NULL),
24         pendingTransactionBuilder(NULL),
25         lastPendingTransactionSpeculatedOn(NULL),
26         firstPendingTransaction(NULL),
27         numberOfSlots(0),
28         bufferResizeThreshold(0),
29         liveSlotCount(0),
30         oldestLiveSlotSequenceNumver(1),
31         localMachineId(_localMachineId),
32         sequenceNumber(0),
33         localTransactionSequenceNumber(0),
34         lastTransactionSequenceNumberSpeculatedOn(0),
35         oldestTransactionSequenceNumberSpeculatedOn(0),
36         localArbitrationSequenceNumber(0),
37         hadPartialSendToServer(false),
38         attemptedToSendToServer(false),
39         expectedsize(0),
40         didFindTableStatus(false),
41         currMaxSize(0),
42         lastSlotAttemptedToSend(NULL),
43         lastIsNewKey(false),
44         lastNewSize(0),
45         lastTransactionPartsSent(NULL),
46         lastPendingSendArbitrationEntriesToDelete(NULL),
47         lastNewKey(NULL),
48         committedKeyValueTable(NULL),
49         speculatedKeyValueTable(NULL),
50         pendingTransactionSpeculatedKeyValueTable(NULL),
51         liveNewKeyTable(NULL),
52         lastMessageTable(NULL),
53         rejectedMessageWatchVectorTable(NULL),
54         arbitratorTable(NULL),
55         liveAbortTable(NULL),
56         newTransactionParts(NULL),
57         newCommitParts(NULL),
58         lastArbitratedTransactionNumberByArbitratorTable(NULL),
59         liveTransactionBySequenceNumberTable(NULL),
60         liveTransactionByTransactionIdTable(NULL),
61         liveCommitsTable(NULL),
62         liveCommitsByKeyTable(NULL),
63         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
64         rejectedSlotVector(NULL),
65         pendingTransactionQueue(NULL),
66         pendingSendArbitrationRounds(NULL),
67         pendingSendArbitrationEntriesToDelete(NULL),
68         transactionPartsSent(NULL),
69         outstandingTransactionStatus(NULL),
70         liveAbortsGeneratedByLocal(NULL),
71         offlineTransactionsCommittedAndAtServer(NULL),
72         localCommunicationTable(NULL),
73         lastTransactionSeenFromMachineFromServer(NULL),
74         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
75         lastInsertedNewKey(false),
76         lastSeqNumArbOn(0)
77 {
78         init();
79 }
80
81 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
82         buffer(NULL),
83         cloud(_cloud),
84         random(NULL),
85         liveTableStatus(NULL),
86         pendingTransactionBuilder(NULL),
87         lastPendingTransactionSpeculatedOn(NULL),
88         firstPendingTransaction(NULL),
89         numberOfSlots(0),
90         bufferResizeThreshold(0),
91         liveSlotCount(0),
92         oldestLiveSlotSequenceNumver(1),
93         localMachineId(_localMachineId),
94         sequenceNumber(0),
95         localTransactionSequenceNumber(0),
96         lastTransactionSequenceNumberSpeculatedOn(0),
97         oldestTransactionSequenceNumberSpeculatedOn(0),
98         localArbitrationSequenceNumber(0),
99         hadPartialSendToServer(false),
100         attemptedToSendToServer(false),
101         expectedsize(0),
102         didFindTableStatus(false),
103         currMaxSize(0),
104         lastSlotAttemptedToSend(NULL),
105         lastIsNewKey(false),
106         lastNewSize(0),
107         lastTransactionPartsSent(NULL),
108         lastPendingSendArbitrationEntriesToDelete(NULL),
109         lastNewKey(NULL),
110         committedKeyValueTable(NULL),
111         speculatedKeyValueTable(NULL),
112         pendingTransactionSpeculatedKeyValueTable(NULL),
113         liveNewKeyTable(NULL),
114         lastMessageTable(NULL),
115         rejectedMessageWatchVectorTable(NULL),
116         arbitratorTable(NULL),
117         liveAbortTable(NULL),
118         newTransactionParts(NULL),
119         newCommitParts(NULL),
120         lastArbitratedTransactionNumberByArbitratorTable(NULL),
121         liveTransactionBySequenceNumberTable(NULL),
122         liveTransactionByTransactionIdTable(NULL),
123         liveCommitsTable(NULL),
124         liveCommitsByKeyTable(NULL),
125         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
126         rejectedSlotVector(NULL),
127         pendingTransactionQueue(NULL),
128         pendingSendArbitrationRounds(NULL),
129         pendingSendArbitrationEntriesToDelete(NULL),
130         transactionPartsSent(NULL),
131         outstandingTransactionStatus(NULL),
132         liveAbortsGeneratedByLocal(NULL),
133         offlineTransactionsCommittedAndAtServer(NULL),
134         localCommunicationTable(NULL),
135         lastTransactionSeenFromMachineFromServer(NULL),
136         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
137         lastInsertedNewKey(false),
138         lastSeqNumArbOn(0)
139 {
140         init();
141 }
142
143 /**
144  * Init all the stuff needed for for table usage
145  */
146 void Table::init() {
147         // Init helper objects
148         random = new Random();
149         buffer = new SlotBuffer();
150
151         // init data structs
152         committedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
153         speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
154         pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
155         liveNewKeyTable = new Hashtable<IoTString *, NewKey *>();
156         lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> >();
157         rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
158         arbitratorTable = new Hashtable<IoTString *, int64_t>();
159         liveAbortTable = new Hashtable<Pair<int64_t, int64_t>, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>();
160         newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
161         newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
162         lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
163         liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
164         liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t>, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>();
165         liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> >();
166         liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *>();
167         lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
168         rejectedSlotVector = new Vector<int64_t>();
169         pendingTransactionQueue = new Vector<Transaction *>();
170         pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
171         transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
172         outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
173         liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
174         offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t>, uintptr_t, 0, pairHashFunction, pairEquals>();
175         localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> >();
176         lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
177         pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
178         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
179
180         // Other init stuff
181         numberOfSlots = buffer->capacity();
182         setResizeThreshold();
183 }
184
185 /**
186  * Initialize the table by inserting a table status as the first entry
187  * into the table status also initialize the crypto stuff.
188  */
189 void Table::initTable() {
190         cloud->initSecurity();
191
192         // Create the first insertion into the block chain which is the table status
193         Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
194         localSequenceNumber++;
195         TableStatus *status = new TableStatus(s, numberOfSlots);
196         s->addEntry(status);
197         Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
198
199         if (array == NULL) {
200                 array = new Array<Slot *>(1);
201                 array->set(0, s);
202                 // update local block chain
203                 validateAndUpdate(array, true);
204         } else if (array->length() == 1) {
205                 // in case we did push the slot BUT we failed to init it
206                 validateAndUpdate(array, true);
207         } else {
208                 throw new Error("Error on initialization");
209         }
210 }
211
212 /**
213  * Rebuild the table from scratch by pulling the latest block chain
214  * from the server.
215  */
216 void Table::rebuild() {
217         // Just pull the latest slots from the server
218         Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
219         validateAndUpdate(newslots, true);
220         sendToServer(NULL);
221         updateLiveTransactionsAndStatus();
222 }
223
224 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
225         localCommunicationTable->put(arbitrator, Pair<IoTString *, int32_t>(hostName, portNumber));
226 }
227
228 int64_t Table::getArbitrator(IoTString *key) {
229         return arbitratorTable->get(key);
230 }
231
232 void Table::close() {
233         cloud->close();
234 }
235
236 IoTString *Table::getCommitted(IoTString *key)  {
237         KeyValue *kv = committedKeyValueTable->get(key);
238
239         if (kv != NULL) {
240                 return kv->getValue();
241         } else {
242                 return NULL;
243         }
244 }
245
246 IoTString *Table::getSpeculative(IoTString *key) {
247         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
248
249         if (kv == NULL) {
250                 kv = speculatedKeyValueTable->get(key);
251         }
252
253         if (kv == NULL) {
254                 kv = committedKeyValueTable->get(key);
255         }
256
257         if (kv != NULL) {
258                 return kv->getValue();
259         } else {
260                 return NULL;
261         }
262 }
263
264 IoTString *Table::getCommittedAtomic(IoTString *key) {
265         KeyValue *kv = committedKeyValueTable->get(key);
266
267         if (!arbitratorTable->contains(key)) {
268                 throw new Error("Key not Found.");
269         }
270
271         // Make sure new key value pair matches the current arbitrator
272         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
273                 // TODO: Maybe not throw en error
274                 throw new Error("Not all Key Values Match Arbitrator.");
275         }
276
277         if (kv != NULL) {
278                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
279                 return kv->getValue();
280         } else {
281                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
282                 return NULL;
283         }
284 }
285
286 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
287         if (!arbitratorTable->contains(key)) {
288                 throw new Error("Key not Found.");
289         }
290
291         // Make sure new key value pair matches the current arbitrator
292         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
293                 // TODO: Maybe not throw en error
294                 throw new Error("Not all Key Values Match Arbitrator.");
295         }
296
297         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
298
299         if (kv == NULL) {
300                 kv = speculatedKeyValueTable->get(key);
301         }
302
303         if (kv == NULL) {
304                 kv = committedKeyValueTable->get(key);
305         }
306
307         if (kv != NULL) {
308                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
309                 return kv->getValue();
310         } else {
311                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
312                 return NULL;
313         }
314 }
315
316 bool Table::update()  {
317         try {
318                 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
319                 validateAndUpdate(newSlots, false);
320                 sendToServer(NULL);
321                 updateLiveTransactionsAndStatus();
322                 return true;
323         } catch (Exception *e) {
324                 for (int64_t m : localCommunicationTable->keySet()) {
325                         updateFromLocal(m);
326                 }
327         }
328
329         return false;
330 }
331
332 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
333         while (true) {
334                 if (!arbitratorTable->contains(keyName)) {
335                         // There is already an arbitrator
336                         return false;
337                 }
338                 NewKey *newKey = new NewKey(NULL, keyName, machineId);
339
340                 if (sendToServer(newKey)) {
341                         // If successfully inserted
342                         return true;
343                 }
344         }
345 }
346
347 void Table::startTransaction() {
348         // Create a new transaction, invalidates any old pending transactions.
349         pendingTransactionBuilder = new PendingTransaction(localMachineId);
350 }
351
352 void Table::addKV(IoTString *key, IoTString *value) {
353
354         // Make sure it is a valid key
355         if (!arbitratorTable->contains(key)) {
356                 throw new Error("Key not Found.");
357         }
358
359         // Make sure new key value pair matches the current arbitrator
360         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
361                 // TODO: Maybe not throw en error
362                 throw new Error("Not all Key Values Match Arbitrator.");
363         }
364
365         // Add the key value to this transaction
366         KeyValue *kv = new KeyValue(key, value);
367         pendingTransactionBuilder->addKV(kv);
368 }
369
370 TransactionStatus *Table::commitTransaction() {
371         if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
372                 // transaction with no updates will have no effect on the system
373                 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
374         }
375
376         // Set the local transaction sequence number and increment
377         pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
378         localTransactionSequenceNumber++;
379
380         // Create the transaction status
381         TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
382
383         // Create the new transaction
384         Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
385         newTransaction->setTransactionStatus(transactionStatus);
386
387         if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
388                 // Add it to the queue and invalidate the builder for safety
389                 pendingTransactionQueue->add(newTransaction);
390         } else {
391                 arbitrateOnLocalTransaction(newTransaction);
392                 updateLiveStateFromLocal();
393         }
394
395         pendingTransactionBuilder = new PendingTransaction(localMachineId);
396
397         try {
398                 sendToServer(NULL);
399         } catch (ServerException *e) {
400
401                 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
402                 uint size = pendingTransactionQueue->size();
403                 uint oldindex = 0;
404                 for(int iter = 0; iter < size; iter++) {
405                         Transaction *transaction = pendingTransactionQueue->get(iter);
406                         pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter));
407                         
408                         if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
409                                 // Already contacted this client so ignore all attempts to contact this client
410                                 // to preserve ordering for arbitrator
411                                 continue;
412                         }
413
414                         Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
415
416                         if (sendReturn.getFirst()) {
417                                 // Failed to contact over local
418                                 arbitratorTriedAndFailed->add(transaction->getArbitrator());
419                         } else {
420                                 // Successful contact or should not contact
421
422                                 if (sendReturn.getSecond()) {
423                                         // did arbitrate
424                                         oldindex--;
425                                 }
426                         }
427                 }
428         }
429         pendingTransactionQueue->setSize(oldindex);
430         
431         updateLiveStateFromLocal();
432
433         return transactionStatus;
434 }
435
436 /**
437  * Recalculate the new resize threshold
438  */
439 void Table::setResizeThreshold() {
440         int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
441         bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
442 }
443
444 int64_t Table::getLocalSequenceNumber() {
445         return localSequenceNumber;
446 }
447
448 bool Table::sendToServer(NewKey *newKey) {
449         bool fromRetry = false;
450         try {
451                 if (hadPartialSendToServer) {
452                         Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
453                         if (newSlots->length() == 0) {
454                                 fromRetry = true;
455                                 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
456
457                                 if (sendSlotsReturn.getFirst()) {
458                                         if (newKey != NULL) {
459                                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
460                                                         newKey = NULL;
461                                                 }
462                                         }
463
464                                         for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
465                                                 transaction->resetServerFailure();
466                                                 // Update which transactions parts still need to be sent
467                                                 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
468                                                 // Add the transaction status to the outstanding list
469                                                 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
470
471                                                 // Update the transaction status
472                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
473
474                                                 // Check if all the transaction parts were successfully
475                                                 // sent and if so then remove it from pending
476                                                 if (transaction->didSendAllParts()) {
477                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
478                                                         pendingTransactionQueue->remove(transaction);
479                                                 }
480                                         }
481                                 } else {
482                                         newSlots = sendSlotsReturn.getThird();
483                                         bool isInserted = false;
484                                         for (uint si = 0; si < newSlots->length(); si++) {
485                                                 Slot *s = newSlots->get(si);
486                                                 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
487                                                         isInserted = true;
488                                                         break;
489                                                 }
490                                         }
491
492                                         for (uint si = 0; si < newSlots->length(); si++) {
493                                                 Slot *s = newSlots->get(si);
494                                                 if (isInserted) {
495                                                         break;
496                                                 }
497
498                                                 // Process each entry in the slot
499                                                 for (Entry *entry : s->getEntries()) {
500                                                         if (entry->getType() == TypeLastMessage) {
501                                                                 LastMessage *lastMessage = (LastMessage *)entry;
502                                                                 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
503                                                                         isInserted = true;
504                                                                         break;
505                                                                 }
506                                                         }
507                                                 }
508                                         }
509
510                                         if (isInserted) {
511                                                 if (newKey != NULL) {
512                                                         if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
513                                                                 newKey = NULL;
514                                                         }
515                                                 }
516
517                                                 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
518                                                         transaction->resetServerFailure();
519
520                                                         // Update which transactions parts still need to be sent
521                                                         transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
522
523                                                         // Add the transaction status to the outstanding list
524                                                         outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
525
526                                                         // Update the transaction status
527                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
528
529                                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
530                                                         if (transaction->didSendAllParts()) {
531                                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
532                                                                 pendingTransactionQueue->remove(transaction);
533                                                         } else {
534                                                                 transaction->resetServerFailure();
535                                                                 // Set the transaction sequence number back to nothing
536                                                                 if (!transaction->didSendAPartToServer()) {
537                                                                         transaction->setSequenceNumber(-1);
538                                                                 }
539                                                         }
540                                                 }
541                                         }
542                                 }
543
544                                 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
545                                         transaction->resetServerFailure();
546                                         // Set the transaction sequence number back to nothing
547                                         if (!transaction->didSendAPartToServer()) {
548                                                 transaction->setSequenceNumber(-1);
549                                         }
550                                 }
551
552                                 if (sendSlotsReturn.getThird()->length() != 0) {
553                                         // insert into the local block chain
554                                         validateAndUpdate(sendSlotsReturn.getThird(), true);
555                                 }
556                                 // continue;
557                         } else {
558                                 bool isInserted = false;
559                                 for (uint si = 0; si < newSlots->length(); si++) {
560                                         Slot *s = newSlots->get(si);
561                                         if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
562                                                 isInserted = true;
563                                                 break;
564                                         }
565                                 }
566
567                                 for (uint si = 0; si < newSlots->length(); si++) {
568                                         Slot *s = newSlots->get(si);
569                                         if (isInserted) {
570                                                 break;
571                                         }
572
573                                         // Process each entry in the slot
574                                         for (Entry *entry : s->getEntries()) {
575
576                                                 if (entry->getType() == TypeLastMessage) {
577                                                         LastMessage *lastMessage = (LastMessage *)entry;
578                                                         if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
579                                                                 isInserted = true;
580                                                                 break;
581                                                         }
582                                                 }
583                                         }
584                                 }
585
586                                 if (isInserted) {
587                                         if (newKey != NULL) {
588                                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
589                                                         newKey = NULL;
590                                                 }
591                                         }
592
593                                         for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
594                                                 transaction->resetServerFailure();
595
596                                                 // Update which transactions parts still need to be sent
597                                                 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
598
599                                                 // Add the transaction status to the outstanding list
600                                                 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
601
602                                                 // Update the transaction status
603                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
604
605                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
606                                                 if (transaction->didSendAllParts()) {
607                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
608                                                         pendingTransactionQueue->remove(transaction);
609                                                 } else {
610                                                         transaction->resetServerFailure();
611                                                         // Set the transaction sequence number back to nothing
612                                                         if (!transaction->didSendAPartToServer()) {
613                                                                 transaction->setSequenceNumber(-1);
614                                                         }
615                                                 }
616                                         }
617                                 } else {
618                                         for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
619                                                 transaction->resetServerFailure();
620                                                 // Set the transaction sequence number back to nothing
621                                                 if (!transaction->didSendAPartToServer()) {
622                                                         transaction->setSequenceNumber(-1);
623                                                 }
624                                         }
625                                 }
626
627                                 // insert into the local block chain
628                                 validateAndUpdate(newSlots, true);
629                         }
630                 }
631         } catch (ServerException *e) {
632                 throw e;
633         }
634
635
636
637         try {
638                 // While we have stuff that needs inserting into the block chain
639                 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
640
641                         fromRetry = false;
642
643                         if (hadPartialSendToServer) {
644                                 throw new Error("Should Be error free");
645                         }
646
647
648
649                         // If there is a new key with same name then end
650                         if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
651                                 return false;
652                         }
653
654                         // Create the slot
655                         Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber);
656                         localSequenceNumber++;
657
658                         // Try to fill the slot with data
659                         ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
660                         bool needsResize = fillSlotsReturn.getFirst();
661                         int newSize = fillSlotsReturn.getSecond();
662                         bool insertedNewKey = fillSlotsReturn.getThird();
663
664                         if (needsResize) {
665                                 // Reset which transaction to send
666                                 for (Transaction *transaction : transactionPartsSent->keySet()) {
667                                         transaction->resetNextPartToSend();
668
669                                         // Set the transaction sequence number back to nothing
670                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
671                                                 transaction->setSequenceNumber(-1);
672                                         }
673                                 }
674
675                                 // Clear the sent data since we are trying again
676                                 pendingSendArbitrationEntriesToDelete->clear();
677                                 transactionPartsSent->clear();
678
679                                 // We needed a resize so try again
680                                 fillSlot(slot, true, newKey);
681                         }
682
683                         lastSlotAttemptedToSend = slot;
684                         lastIsNewKey = (newKey != NULL);
685                         lastInsertedNewKey = insertedNewKey;
686                         lastNewSize = newSize;
687                         lastNewKey = newKey;
688                         lastTransactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> * >(transactionPartsSent);
689                         lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
690
691
692                         ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
693
694                         if (sendSlotsReturn.getFirst()) {
695
696                                 // Did insert into the block chain
697
698                                 if (insertedNewKey) {
699                                         // This slot was what was inserted not a previous slot
700
701                                         // New Key was successfully inserted into the block chain so dont want to insert it again
702                                         newKey = NULL;
703                                 }
704
705                                 // Remove the aborts and commit parts that were sent from the pending to send queue
706                                 for (Iterator<ArbitrationRound *> *iter = pendingSendArbitrationRounds->iterator(); iter->hasNext(); ) {
707                                         ArbitrationRound *round = iter->next();
708                                         round->removeParts(pendingSendArbitrationEntriesToDelete);
709
710                                         if (round->isDoneSending()) {
711                                                 // Sent all the parts
712                                                 iter->remove();
713                                         }
714                                 }
715
716                                 for (Transaction *transaction : transactionPartsSent->keySet()) {
717                                         transaction->resetServerFailure();
718
719                                         // Update which transactions parts still need to be sent
720                                         transaction->removeSentParts(transactionPartsSent->get(transaction));
721
722                                         // Add the transaction status to the outstanding list
723                                         outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
724
725                                         // Update the transaction status
726                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
727
728                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
729                                         if (transaction->didSendAllParts()) {
730                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
731                                                 pendingTransactionQueue->remove(transaction);
732                                         }
733                                 }
734                         } else {
735                                 // Reset which transaction to send
736                                 for (Transaction *transaction : transactionPartsSent->keySet()) {
737                                         transaction->resetNextPartToSend();
738
739                                         // Set the transaction sequence number back to nothing
740                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
741                                                 transaction->setSequenceNumber(-1);
742                                         }
743                                 }
744                         }
745
746                         // Clear the sent data in preparation for next send
747                         pendingSendArbitrationEntriesToDelete->clear();
748                         transactionPartsSent->clear();
749
750                         if (sendSlotsReturn.getThird()->length() != 0) {
751                                 // insert into the local block chain
752                                 validateAndUpdate(sendSlotsReturn.getThird(), true);
753                         }
754                 }
755
756         } catch (ServerException *e) {
757
758                 if (e->getType() != ServerException->TypeInputTimeout) {
759                         // Nothing was able to be sent to the server so just clear these data structures
760                         for (Transaction *transaction : transactionPartsSent->keySet()) {
761                                 transaction->resetNextPartToSend();
762
763                                 // Set the transaction sequence number back to nothing
764                                 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
765                                         transaction->setSequenceNumber(-1);
766                                 }
767                         }
768                 } else {
769                         // There was a partial send to the server
770                         hadPartialSendToServer = true;
771
772                         // Nothing was able to be sent to the server so just clear these data structures
773                         for (Transaction *transaction : transactionPartsSent->keySet()) {
774                                 transaction->resetNextPartToSend();
775                                 transaction->setServerFailure();
776                         }
777                 }
778
779                 pendingSendArbitrationEntriesToDelete->clear();
780                 transactionPartsSent->clear();
781
782                 throw e;
783         }
784
785         return newKey == NULL;
786 }
787
788 bool Table::updateFromLocal(int64_t machineId) {
789         Pair<IoTString *, int32_t> localCommunicationInformation = localCommunicationTable->get(machineId);
790         if (localCommunicationInformation == NULL) {
791                 // Cant talk to that device locally so do nothing
792                 return false;
793         }
794
795         // Get the size of the send data
796         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
797
798         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
799         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId) != NULL) {
800                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
801         }
802
803         Array<char> *sendData = new Array<char>(sendDataSize);
804         ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
805
806         // Encode the data
807         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
808         bbEncode->putInt(0);
809
810         // Send by local
811         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
812         localSequenceNumber++;
813
814         if (returnData == NULL) {
815                 // Could not contact server
816                 return false;
817         }
818
819         // Decode the data
820         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
821         int numberOfEntries = bbDecode->getInt();
822
823         for (int i = 0; i < numberOfEntries; i++) {
824                 char type = bbDecode->get();
825                 if (type == TypeAbort) {
826                         Abort *abort = (Abort*)Abort_decode(NULL, bbDecode);
827                         processEntry(abort);
828                 } else if (type == TypeCommitPart) {
829                         CommitPart *commitPart = (CommitPart*)CommitPart_decode(NULL, bbDecode);
830                         processEntry(commitPart);
831                 }
832         }
833
834         updateLiveStateFromLocal();
835
836         return true;
837 }
838
839 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
840
841         // Get the devices local communications
842         Pair<IoTString *, int32_t> localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
843
844         if (localCommunicationInformation == NULL) {
845                 // Cant talk to that device locally so do nothing
846                 return Pair<bool, bool>(true, false);
847         }
848
849         // Get the size of the send data
850         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
851         for (TransactionPart *part : transaction->getParts()->values()) {
852                 sendDataSize += part->getSize();
853         }
854
855         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
856         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator()) != NULL) {
857                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
858         }
859
860         // Make the send data size
861         Array<char> *sendData = new Array<char>(sendDataSize);
862         ByteBuffer *bbEncode = ByteBuffer.wrap(sendData);
863
864         // Encode the data
865         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
866         bbEncode->putInt(transaction->getParts()->size());
867         for (TransactionPart *part : transaction->getParts()->values()) {
868                 part->encode(bbEncode);
869         }
870
871
872         // Send by local
873         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
874         localSequenceNumber++;
875
876         if (returnData == NULL) {
877                 // Could not contact server
878                 return Pair<bool, bool>(true, false);
879         }
880
881         // Decode the data
882         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
883         bool didCommit = bbDecode->get() == 1;
884         bool couldArbitrate = bbDecode->get() == 1;
885         int numberOfEntries = bbDecode->getInt();
886         bool foundAbort = false;
887
888         for (int i = 0; i < numberOfEntries; i++) {
889                 char type = bbDecode->get();
890                 if (type == TypeAbort) {
891                         Abort *abort = (Abort*)Abort_decode(NULL, bbDecode);
892
893                         if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
894                                 foundAbort = true;
895                         }
896
897                         processEntry(abort);
898                 } else if (type == TypeCommitPart) {
899                         CommitPart *commitPart = (CommitPart*)CommitPart_decode(NULL, bbDecode);
900                         processEntry(commitPart);
901                 }
902         }
903
904         updateLiveStateFromLocal();
905
906         if (couldArbitrate) {
907                 TransactionStatus status =  transaction->getTransactionStatus();
908                 if (didCommit) {
909                         status->setStatus(TransactionStatus_StatusCommitted);
910                 } else {
911                         status->setStatus(TransactionStatus_StatusAborted);
912                 }
913         } else {
914                 TransactionStatus status =  transaction->getTransactionStatus();
915                 if (foundAbort) {
916                         status->setStatus(TransactionStatus_StatusAborted);
917                 } else {
918                         status->setStatus(TransactionStatus_StatusCommitted);
919                 }
920         }
921
922         return Pair<bool, bool>(false, true);
923 }
924
925 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
926
927         // Decode the data
928         ByteBuffer *bbDecode = ByteBuffer_wrap(data);
929         int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
930         int numberOfParts = bbDecode->getInt();
931
932         // If we did commit a transaction or not
933         bool didCommit = false;
934         bool couldArbitrate = false;
935
936         if (numberOfParts != 0) {
937
938                 // decode the transaction
939                 Transaction *transaction = new Transaction();
940                 for (int i = 0; i < numberOfParts; i++) {
941                         bbDecode->get();
942                         TransactionPart *newPart = (TransactionPart)TransactionPart.decode(NULL, bbDecode);
943                         transaction->addPartDecode(newPart);
944                 }
945
946                 // Arbitrate on transaction and pull relevant return data
947                 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
948                 couldArbitrate = localArbitrateReturn.getFirst();
949                 didCommit = localArbitrateReturn.getSecond();
950
951                 updateLiveStateFromLocal();
952
953                 // Transaction was sent to the server so keep track of it to prevent double commit
954                 if (transaction->getSequenceNumber() != -1) {
955                         offlineTransactionsCommittedAndAtServer->add(transaction->getId());
956                 }
957         }
958
959         // The data to send back
960         int returnDataSize = 0;
961         Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
962
963         // Get the aborts to send back
964         Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>(liveAbortsGeneratedByLocal->keySet());
965         Collections->sort(abortLocalSequenceNumbers);
966         for (int64_t localSequenceNumber : abortLocalSequenceNumbers) {
967                 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
968                         continue;
969                 }
970
971                 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
972                 unseenArbitrations->add(abort);
973                 returnDataSize += abort->getSize();
974         }
975
976         // Get the commits to send back
977         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
978         if (commitForClientTable != NULL) {
979                 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
980                 Collections->sort(commitLocalSequenceNumbers);
981
982                 for (int64_t localSequenceNumber : commitLocalSequenceNumbers) {
983                         Commit *commit = commitForClientTable->get(localSequenceNumber);
984
985                         if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
986                                 continue;
987                         }
988
989                         unseenArbitrations->addAll(commit->getParts()->values());
990
991                         for (CommitPart *commitPart : commit->getParts()->values()) {
992                                 returnDataSize += commitPart->getSize();
993                         }
994                 }
995         }
996
997         // Number of arbitration entries to decode
998         returnDataSize += 2 * sizeof(int32_t);
999
1000         // bool of did commit or not
1001         if (numberOfParts != 0) {
1002                 returnDataSize += sizeof(char);
1003         }
1004
1005         // Data to send Back
1006         Array<char> *returnData = new Array<char>(returnDataSize);
1007         ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
1008
1009         if (numberOfParts != 0) {
1010                 if (didCommit) {
1011                         bbEncode->put((char)1);
1012                 } else {
1013                         bbEncode->put((char)0);
1014                 }
1015                 if (couldArbitrate) {
1016                         bbEncode->put((char)1);
1017                 } else {
1018                         bbEncode->put((char)0);
1019                 }
1020         }
1021
1022         bbEncode->putInt(unseenArbitrations->size());
1023         uint size = unseenArbitrations->size();
1024         for(uint i = 0; i< size; i++) {
1025                 Entry * entry = unseenArbitrations->get(i);
1026                 entry->encode(bbEncode);
1027         }
1028
1029         localSequenceNumber++;
1030         return returnData;
1031 }
1032
1033 ThreeTuple<bool, bool, Array<Slot *> *> Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey) {
1034         bool attemptedToSendToServerTmp = attemptedToSendToServer;
1035         attemptedToSendToServer = true;
1036
1037         bool inserted = false;
1038         bool lastTryInserted = false;
1039
1040         Array<Slot *> *array = cloud->putSlot(slot, newSize);
1041         if (array == NULL) {
1042                 array = new Array<Slot *>();
1043                 array->set(0, slot);
1044                 rejectedSlotVector->clear();
1045                 inserted = true;
1046         } else {
1047                 if (array->length() == 0) {
1048                         throw new Error("Server Error: Did not send any slots");
1049                 }
1050
1051                 // if (attemptedToSendToServerTmp) {
1052                 if (hadPartialSendToServer) {
1053
1054                         bool isInserted = false;
1055                         uint size = s->size();
1056                         for(uint i=0; i < size; i++) {
1057                                 Slot *s = array->get(i);
1058                                 if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1059                                         isInserted = true;
1060                                         break;
1061                                 }
1062                         }
1063
1064                         for(uint i=0; i < size; i++) {
1065                                 Slot *s = array->get(i);
1066                                 if (isInserted) {
1067                                         break;
1068                                 }
1069
1070                                 // Process each entry in the slot
1071                                 for (Entry *entry : s->getEntries()) {
1072
1073                                         if (entry->getType() == TypeLastMessage) {
1074                                                 LastMessage *lastMessage = (LastMessage *)entry;
1075
1076                                                 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) {
1077                                                         isInserted = true;
1078                                                         break;
1079                                                 }
1080                                         }
1081                                 }
1082                         }
1083
1084                         if (!isInserted) {
1085                                 rejectedSlotVector->add(slot->getSequenceNumber());
1086                                 lastTryInserted = false;
1087                         } else {
1088                                 lastTryInserted = true;
1089                         }
1090                 } else {
1091                         rejectedSlotVector->add(slot->getSequenceNumber());
1092                         lastTryInserted = false;
1093                 }
1094         }
1095
1096         return ThreeTuple<bool, bool, Array<Slot *> *>(inserted, lastTryInserted, array);
1097 }
1098
1099 /**
1100  * Returns false if a resize was needed
1101  */
1102 ThreeTuple<bool, int32_t, bool> Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry) {
1103         int newSize = 0;
1104         if (liveSlotCount > bufferResizeThreshold) {
1105                 resize = true;  //Resize is forced
1106         }
1107
1108         if (resize) {
1109                 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1110                 TableStatus *status = new TableStatus(slot, newSize);
1111                 slot->addEntry(status);
1112         }
1113
1114         // Fill with rejected slots first before doing anything else
1115         doRejectedMessages(slot);
1116
1117         // Do mandatory rescue of entries
1118         ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1119
1120         // Extract working variables
1121         bool needsResize = mandatoryRescueReturn.getFirst();
1122         bool seenLiveSlot = mandatoryRescueReturn.getSecond();
1123         int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1124
1125         if (needsResize && !resize) {
1126                 // We need to resize but we are not resizing so return false
1127                 return ThreeTuple<bool, int32_t, bool>(true, NULL, NULL);
1128         }
1129
1130         bool inserted = false;
1131         if (newKeyEntry != NULL) {
1132                 newKeyEntry->setSlot(slot);
1133                 if (slot->hasSpace(newKeyEntry)) {
1134                         slot->addEntry(newKeyEntry);
1135                         inserted = true;
1136                 }
1137         }
1138
1139         // Clear the transactions, aborts and commits that were sent previously
1140         transactionPartsSent->clear();
1141         pendingSendArbitrationEntriesToDelete->clear();
1142
1143         for (ArbitrationRound *round : pendingSendArbitrationRounds) {
1144                 bool isFull = false;
1145                 round->generateParts();
1146                 Vector<Entry *> *parts = round->getParts();
1147
1148                 // Insert pending arbitration data
1149                 for (Entry *arbitrationData : parts) {
1150
1151                         // If it is an abort then we need to set some information
1152                         if (arbitrationData instanceof Abort) {
1153                                 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1154                         }
1155
1156                         if (!slot->hasSpace(arbitrationData)) {
1157                                 // No space so cant do anything else with these data entries
1158                                 isFull = true;
1159                                 break;
1160                         }
1161
1162                         // Add to this current slot and add it to entries to delete
1163                         slot->addEntry(arbitrationData);
1164                         pendingSendArbitrationEntriesToDelete->add(arbitrationData);
1165                 }
1166
1167                 if (isFull) {
1168                         break;
1169                 }
1170         }
1171
1172         if (pendingTransactionQueue->size() > 0) {
1173                 Transaction *transaction = pendingTransactionQueue->get(0);
1174                 // Set the transaction sequence number if it has yet to be inserted into the block chain
1175                 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1176                         transaction->setSequenceNumber(slot->getSequenceNumber());
1177                 }
1178
1179                 while (true) {
1180                         TransactionPart *part = transaction->getNextPartToSend();
1181                         if (part == NULL) {
1182                                 // Ran out of parts to send for this transaction so move on
1183                                 break;
1184                         }
1185
1186                         if (slot->hasSpace(part)) {
1187                                 slot->addEntry(part);
1188                                 Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1189                                 if (partsSent == NULL) {
1190                                         partsSent = new Vector<int32_t>();
1191                                         transactionPartsSent->put(transaction, partsSent);
1192                                 }
1193                                 partsSent->add(part->getPartNumber());
1194                                 transactionPartsSent->put(transaction, partsSent);
1195                         } else {
1196                                 break;
1197                         }
1198                 }
1199         }
1200
1201         // Fill the remainder of the slot with rescue data
1202         doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1203
1204         return ThreeTuple<bool, int32_t, bool>(false, newSize, inserted);
1205 }
1206
1207 void Table::doRejectedMessages(Slot *s) {
1208         if (!rejectedSlotVector->isEmpty()) {
1209                 /* TODO: We should avoid generating a rejected message entry if
1210                  * there is already a sufficient entry in the queue (e->g->,
1211                  * equalsto value of true and same sequence number)->  */
1212
1213                 int64_t old_seqn = rejectedSlotVector->firstElement();
1214                 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1215                         int64_t new_seqn = rejectedSlotVector->lastElement();
1216                         RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1217                         s->addEntry(rm);
1218                 } else {
1219                         int64_t prev_seqn = -1;
1220                         int i = 0;
1221                         /* Go through list of missing messages */
1222                         for (; i < rejectedSlotVector->size(); i++) {
1223                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1224                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1225                                 if (s_msg != NULL)
1226                                         break;
1227                                 prev_seqn = curr_seqn;
1228                         }
1229                         /* Generate rejected message entry for missing messages */
1230                         if (prev_seqn != -1) {
1231                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1232                                 s->addEntry(rm);
1233                         }
1234                         /* Generate rejected message entries for present messages */
1235                         for (; i < rejectedSlotVector->size(); i++) {
1236                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1237                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1238                                 int64_t machineid = s_msg->getMachineID();
1239                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1240                                 s->addEntry(rm);
1241                         }
1242                 }
1243         }
1244 }
1245
1246 ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize) {
1247         int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1248         int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
1249         if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1250                 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1251         }
1252
1253         int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1254         bool seenLiveSlot = false;
1255         int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots;         // smallest seq number in the buffer if it is full
1256         int64_t threshold = firstIfFull + Table_FREE_SLOTS;             // we want the buffer to be clear of live entries up to this point
1257
1258
1259         // Mandatory Rescue
1260         for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1261                 Slot previousSlot = buffer->getSlot(currentSequenceNumber);
1262                 // Push slot number forward
1263                 if (!seenLiveSlot) {
1264                         oldestLiveSlotSequenceNumver = currentSequenceNumber;
1265                 }
1266
1267                 if (!previousSlot->isLive()) {
1268                         continue;
1269                 }
1270
1271                 // We have seen a live slot
1272                 seenLiveSlot = true;
1273
1274                 // Get all the live entries for a slot
1275                 Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1276
1277                 // Iterate over all the live entries and try to rescue them
1278                 for (Entry *liveEntry : liveEntries) {
1279                         if (slot->hasSpace(liveEntry)) {
1280                                 // Enough space to rescue the entry
1281                                 slot->addEntry(liveEntry);
1282                         } else if (currentSequenceNumber == firstIfFull) {
1283                                 //if there's no space but the entry is about to fall off the queue
1284                                 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
1285                         }
1286                 }
1287         }
1288
1289         // Did not resize
1290         return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
1291 }
1292
1293 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1294         /* now go through live entries from least to greatest sequence number until
1295          * either all live slots added, or the slot doesn't have enough room
1296          * for SKIP_THRESHOLD consecutive entries*/
1297         int skipcount = 0;
1298         int64_t newestseqnum = buffer->getNewestSeqNum();
1299 search:
1300         for (; seqn <= newestseqnum; seqn++) {
1301                 Slot *prevslot = buffer->getSlot(seqn);
1302                 //Push slot number forward
1303                 if (!seenliveslot)
1304                         oldestLiveSlotSequenceNumver = seqn;
1305
1306                 if (!prevslot->isLive())
1307                         continue;
1308                 seenliveslot = true;
1309                 Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
1310                 for (Entry *liveentry : liveentries) {
1311                         if (s->hasSpace(liveentry))
1312                                 s->addEntry(liveentry);
1313                         else {
1314                                 skipcount++;
1315                                 if (skipcount > Table_SKIP_THRESHOLD)
1316                                         break search;
1317                         }
1318                 }
1319         }
1320 }
1321
1322 /**
1323  * Checks for malicious activity and updates the local copy of the block chain->
1324  */
1325 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1326         // The cloud communication layer has checked slot HMACs already
1327         // before decoding
1328         if (newSlots->length() == 0) {
1329                 return;
1330         }
1331
1332         // Make sure all slots are newer than the last largest slot this
1333         // client has seen
1334         int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
1335         if (firstSeqNum <= sequenceNumber) {
1336                 throw new Error("Server Error: Sent older slots!");
1337         }
1338
1339         // Create an object that can access both new slots and slots in our
1340         // local chain without committing slots to our local chain
1341         SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1342
1343         // Check that the HMAC chain is not broken
1344         checkHMACChain(indexer, newSlots);
1345
1346         // Set to keep track of messages from clients
1347         Hashset<int64_t> *machineSet = new Hashset<int64_t>(lastMessageTable->keySet());
1348
1349         // Process each slots data
1350         for (Slot *slot : newSlots) {
1351                 processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1352
1353                 updateExpectedSize();
1354         }
1355
1356         // If there is a gap, check to see if the server sent us
1357         // everything->
1358         if (firstSeqNum != (sequenceNumber + 1)) {
1359
1360                 // Check the size of the slots that were sent down by the server->
1361                 // Can only check the size if there was a gap
1362                 checkNumSlots(newSlots->length);
1363
1364                 // Since there was a gap every machine must have pushed a slot or
1365                 // must have a last message message-> If not then the server is
1366                 // hiding slots
1367                 if (!machineSet->isEmpty()) {
1368                         throw new Error("Missing record for machines: ");
1369                 }
1370         }
1371
1372         // Update the size of our local block chain->
1373         commitNewMaxSize();
1374
1375         // Commit new to slots to the local block chain->
1376         for (Slot *slot : newSlots) {
1377
1378                 // Insert this slot into our local block chain copy->
1379                 buffer->putSlot(slot);
1380
1381                 // Keep track of how many slots are currently live (have live data
1382                 // in them)->
1383                 liveSlotCount++;
1384         }
1385
1386         // Get the sequence number of the latest slot in the system
1387         sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
1388         updateLiveStateFromServer();
1389
1390         // No Need to remember after we pulled from the server
1391         offlineTransactionsCommittedAndAtServer->clear();
1392
1393         // This is invalidated now
1394         hadPartialSendToServer = false;
1395 }
1396
1397 void Table::updateLiveStateFromServer() {
1398         // Process the new transaction parts
1399         processNewTransactionParts();
1400
1401         // Do arbitration on new transactions that were received
1402         arbitrateFromServer();
1403
1404         // Update all the committed keys
1405         bool didCommitOrSpeculate = updateCommittedTable();
1406
1407         // Delete the transactions that are now dead
1408         updateLiveTransactionsAndStatus();
1409
1410         // Do speculations
1411         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1412         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1413 }
1414
1415 void Table::updateLiveStateFromLocal() {
1416         // Update all the committed keys
1417         bool didCommitOrSpeculate = updateCommittedTable();
1418
1419         // Delete the transactions that are now dead
1420         updateLiveTransactionsAndStatus();
1421
1422         // Do speculations
1423         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1424         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1425 }
1426
1427 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1428         int64_t prevslots = firstSequenceNumber;
1429
1430         if (didFindTableStatus) {
1431         } else {
1432                 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1433         }
1434
1435         didFindTableStatus = true;
1436         currMaxSize = numberOfSlots;
1437 }
1438
1439 void Table::updateExpectedSize() {
1440         expectedsize++;
1441
1442         if (expectedsize > currMaxSize) {
1443                 expectedsize = currMaxSize;
1444         }
1445 }
1446
1447
1448 /**
1449  * Check the size of the block chain to make sure there are enough
1450  * slots sent back by the server-> This is only called when we have a
1451  * gap between the slots that we have locally and the slots sent by
1452  * the server therefore in the slots sent by the server there will be
1453  * at least 1 Table status message
1454  */
1455 void Table::checkNumSlots(int numberOfSlots) {
1456         if (numberOfSlots != expectedsize) {
1457                 throw new Error("Server Error: Server did not send all slots->  Expected: ");
1458         }
1459 }
1460
1461 void Table::updateCurrMaxSize(int newmaxsize) {
1462         currMaxSize = newmaxsize;
1463 }
1464
1465
1466 /**
1467  * Update the size of of the local buffer if it is needed->
1468  */
1469 void Table::commitNewMaxSize() {
1470         didFindTableStatus = false;
1471
1472         // Resize the local slot buffer
1473         if (numberOfSlots != currMaxSize) {
1474                 buffer->resize((int32_t)currMaxSize);
1475         }
1476
1477         // Change the number of local slots to the new size
1478         numberOfSlots = (int32_t)currMaxSize;
1479
1480         // Recalculate the resize threshold since the size of the local
1481         // buffer has changed
1482         setResizeThreshold();
1483 }
1484
1485 /**
1486  * Process the new transaction parts from this latest round of slots
1487  * received from the server
1488  */
1489 void Table::processNewTransactionParts() {
1490
1491         if (newTransactionParts->size() == 0) {
1492                 // Nothing new to process
1493                 return;
1494         }
1495
1496         // Iterate through all the machine Ids that we received new parts
1497         // for
1498         for (int64_t machineId : newTransactionParts->keySet()) {
1499                 Hashtable<Pair<int64_t int32_t>, TransactionPart *> *parts = newTransactionParts->get(machineId);
1500
1501                 // Iterate through all the parts for that machine Id
1502                 for (Pair<int64_t, int32_t> partId : parts->keySet()) {
1503                         TransactionPart *part = parts->get(partId);
1504
1505                         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1506                         if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= part->getSequenceNumber())) {
1507                                 // Set dead the transaction part
1508                                 part->setDead();
1509                                 continue;
1510                         }
1511
1512                         // Get the transaction object for that sequence number
1513                         Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1514
1515                         if (transaction == NULL) {
1516                                 // This is a new transaction that we dont have so make a new one
1517                                 transaction = new Transaction();
1518
1519                                 // Insert this new transaction into the live tables
1520                                 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1521                                 liveTransactionByTransactionIdTable->put(part->getTransactionId(), transaction);
1522                         }
1523
1524                         // Add that part to the transaction
1525                         transaction->addPartDecode(part);
1526                 }
1527         }
1528
1529         // Clear all the new transaction parts in preparation for the next
1530         // time the server sends slots
1531         newTransactionParts->clear();
1532 }
1533
1534 void Table::arbitrateFromServer() {
1535
1536         if (liveTransactionBySequenceNumberTable->size() == 0) {
1537                 // Nothing to arbitrate on so move on
1538                 return;
1539         }
1540
1541         // Get the transaction sequence numbers and sort from oldest to newest
1542         Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
1543         Collections->sort(transactionSequenceNumbers);
1544
1545         // Collection of key value pairs that are
1546         Hashtable<IoTString *, KeyValue *> speculativeTableTmp = new Hashtable<IoTString *, KeyValue *>();
1547
1548         // The last transaction arbitrated on
1549         int64_t lastTransactionCommitted = -1;
1550         Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1551
1552         for (int64_t transactionSequenceNumber : transactionSequenceNumbers) {
1553                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1554
1555
1556
1557                 // Check if this machine arbitrates for this transaction if not
1558                 // then we cant arbitrate this transaction
1559                 if (transaction->getArbitrator() != localMachineId) {
1560                         continue;
1561                 }
1562
1563                 if (transactionSequenceNumber < lastSeqNumArbOn) {
1564                         continue;
1565                 }
1566
1567                 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1568                         // We have seen this already locally so dont commit again
1569                         continue;
1570                 }
1571
1572
1573                 if (!transaction->isComplete()) {
1574                         // Will arbitrate in incorrect order if we continue so just break
1575                         // Most likely this
1576                         break;
1577                 }
1578
1579
1580                 // update the largest transaction seen by arbitrator from server
1581                 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) == NULL) {
1582                         lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1583                 } else {
1584                         int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1585                         if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1586                                 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1587                         }
1588                 }
1589
1590                 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1591                         // Guard evaluated as true
1592
1593                         // Update the local changes so we can make the commit
1594                         for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
1595                                 speculativeTableTmp->put(kv->getKey(), kv);
1596                         }
1597
1598                         // Update what the last transaction committed was for use in batch commit
1599                         lastTransactionCommitted = transactionSequenceNumber;
1600                 } else {
1601                         // Guard evaluated was false so create abort
1602                         // Create the abort
1603                         Abort *newAbort = new Abort(NULL,
1604                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1605                                                                                                                                         transaction->getSequenceNumber(),
1606                                                                                                                                         transaction->getMachineId(),
1607                                                                                                                                         transaction->getArbitrator(),
1608                                                                                                                                         localArbitrationSequenceNumber);
1609                         localArbitrationSequenceNumber++;
1610                         generatedAborts->add(newAbort);
1611
1612                         // Insert the abort so we can process
1613                         processEntry(newAbort);
1614                 }
1615
1616                 lastSeqNumArbOn = transactionSequenceNumber;
1617         }
1618
1619         Commit *newCommit = NULL;
1620
1621         // If there is something to commit
1622         if (speculativeTableTmp->size() != 0) {
1623                 // Create the commit and increment the commit sequence number
1624                 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1625                 localArbitrationSequenceNumber++;
1626
1627                 // Add all the new keys to the commit
1628                 for (KeyValue *kv : speculativeTableTmp->values()) {
1629                         newCommit->addKV(kv);
1630                 }
1631
1632                 // create the commit parts
1633                 newCommit->createCommitParts();
1634
1635                 // Append all the commit parts to the end of the pending queue
1636                 // waiting for sending to the server
1637                 // Insert the commit so we can process it
1638                 for (CommitPart *commitPart : newCommit->getParts()->values()) {
1639                         processEntry(commitPart);
1640                 }
1641         }
1642
1643         if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1644                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1645                 pendingSendArbitrationRounds->add(arbitrationRound);
1646
1647                 if (compactArbitrationData()) {
1648                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1649                         if (newArbitrationRound->getCommit() != NULL) {
1650                                 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1651                                         processEntry(commitPart);
1652                                 }
1653                         }
1654                 }
1655         }
1656 }
1657
1658 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1659
1660         // Check if this machine arbitrates for this transaction if not then
1661         // we cant arbitrate this transaction
1662         if (transaction->getArbitrator() != localMachineId) {
1663                 return Pair<bool, bool>(false, false);
1664         }
1665
1666         if (!transaction->isComplete()) {
1667                 // Will arbitrate in incorrect order if we continue so just break
1668                 // Most likely this
1669                 return Pair<bool, bool>(false, false);
1670         }
1671
1672         if (transaction->getMachineId() != localMachineId) {
1673                 // dont do this check for local transactions
1674                 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) != NULL) {
1675                         if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1676                                 // We've have already seen this from the server
1677                                 return Pair<bool, bool>(false, false);
1678                         }
1679                 }
1680         }
1681
1682         if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1683                 // Guard evaluated as true Create the commit and increment the
1684                 // commit sequence number
1685                 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1686                 localArbitrationSequenceNumber++;
1687
1688                 // Update the local changes so we can make the commit
1689                 for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
1690                         newCommit->addKV(kv);
1691                 }
1692
1693                 // create the commit parts
1694                 newCommit->createCommitParts();
1695
1696                 // Append all the commit parts to the end of the pending queue
1697                 // waiting for sending to the server
1698                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1699                 pendingSendArbitrationRounds->add(arbitrationRound);
1700
1701                 if (compactArbitrationData()) {
1702                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1703                         for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1704                                 processEntry(commitPart);
1705                         }
1706                 } else {
1707                         // Insert the commit so we can process it
1708                         for (CommitPart *commitPart : newCommit->getParts()->values()) {
1709                                 processEntry(commitPart);
1710                         }
1711                 }
1712
1713                 if (transaction->getMachineId() == localMachineId) {
1714                         TransactionStatus *status = transaction->getTransactionStatus();
1715                         if (status != NULL) {
1716                                 status->setStatus(TransactionStatus_StatusCommitted);
1717                         }
1718                 }
1719
1720                 updateLiveStateFromLocal();
1721                 return Pair<bool, bool>(true, true);
1722         } else {
1723                 if (transaction->getMachineId() == localMachineId) {
1724                         // For locally created messages update the status
1725                         // Guard evaluated was false so create abort
1726                         TransactionStatus status = transaction->getTransactionStatus();
1727                         if (status != NULL) {
1728                                 status->setStatus(TransactionStatus_StatusAborted);
1729                         }
1730                 } else {
1731                         Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
1732
1733                         // Create the abort
1734                         Abort *newAbort = new Abort(NULL,
1735                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1736                                                                                                                                         -1,
1737                                                                                                                                         transaction->getMachineId(),
1738                                                                                                                                         transaction->getArbitrator(),
1739                                                                                                                                         localArbitrationSequenceNumber);
1740                         localArbitrationSequenceNumber++;
1741                         addAbortSet->add(newAbort);
1742
1743                         // Append all the commit parts to the end of the pending queue
1744                         // waiting for sending to the server
1745                         ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1746                         pendingSendArbitrationRounds->add(arbitrationRound);
1747
1748                         if (compactArbitrationData()) {
1749                                 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1750                                 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1751                                         processEntry(commitPart);
1752                                 }
1753                         }
1754                 }
1755
1756                 updateLiveStateFromLocal();
1757                 return Pair<bool, bool>(true, false);
1758         }
1759 }
1760
1761 /**
1762  * Compacts the arbitration data my merging commits and aggregating
1763  * aborts so that a single large push of commits can be done instead
1764  * of many small updates
1765  */
1766 bool Table::compactArbitrationData() {
1767         if (pendingSendArbitrationRounds->size() < 2) {
1768                 // Nothing to compact so do nothing
1769                 return false;
1770         }
1771
1772         ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1773         if (lastRound->didSendPart()) {
1774                 return false;
1775         }
1776
1777         bool hadCommit = (lastRound->getCommit() == NULL);
1778         bool gotNewCommit = false;
1779
1780         int numberToDelete = 1;
1781         while (numberToDelete < pendingSendArbitrationRounds->size()) {
1782                 ArbitrationRound *xs round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1783
1784                 if (round->isFull() || round->didSendPart()) {
1785                         // Stop since there is a part that cannot be compacted and we
1786                         // need to compact in order
1787                         break;
1788                 }
1789
1790                 if (round->getCommit() == NULL) {
1791                         // Try compacting aborts only
1792                         int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
1793                         if (newSize > ArbitrationRound->MAX_PARTS) {
1794                                 // Cant compact since it would be too large
1795                                 break;
1796                         }
1797                         lastRound->addAborts(round->getAborts());
1798                 } else {
1799                         // Create a new larger commit
1800                         Commit newCommit = Commit->merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
1801                         localArbitrationSequenceNumber++;
1802
1803                         // Create the commit parts so that we can count them
1804                         newCommit->createCommitParts();
1805
1806                         // Calculate the new size of the parts
1807                         int newSize = newCommit->getNumberOfParts();
1808                         newSize += lastRound->getAbortsCount();
1809                         newSize += round->getAbortsCount();
1810
1811                         if (newSize > ArbitrationRound->MAX_PARTS) {
1812                                 // Cant compact since it would be too large
1813                                 break;
1814                         }
1815
1816                         // Set the new compacted part
1817                         lastRound->setCommit(newCommit);
1818                         lastRound->addAborts(round->getAborts());
1819                         gotNewCommit = true;
1820                 }
1821
1822                 numberToDelete++;
1823         }
1824
1825         if (numberToDelete != 1) {
1826                 // If there is a compaction
1827                 // Delete the previous pieces that are now in the new compacted piece
1828                 if (numberToDelete == pendingSendArbitrationRounds->size()) {
1829                         pendingSendArbitrationRounds->clear();
1830                 } else {
1831                         for (int i = 0; i < numberToDelete; i++) {
1832                                 pendingSendArbitrationRounds->remove(pendingSendArbitrationRounds->size() - 1);
1833                         }
1834                 }
1835
1836                 // Add the new compacted into the pending to send list
1837                 pendingSendArbitrationRounds->add(lastRound);
1838
1839                 // Should reinsert into the commit processor
1840                 if (hadCommit && gotNewCommit) {
1841                         return true;
1842                 }
1843         }
1844
1845         return false;
1846 }
1847
1848 /**
1849  * Update all the commits and the committed tables, sets dead the dead
1850  * transactions
1851  */
1852 bool Table::updateCommittedTable() {
1853
1854         if (newCommitParts->size() == 0) {
1855                 // Nothing new to process
1856                 return false;
1857         }
1858
1859         // Iterate through all the machine Ids that we received new parts for
1860         for (int64_t machineId : newCommitParts->keySet()) {
1861                 Hashtable<Pair<int64_t, int32_t>, CommitPart *> *parts = newCommitParts->get(machineId);
1862
1863                 // Iterate through all the parts for that machine Id
1864                 for (Pair<int64_t, int32_t> partId : parts->keySet()) {
1865                         CommitPart *part = parts->get(partId);
1866
1867                         // Get the transaction object for that sequence number
1868                         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
1869
1870                         if (commitForClientTable == NULL) {
1871                                 // This is the first commit from this device
1872                                 commitForClientTable = new Hashtable<int64_t, Commit *>();
1873                                 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
1874                         }
1875
1876                         Commit *commit = commitForClientTable->get(part->getSequenceNumber());
1877
1878                         if (commit == NULL) {
1879                                 // This is a new commit that we dont have so make a new one
1880                                 commit = new Commit();
1881
1882                                 // Insert this new commit into the live tables
1883                                 commitForClientTable->put(part->getSequenceNumber(), commit);
1884                         }
1885
1886                         // Add that part to the commit
1887                         commit->addPartDecode(part);
1888                 }
1889         }
1890
1891         // Clear all the new commits parts in preparation for the next time
1892         // the server sends slots
1893         newCommitParts->clear();
1894
1895         // If we process a new commit keep track of it for future use
1896         bool didProcessANewCommit = false;
1897
1898         // Process the commits one by one
1899         for (int64_T arbitratorId : liveCommitsTable->keySet()) {
1900
1901                 // Get all the commits for a specific arbitrator
1902                 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
1903
1904                 // Sort the commits in order
1905                 Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
1906                 Collections->sort(commitSequenceNumbers);
1907
1908                 // Get the last commit seen from this arbitrator
1909                 int64_t lastCommitSeenSequenceNumber = -1;
1910                 if (lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId) != NULL) {
1911                         lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
1912                 }
1913
1914                 // Go through each new commit one by one
1915                 for (int i = 0; i < commitSequenceNumbers->size(); i++) {
1916                         int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
1917                         Commit *commit = commitForClientTable->get(commitSequenceNumber);
1918
1919                         // Special processing if a commit is not complete
1920                         if (!commit->isComplete()) {
1921                                 if (i == (commitSequenceNumbers->size() - 1)) {
1922                                         // If there is an incomplete commit and this commit is the
1923                                         // latest one seen then this commit cannot be processed and
1924                                         // there are no other commits
1925                                         break;
1926                                 } else {
1927                                         // This is a commit that was already dead but parts of it
1928                                         // are still in the block chain (not flushed out yet)->
1929                                         // Delete it and move on
1930                                         commit->setDead();
1931                                         commitForClientTable->remove(commit->getSequenceNumber());
1932                                         continue;
1933                                 }
1934                         }
1935
1936                         // Update the last transaction that was updated if we can
1937                         if (commit->getTransactionSequenceNumber() != -1) {
1938                                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
1939
1940                                 // Update the last transaction sequence number that the arbitrator arbitrated on
1941                                 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
1942                                         lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
1943                                 }
1944                         }
1945
1946                         // Update the last arbitration data that we have seen so far
1947                         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId()) != NULL) {
1948
1949                                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
1950                                 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
1951                                         // Is larger
1952                                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
1953                                 }
1954                         } else {
1955                                 // Never seen any data from this arbitrator so record the first one
1956                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
1957                         }
1958
1959                         // We have already seen this commit before so need to do the
1960                         // full processing on this commit
1961                         if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
1962
1963                                 // Update the last transaction that was updated if we can
1964                                 if (commit->getTransactionSequenceNumber() != -1) {
1965                                         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
1966
1967                                         // Update the last transaction sequence number that the arbitrator arbitrated on
1968                                         if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
1969                                                 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
1970                                         }
1971                                 }
1972
1973                                 continue;
1974                         }
1975
1976                         // If we got here then this is a brand new commit and needs full
1977                         // processing
1978                         // Get what commits should be edited, these are the commits that
1979                         // have live values for their keys
1980                         Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
1981                         for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
1982                                 commitsToEdit->add(liveCommitsByKeyTable->get(kv->getKey()));
1983                         }
1984                         commitsToEdit->remove(NULL);            // remove NULL since it could be in this set
1985
1986                         // Update each previous commit that needs to be updated
1987                         for (Commit *previousCommit : commitsToEdit) {
1988
1989                                 // Only bother with live commits (TODO: Maybe remove this check)
1990                                 if (previousCommit->isLive()) {
1991
1992                                         // Update which keys in the old commits are still live
1993                                         for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
1994                                                 previousCommit->invalidateKey(kv->getKey());
1995                                         }
1996
1997                                         // if the commit is now dead then remove it
1998                                         if (!previousCommit->isLive()) {
1999                                                 commitForClientTable->remove(previousCommit);
2000                                         }
2001                                 }
2002                         }
2003
2004                         // Update the last seen sequence number from this arbitrator
2005                         if (lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId()) != NULL) {
2006                                 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2007                                         lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2008                                 }
2009                         } else {
2010                                 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2011                         }
2012
2013                         // We processed a new commit that we havent seen before
2014                         didProcessANewCommit = true;
2015
2016                         // Update the committed table of keys and which commit is using which key
2017                         for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
2018                                 committedKeyValueTable->put(kv->getKey(), kv);
2019                                 liveCommitsByKeyTable->put(kv->getKey(), commit);
2020                         }
2021                 }
2022         }
2023
2024         return didProcessANewCommit;
2025 }
2026
2027 /**
2028  * Create the speculative table from transactions that are still live
2029  * and have come from the cloud
2030  */
2031 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2032         if (liveTransactionBySequenceNumberTable->keySet()->size() == 0) {
2033                 // There is nothing to speculate on
2034                 return false;
2035         }
2036
2037         // Create a list of the transaction sequence numbers and sort them
2038         // from oldest to newest
2039         Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
2040         Collections->sort(transactionSequenceNumbersSorted);
2041
2042         bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2043
2044
2045         if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2046                 // If there is a gap in the transaction sequence numbers then
2047                 // there was a commit or an abort of a transaction OR there was a
2048                 // new commit (Could be from offline commit) so a redo the
2049                 // speculation from scratch
2050
2051                 // Start from scratch
2052                 speculatedKeyValueTable->clear();
2053                 lastTransactionSequenceNumberSpeculatedOn = -1;
2054                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2055
2056         }
2057
2058         // Remember the front of the transaction list
2059         oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2060
2061         // Find where to start arbitration from
2062         int startIndex = transactionSequenceNumbersSorted->indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1;
2063
2064         if (startIndex >= transactionSequenceNumbersSorted->size()) {
2065                 // Make sure we are not out of bounds
2066                 return false;           // did not speculate
2067         }
2068
2069         Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
2070         bool didSkip = true;
2071
2072         for (int i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2073                 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2074                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2075
2076                 if (!transaction->isComplete()) {
2077                         // If there is an incomplete transaction then there is nothing
2078                         // we can do add this transactions arbitrator to the list of
2079                         // arbitrators we should ignore
2080                         incompleteTransactionArbitrator->add(transaction->getArbitrator());
2081                         didSkip = true;
2082                         continue;
2083                 }
2084
2085                 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2086                         continue;
2087                 }
2088
2089                 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2090
2091                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2092                         // Guard evaluated to true so update the speculative table
2093                         for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
2094                                 speculatedKeyValueTable->put(kv->getKey(), kv);
2095                         }
2096                 }
2097         }
2098
2099         if (didSkip) {
2100                 // Since there was a skip we need to redo the speculation next time around
2101                 lastTransactionSequenceNumberSpeculatedOn = -1;
2102                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2103         }
2104
2105         // We did some speculation
2106         return true;
2107 }
2108
2109 /**
2110  * Create the pending transaction speculative table from transactions
2111  * that are still in the pending transaction buffer
2112  */
2113 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2114         if (pendingTransactionQueue->size() == 0) {
2115                 // There is nothing to speculate on
2116                 return;
2117         }
2118
2119         if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2120                 // need to reset on the pending speculation
2121                 lastPendingTransactionSpeculatedOn = NULL;
2122                 firstPendingTransaction = pendingTransactionQueue->get(0);
2123                 pendingTransactionSpeculatedKeyValueTable->clear();
2124         }
2125
2126         // Find where to start arbitration from
2127         int startIndex = pendingTransactionQueue->indexOf(firstPendingTransaction) + 1;
2128
2129         if (startIndex >= pendingTransactionQueue->size()) {
2130                 // Make sure we are not out of bounds
2131                 return;
2132         }
2133
2134         for (int i = startIndex; i < pendingTransactionQueue->size(); i++) {
2135                 Transaction *transaction = pendingTransactionQueue->get(i);
2136
2137                 lastPendingTransactionSpeculatedOn = transaction;
2138
2139                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2140                         // Guard evaluated to true so update the speculative table
2141                         for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
2142                                 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2143                         }
2144                 }
2145         }
2146 }
2147
2148 /**
2149  * Set dead and remove from the live transaction tables the
2150  * transactions that are dead
2151  */
2152 void Table::updateLiveTransactionsAndStatus() {
2153
2154         // Go through each of the transactions
2155         for (Iterator<Map->Entry<int64_t, Transaction> > *iter = liveTransactionBySequenceNumberTable->entrySet()->iterator(); iter->hasNext();) {
2156                 Transaction *transaction = iter->next()->getValue();
2157
2158                 // Check if the transaction is dead
2159                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator());
2160                 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= transaction->getSequenceNumber())) {
2161
2162                         // Set dead the transaction
2163                         transaction->setDead();
2164
2165                         // Remove the transaction from the live table
2166                         iter->remove();
2167                         liveTransactionByTransactionIdTable->remove(transaction->getId());
2168                 }
2169         }
2170
2171         // Go through each of the transactions
2172         for (Iterator<Map->Entry<int64_t, TransactionStatus *> > *iter = outstandingTransactionStatus->entrySet()->iterator(); iter->hasNext();) {
2173                 TransactionStatus *status = iter->next()->getValue();
2174
2175                 // Check if the transaction is dead
2176                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator());
2177                 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= status->getTransactionSequenceNumber())) {
2178
2179                         // Set committed
2180                         status->setStatus(TransactionStatus_StatusCommitted);
2181
2182                         // Remove
2183                         iter->remove();
2184                 }
2185         }
2186 }
2187
2188 /**
2189  * Process this slot, entry by entry->  Also update the latest message sent by slot
2190  */
2191 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2192
2193         // Update the last message seen
2194         updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2195
2196         // Process each entry in the slot
2197         for (Entry *entry : slot->getEntries()) {
2198                 switch (entry->getType()) {
2199                 case TypeCommitPart:
2200                         processEntry((CommitPart *)entry);
2201                         break;
2202                 case TypeAbort:
2203                         processEntry((Abort *)entry);
2204                         break;
2205                 case TypeTransactionPart:
2206                         processEntry((TransactionPart *)entry);
2207                         break;
2208                 case TypeNewKey:
2209                         processEntry((NewKey *)entry);
2210                         break;
2211                 case TypeLastMessage:
2212                         processEntry((LastMessage *)entry, machineSet);
2213                         break;
2214                 case TypeRejectedMessage:
2215                         processEntry((RejectedMessage *)entry, indexer);
2216                         break;
2217                 case TypeTableStatus:
2218                         processEntry((TableStatus *)entry, slot->getSequenceNumber());
2219                         break;
2220                 default:
2221                         throw new Error("Unrecognized type: ");
2222                 }
2223         }
2224 }
2225
2226 /**
2227  * Update the last message that was sent for a machine Id
2228  */
2229 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2230         // Update what the last message received by a machine was
2231         updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2232 }
2233
2234 /**
2235  * Add the new key to the arbitrators table and update the set of live
2236  * new keys (in case of a rescued new key message)
2237  */
2238 void Table::processEntry(NewKey *entry) {
2239         // Update the arbitrator table with the new key information
2240         arbitratorTable->put(entry->getKey(), entry->getMachineID());
2241
2242         // Update what the latest live new key is
2243         NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2244         if (oldNewKey != NULL) {
2245                 // Delete the old new key messages
2246                 oldNewKey->setDead();
2247         }
2248 }
2249
2250 /**
2251  * Process new table status entries and set dead the old ones as new
2252  * ones come in-> keeps track of the largest and smallest table status
2253  * seen in this current round of updating the local copy of the block
2254  * chain
2255  */
2256 void Table::processEntry(TableStatus entry, int64_t seq) {
2257         int newNumSlots = entry->getMaxSlots();
2258         updateCurrMaxSize(newNumSlots);
2259         initExpectedSize(seq, newNumSlots);
2260
2261         if (liveTableStatus != NULL) {
2262                 // We have a larger table status so the old table status is no
2263                 // int64_ter alive
2264                 liveTableStatus->setDead();
2265         }
2266
2267         // Make this new table status the latest alive table status
2268         liveTableStatus = entry;
2269 }
2270
2271 /**
2272  * Check old messages to see if there is a block chain violation->
2273  * Also
2274  */
2275 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
2276         int64_t oldSeqNum = entry->getOldSeqNum();
2277         int64_t newSeqNum = entry->getNewSeqNum();
2278         bool isequal = entry->getEqual();
2279         int64_t machineId = entry->getMachineID();
2280         int64_t seq = entry->getSequenceNumber();
2281
2282         // Check if we have messages that were supposed to be rejected in
2283         // our local block chain
2284         for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2285                 // Get the slot
2286                 Slot *slot = indexer->getSlot(seqNum);
2287
2288                 if (slot != NULL) {
2289                         // If we have this slot make sure that it was not supposed to be
2290                         // a rejected slot
2291                         int64_t slotMachineId = slot->getMachineID();
2292                         if (isequal != (slotMachineId == machineId)) {
2293                                 throw new Error("Server Error: Trying to insert rejected message for slot ");
2294                         }
2295                 }
2296         }
2297
2298         // Create a list of clients to watch until they see this rejected
2299         // message entry->
2300         Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2301         for (Map->Entry<int64_t, Pair<int64_t, Liveness *> > *lastMessageEntry : lastMessageTable->entrySet()) {
2302                 // Machine ID for the last message entry
2303                 int64_t lastMessageEntryMachineId = lastMessageEntry->getKey();
2304
2305                 // We've seen it, don't need to continue to watch->  Our next
2306                 // message will implicitly acknowledge it->
2307                 if (lastMessageEntryMachineId == localMachineId) {
2308                         continue;
2309                 }
2310
2311                 Pair<int64_t, Liveness *> lastMessageValue = lastMessageEntry->getValue();
2312                 int64_t entrySequenceNumber = lastMessageValue.getFirst();
2313
2314                 if (entrySequenceNumber < seq) {
2315                         // Add this rejected message to the set of messages that this
2316                         // machine ID did not see yet
2317                         addWatchVector(lastMessageEntryMachineId, entry);
2318                         // This client did not see this rejected message yet so add it
2319                         // to the watch set to monitor
2320                         deviceWatchSet->add(lastMessageEntryMachineId);
2321                 }
2322         }
2323         if (deviceWatchSet->isEmpty()) {
2324                 // This rejected message has been seen by all the clients so
2325                 entry->setDead();
2326         } else {
2327                 // We need to watch this rejected message
2328                 entry->setWatchSet(deviceWatchSet);
2329         }
2330 }
2331
2332 /**
2333  * Check if this abort is live, if not then save it so we can kill it
2334  * later-> update the last transaction number that was arbitrated on->
2335  */
2336 void Table::processEntry(Abort *entry) {
2337         if (entry->getTransactionSequenceNumber() != -1) {
2338                 // update the transaction status if it was sent to the server
2339                 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2340                 if (status != NULL) {
2341                         status->setStatus(TransactionStatus_StatusAborted);
2342                 }
2343         }
2344
2345         // Abort has not been seen by the client it is for yet so we need to
2346         // keep track of it
2347         Abort *previouslySeenAbort = liveAbortTable->put(entry->getAbortId(), entry);
2348         if (previouslySeenAbort != NULL) {
2349                 previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version
2350         }
2351
2352         if (entry->getTransactionArbitrator() == localMachineId) {
2353                 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
2354         }
2355
2356         if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId()).getFirst() >= entry->getSequenceNumber())) {
2357                 // The machine already saw this so it is dead
2358                 entry->setDead();
2359                 liveAbortTable->remove(entry->getAbortId());
2360
2361                 if (entry->getTransactionArbitrator() == localMachineId) {
2362                         liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2363                 }
2364                 return;
2365         }
2366
2367         // Update the last arbitration data that we have seen so far
2368         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator()) != NULL) {
2369                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2370                 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2371                         // Is larger
2372                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2373                 }
2374         } else {
2375                 // Never seen any data from this arbitrator so record the first one
2376                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2377         }
2378
2379         // Set dead a transaction if we can
2380         Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber()));
2381         if (transactionToSetDead != NULL) {
2382                 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2383         }
2384
2385         // Update the last transaction sequence number that the arbitrator
2386         // arbitrated on
2387         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator());
2388         if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2389                 // Is a valid one
2390                 if (entry->getTransactionSequenceNumber() != -1) {
2391                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2392                 }
2393         }
2394 }
2395
2396 /**
2397  * Set dead the transaction part if that transaction is dead and keep
2398  * track of all new parts
2399  */
2400 void Table::processEntry(TransactionPart *entry) {
2401         // Check if we have already seen this transaction and set it dead OR
2402         // if it is not alive
2403         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId());
2404         if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= entry->getSequenceNumber())) {
2405                 // This transaction is dead, it was already committed or aborted
2406                 entry->setDead();
2407                 return;
2408         }
2409
2410         // This part is still alive
2411         Hashtable<Pair<int64_t, int32_t>, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId());
2412
2413         if (transactionPart == NULL) {
2414                 // Dont have a table for this machine Id yet so make one
2415                 transactionPart = new Hashtable<Pair<int64_t, int32_t>, TransactionPart *>();
2416                 newTransactionParts->put(entry->getMachineId(), transactionPart);
2417         }
2418
2419         // Update the part and set dead ones we have already seen (got a
2420         // rescued version)
2421         TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry);
2422         if (previouslySeenPart != NULL) {
2423                 previouslySeenPart->setDead();
2424         }
2425 }
2426
2427 /**
2428  * Process new commit entries and save them for future use->  Delete duplicates
2429  */
2430 void Table::processEntry(CommitPart *entry) {
2431         // Update the last transaction that was updated if we can
2432         if (entry->getTransactionSequenceNumber() != -1) {
2433                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId());
2434                 // Update the last transaction sequence number that the arbitrator
2435                 // arbitrated on
2436                 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2437                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2438                 }
2439         }
2440
2441         Hashtable<Pair<int64_t, int32_t>, CommitPart *> *commitPart = newCommitParts->get(entry->getMachineId());
2442         if (commitPart == NULL) {
2443                 // Don't have a table for this machine Id yet so make one
2444                 commitPart = new Hashtable<Pair<int64_t, int32_t>, CommitPart *>();
2445                 newCommitParts->put(entry->getMachineId(), commitPart);
2446         }
2447         // Update the part and set dead ones we have already seen (got a
2448         // rescued version)
2449         CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry);
2450         if (previouslySeenPart != NULL) {
2451                 previouslySeenPart->setDead();
2452         }
2453 }
2454
2455 /**
2456  * Update the last message seen table-> Update and set dead the
2457  * appropriate RejectedMessages as clients see them-> Updates the live
2458  * aborts, removes those that are dead and sets them dead-> Check that
2459  * the last message seen is correct and that there is no mismatch of
2460  * our own last message or that other clients have not had a rollback
2461  * on the last message->
2462  */
2463 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<intr64_t> *machineSet) {
2464         // We have seen this machine ID
2465         machineSet->remove(machineId);
2466
2467         // Get the set of rejected messages that this machine Id is has not seen yet
2468         Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2469         // If there is a rejected message that this machine Id has not seen yet
2470         if (watchset != NULL) {
2471                 // Go through each rejected message that this machine Id has not
2472                 // seen yet
2473                 for (Iterator<RejectedMessage *> *rmit = watchset->iterator(); rmit->hasNext(); ) {
2474                         RejectedMessage *rm = rmit->next();
2475                         // If this machine Id has seen this rejected message->->->
2476                         if (rm->getSequenceNumber() <= seqNum) {
2477                                 // Remove it from our watchlist
2478                                 rmit->remove();
2479                                 // Decrement machines that need to see this notification
2480                                 rm->removeWatcher(machineId);
2481                         }
2482                 }
2483         }
2484
2485         // Set dead the abort
2486         for (Iterator<Map->Entry<Pair<int64_t, int64_t>, Abort *> > i = liveAbortTable->entrySet()->iterator(); i->hasNext();) {
2487                 Abort *abort = i->next()->getValue();
2488                 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2489                         abort->setDead();
2490                         i->remove();
2491                         if (abort->getTransactionArbitrator() == localMachineId) {
2492                                 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2493                         }
2494                 }
2495         }
2496         if (machineId == localMachineId) {
2497                 // Our own messages are immediately dead->
2498                 if (liveness instanceof LastMessage) {
2499                         ((LastMessage *)liveness)->setDead();
2500                 } else if (liveness instanceof Slot) {
2501                         ((Slot *)liveness)->setDead();
2502                 } else {
2503                         throw new Error("Unrecognized type");
2504                 }
2505         }
2506         // Get the old last message for this device
2507         Pair<int64_t, Liveness *> lastMessageEntry = lastMessageTable->put(machineId, Pair<int64_t, Liveness *>(seqNum, liveness));
2508         if (lastMessageEntry == NULL) {
2509                 // If no last message then there is nothing else to process
2510                 return;
2511         }
2512
2513         int64_t lastMessageSeqNum = lastMessageEntry.getFirst();
2514         Liveness *lastEntry = lastMessageEntry.getSecond();
2515
2516         // If it is not our machine Id since we already set ours to dead
2517         if (machineId != localMachineId) {
2518                 if (lastEntry instanceof LastMessage) {
2519                         ((LastMessage *)lastEntry)->setDead();
2520                 } else if (lastEntry instanceof Slot) {
2521                         ((Slot *)lastEntry)->setDead();
2522                 } else {
2523                         throw new Error("Unrecognized type");
2524                 }
2525         }
2526         // Make sure the server is not playing any games
2527         if (machineId == localMachineId) {
2528                 if (hadPartialSendToServer) {
2529                         // We were not making any updates and we had a machine mismatch
2530                         if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2531                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: ");
2532                         }
2533                 } else {
2534                         // We were not making any updates and we had a machine mismatch
2535                         if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2536                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed: ");
2537                         }
2538                 }
2539         } else {
2540                 if (lastMessageSeqNum > seqNum) {
2541                         throw new Error("Server Error: Rollback on remote machine sequence number");
2542                 }
2543         }
2544 }
2545
2546 /**
2547  * Add a rejected message entry to the watch set to keep track of
2548  * which clients have seen that rejected message entry and which have
2549  * not.
2550  */
2551 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
2552         Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2553         if (entries == NULL) {
2554                 // There is no set for this machine ID yet so create one
2555                 entries = new Hashset<RejectedMessage *>();
2556                 rejectedMessageWatchVectorTable->put(machineId, entries);
2557         }
2558         entries->add(entry);
2559 }
2560
2561 /**
2562  * Check if the HMAC chain is not violated
2563  */
2564 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
2565         for (int i = 0; i < newSlots->length(); i++) {
2566                 Slot *currSlot = newSlots->get(i);
2567                 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
2568                 if (prevSlot != NULL &&
2569                                 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2570                         throw new Error("Server Error: Invalid HMAC Chain");
2571         }
2572 }