5f93fe894925c0f9321ff6b08c0f98d79556bb15
[iotcloud.git] / version2 / src / C / Table.cc
1 #include "Table.h"
2 #include "CloudComm.h"
3 #include "SlotBuffer.h"
4 #include "NewKey.h"
5 #include "Slot.h"
6 #include "KeyValue.h"
7 #include "Error.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
13 #include "Random.h"
14 #include "ByteBuffer.h"
15 #include "Abort.h"
16 #include "CommitPart.h"
17
18
19 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
20         buffer(NULL),
21         cloud(new CloudComm(this, baseurl, password, listeningPort)),
22         random(NULL),
23         liveTableStatus(NULL),
24         pendingTransactionBuilder(NULL),
25         lastPendingTransactionSpeculatedOn(NULL),
26         firstPendingTransaction(NULL),
27         numberOfSlots(0),
28         bufferResizeThreshold(0),
29         liveSlotCount(0),
30         oldestLiveSlotSequenceNumver(1),
31         localMachineId(_localMachineId),
32         sequenceNumber(0),
33         localTransactionSequenceNumber(0),
34         lastTransactionSequenceNumberSpeculatedOn(0),
35         oldestTransactionSequenceNumberSpeculatedOn(0),
36         localArbitrationSequenceNumber(0),
37         hadPartialSendToServer(false),
38         attemptedToSendToServer(false),
39         expectedsize(0),
40         didFindTableStatus(false),
41         currMaxSize(0),
42         lastSlotAttemptedToSend(NULL),
43         lastIsNewKey(false),
44         lastNewSize(0),
45         lastTransactionPartsSent(NULL),
46         lastPendingSendArbitrationEntriesToDelete(NULL),
47         lastNewKey(NULL),
48         committedKeyValueTable(NULL),
49         speculatedKeyValueTable(NULL),
50         pendingTransactionSpeculatedKeyValueTable(NULL),
51         liveNewKeyTable(NULL),
52         lastMessageTable(NULL),
53         rejectedMessageWatchVectorTable(NULL),
54         arbitratorTable(NULL),
55         liveAbortTable(NULL),
56         newTransactionParts(NULL),
57         newCommitParts(NULL),
58         lastArbitratedTransactionNumberByArbitratorTable(NULL),
59         liveTransactionBySequenceNumberTable(NULL),
60         liveTransactionByTransactionIdTable(NULL),
61         liveCommitsTable(NULL),
62         liveCommitsByKeyTable(NULL),
63         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
64         rejectedSlotVector(NULL),
65         pendingTransactionQueue(NULL),
66         pendingSendArbitrationRounds(NULL),
67         pendingSendArbitrationEntriesToDelete(NULL),
68         transactionPartsSent(NULL),
69         outstandingTransactionStatus(NULL),
70         liveAbortsGeneratedByLocal(NULL),
71         offlineTransactionsCommittedAndAtServer(NULL),
72         localCommunicationTable(NULL),
73         lastTransactionSeenFromMachineFromServer(NULL),
74         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
75         lastInsertedNewKey(false),
76         lastSeqNumArbOn(0)
77 {
78         init();
79 }
80
81 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
82         buffer(NULL),
83         cloud(_cloud),
84         random(NULL),
85         liveTableStatus(NULL),
86         pendingTransactionBuilder(NULL),
87         lastPendingTransactionSpeculatedOn(NULL),
88         firstPendingTransaction(NULL),
89         numberOfSlots(0),
90         bufferResizeThreshold(0),
91         liveSlotCount(0),
92         oldestLiveSlotSequenceNumver(1),
93         localMachineId(_localMachineId),
94         sequenceNumber(0),
95         localTransactionSequenceNumber(0),
96         lastTransactionSequenceNumberSpeculatedOn(0),
97         oldestTransactionSequenceNumberSpeculatedOn(0),
98         localArbitrationSequenceNumber(0),
99         hadPartialSendToServer(false),
100         attemptedToSendToServer(false),
101         expectedsize(0),
102         didFindTableStatus(false),
103         currMaxSize(0),
104         lastSlotAttemptedToSend(NULL),
105         lastIsNewKey(false),
106         lastNewSize(0),
107         lastTransactionPartsSent(NULL),
108         lastPendingSendArbitrationEntriesToDelete(NULL),
109         lastNewKey(NULL),
110         committedKeyValueTable(NULL),
111         speculatedKeyValueTable(NULL),
112         pendingTransactionSpeculatedKeyValueTable(NULL),
113         liveNewKeyTable(NULL),
114         lastMessageTable(NULL),
115         rejectedMessageWatchVectorTable(NULL),
116         arbitratorTable(NULL),
117         liveAbortTable(NULL),
118         newTransactionParts(NULL),
119         newCommitParts(NULL),
120         lastArbitratedTransactionNumberByArbitratorTable(NULL),
121         liveTransactionBySequenceNumberTable(NULL),
122         liveTransactionByTransactionIdTable(NULL),
123         liveCommitsTable(NULL),
124         liveCommitsByKeyTable(NULL),
125         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
126         rejectedSlotVector(NULL),
127         pendingTransactionQueue(NULL),
128         pendingSendArbitrationRounds(NULL),
129         pendingSendArbitrationEntriesToDelete(NULL),
130         transactionPartsSent(NULL),
131         outstandingTransactionStatus(NULL),
132         liveAbortsGeneratedByLocal(NULL),
133         offlineTransactionsCommittedAndAtServer(NULL),
134         localCommunicationTable(NULL),
135         lastTransactionSeenFromMachineFromServer(NULL),
136         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
137         lastInsertedNewKey(false),
138         lastSeqNumArbOn(0)
139 {
140         init();
141 }
142
143 /**
144  * Init all the stuff needed for for table usage
145  */
146 void Table::init() {
147         // Init helper objects
148         random = new Random();
149         buffer = new SlotBuffer();
150
151         // init data structs
152         committedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
153         speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
154         pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
155         liveNewKeyTable = new Hashtable<IoTString *, NewKey *>();
156         lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> >();
157         rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
158         arbitratorTable = new Hashtable<IoTString *, int64_t>();
159         liveAbortTable = new Hashtable<Pair<int64_t, int64_t>, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>();
160         newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
161         newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
162         lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
163         liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
164         liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t>, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>();
165         liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> >();
166         liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *>();
167         lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
168         rejectedSlotVector = new Vector<int64_t>();
169         pendingTransactionQueue = new Vector<Transaction *>();
170         pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
171         transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
172         outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
173         liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
174         offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t>, uintptr_t, 0, pairHashFunction, pairEquals>();
175         localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> >();
176         lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
177         pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
178         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
179
180         // Other init stuff
181         numberOfSlots = buffer->capacity();
182         setResizeThreshold();
183 }
184
185 /**
186  * Initialize the table by inserting a table status as the first entry
187  * into the table status also initialize the crypto stuff.
188  */
189 void Table::initTable() {
190         cloud->initSecurity();
191
192         // Create the first insertion into the block chain which is the table status
193         Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
194         localSequenceNumber++;
195         TableStatus *status = new TableStatus(s, numberOfSlots);
196         s->addEntry(status);
197         Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
198
199         if (array == NULL) {
200                 array = new Array<Slot *>(1);
201                 array->set(0, s);
202                 // update local block chain
203                 validateAndUpdate(array, true);
204         } else if (array->length() == 1) {
205                 // in case we did push the slot BUT we failed to init it
206                 validateAndUpdate(array, true);
207         } else {
208                 throw new Error("Error on initialization");
209         }
210 }
211
212 /**
213  * Rebuild the table from scratch by pulling the latest block chain
214  * from the server.
215  */
216 void Table::rebuild() {
217         // Just pull the latest slots from the server
218         Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
219         validateAndUpdate(newslots, true);
220         sendToServer(NULL);
221         updateLiveTransactionsAndStatus();
222 }
223
224 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
225         localCommunicationTable->put(arbitrator, Pair<IoTString *, int32_t>(hostName, portNumber));
226 }
227
228 int64_t Table::getArbitrator(IoTString *key) {
229         return arbitratorTable->get(key);
230 }
231
232 void Table::close() {
233         cloud->close();
234 }
235
236 IoTString *Table::getCommitted(IoTString *key)  {
237         KeyValue *kv = committedKeyValueTable->get(key);
238
239         if (kv != NULL) {
240                 return kv->getValue();
241         } else {
242                 return NULL;
243         }
244 }
245
246 IoTString *Table::getSpeculative(IoTString *key) {
247         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
248
249         if (kv == NULL) {
250                 kv = speculatedKeyValueTable->get(key);
251         }
252
253         if (kv == NULL) {
254                 kv = committedKeyValueTable->get(key);
255         }
256
257         if (kv != NULL) {
258                 return kv->getValue();
259         } else {
260                 return NULL;
261         }
262 }
263
264 IoTString *Table::getCommittedAtomic(IoTString *key) {
265         KeyValue *kv = committedKeyValueTable->get(key);
266
267         if (!arbitratorTable->contains(key)) {
268                 throw new Error("Key not Found.");
269         }
270
271         // Make sure new key value pair matches the current arbitrator
272         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
273                 // TODO: Maybe not throw en error
274                 throw new Error("Not all Key Values Match Arbitrator.");
275         }
276
277         if (kv != NULL) {
278                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
279                 return kv->getValue();
280         } else {
281                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
282                 return NULL;
283         }
284 }
285
286 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
287         if (!arbitratorTable->contains(key)) {
288                 throw new Error("Key not Found.");
289         }
290
291         // Make sure new key value pair matches the current arbitrator
292         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
293                 // TODO: Maybe not throw en error
294                 throw new Error("Not all Key Values Match Arbitrator.");
295         }
296
297         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
298
299         if (kv == NULL) {
300                 kv = speculatedKeyValueTable->get(key);
301         }
302
303         if (kv == NULL) {
304                 kv = committedKeyValueTable->get(key);
305         }
306
307         if (kv != NULL) {
308                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
309                 return kv->getValue();
310         } else {
311                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
312                 return NULL;
313         }
314 }
315
316 bool Table::update()  {
317         try {
318                 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
319                 validateAndUpdate(newSlots, false);
320                 sendToServer(NULL);
321                 updateLiveTransactionsAndStatus();
322                 return true;
323         } catch (Exception *e) {
324                 SetIterator<int64_t> * kit = getKeyIterator(localCommunicationTable);
325                 while(kit->hasNext()) {
326                         int64_t m = kit->next();
327                         updateFromLocal(m);
328                 }
329                 delete kit;
330         }
331
332         return false;
333 }
334
335 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
336         while (true) {
337                 if (!arbitratorTable->contains(keyName)) {
338                         // There is already an arbitrator
339                         return false;
340                 }
341                 NewKey *newKey = new NewKey(NULL, keyName, machineId);
342
343                 if (sendToServer(newKey)) {
344                         // If successfully inserted
345                         return true;
346                 }
347         }
348 }
349
350 void Table::startTransaction() {
351         // Create a new transaction, invalidates any old pending transactions.
352         pendingTransactionBuilder = new PendingTransaction(localMachineId);
353 }
354
355 void Table::addKV(IoTString *key, IoTString *value) {
356
357         // Make sure it is a valid key
358         if (!arbitratorTable->contains(key)) {
359                 throw new Error("Key not Found.");
360         }
361
362         // Make sure new key value pair matches the current arbitrator
363         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
364                 // TODO: Maybe not throw en error
365                 throw new Error("Not all Key Values Match Arbitrator.");
366         }
367
368         // Add the key value to this transaction
369         KeyValue *kv = new KeyValue(key, value);
370         pendingTransactionBuilder->addKV(kv);
371 }
372
373 TransactionStatus *Table::commitTransaction() {
374         if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
375                 // transaction with no updates will have no effect on the system
376                 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
377         }
378
379         // Set the local transaction sequence number and increment
380         pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
381         localTransactionSequenceNumber++;
382
383         // Create the transaction status
384         TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
385
386         // Create the new transaction
387         Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
388         newTransaction->setTransactionStatus(transactionStatus);
389
390         if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
391                 // Add it to the queue and invalidate the builder for safety
392                 pendingTransactionQueue->add(newTransaction);
393         } else {
394                 arbitrateOnLocalTransaction(newTransaction);
395                 updateLiveStateFromLocal();
396         }
397
398         pendingTransactionBuilder = new PendingTransaction(localMachineId);
399
400         try {
401                 sendToServer(NULL);
402         } catch (ServerException *e) {
403
404                 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
405                 uint size = pendingTransactionQueue->size();
406                 uint oldindex = 0;
407                 for(int iter = 0; iter < size; iter++) {
408                         Transaction *transaction = pendingTransactionQueue->get(iter);
409                         pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter));
410                         
411                         if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
412                                 // Already contacted this client so ignore all attempts to contact this client
413                                 // to preserve ordering for arbitrator
414                                 continue;
415                         }
416
417                         Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
418
419                         if (sendReturn.getFirst()) {
420                                 // Failed to contact over local
421                                 arbitratorTriedAndFailed->add(transaction->getArbitrator());
422                         } else {
423                                 // Successful contact or should not contact
424
425                                 if (sendReturn.getSecond()) {
426                                         // did arbitrate
427                                         oldindex--;
428                                 }
429                         }
430                 }
431                 pendingTransactionQueue->setSize(oldindex);
432         }
433         
434         updateLiveStateFromLocal();
435
436         return transactionStatus;
437 }
438
439 /**
440  * Recalculate the new resize threshold
441  */
442 void Table::setResizeThreshold() {
443         int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
444         bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
445 }
446
447 int64_t Table::getLocalSequenceNumber() {
448         return localSequenceNumber;
449 }
450
451 bool Table::sendToServer(NewKey *newKey) {
452         bool fromRetry = false;
453         try {
454                 if (hadPartialSendToServer) {
455                         Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
456                         if (newSlots->length() == 0) {
457                                 fromRetry = true;
458                                 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
459
460                                 if (sendSlotsReturn.getFirst()) {
461                                         if (newKey != NULL) {
462                                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
463                                                         newKey = NULL;
464                                                 }
465                                         }
466
467                                         SetIterator<Transaction *> * trit = getKeyIterator(lastTransactionPartsSent);
468                                         while(trit->hasNext()) {
469                                                 Transaction *transaction = trit->next();
470                                                 transaction->resetServerFailure();
471                                                 // Update which transactions parts still need to be sent
472                                                 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
473                                                 // Add the transaction status to the outstanding list
474                                                 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
475
476                                                 // Update the transaction status
477                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
478
479                                                 // Check if all the transaction parts were successfully
480                                                 // sent and if so then remove it from pending
481                                                 if (transaction->didSendAllParts()) {
482                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
483                                                         pendingTransactionQueue->remove(transaction);
484                                                 }
485                                         }
486                                         delete trit;
487                                 } else {
488                                         newSlots = sendSlotsReturn.getThird();
489                                         bool isInserted = false;
490                                         for (uint si = 0; si < newSlots->length(); si++) {
491                                                 Slot *s = newSlots->get(si);
492                                                 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
493                                                         isInserted = true;
494                                                         break;
495                                                 }
496                                         }
497
498                                         for (uint si = 0; si < newSlots->length(); si++) {
499                                                 Slot *s = newSlots->get(si);
500                                                 if (isInserted) {
501                                                         break;
502                                                 }
503
504                                                 // Process each entry in the slot
505                                                 Vector<Entry *> * ventries=s->getEntries();
506                                                 uint vesize = ventries->size();
507                                                 for(uint vei = 0; vei < vesize; vei++) {
508                                                         Entry *entry = ventries->get(vei);
509                                                         if (entry->getType() == TypeLastMessage) {
510                                                                 LastMessage *lastMessage = (LastMessage *)entry;
511                                                                 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
512                                                                         isInserted = true;
513                                                                         break;
514                                                                 }
515                                                         }
516                                                 }
517                                         }
518
519                                         if (isInserted) {
520                                                 if (newKey != NULL) {
521                                                         if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
522                                                                 newKey = NULL;
523                                                         }
524                                                 }
525
526                                                 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
527                                                         transaction->resetServerFailure();
528
529                                                         // Update which transactions parts still need to be sent
530                                                         transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
531
532                                                         // Add the transaction status to the outstanding list
533                                                         outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
534
535                                                         // Update the transaction status
536                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
537
538                                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
539                                                         if (transaction->didSendAllParts()) {
540                                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
541                                                                 pendingTransactionQueue->remove(transaction);
542                                                         } else {
543                                                                 transaction->resetServerFailure();
544                                                                 // Set the transaction sequence number back to nothing
545                                                                 if (!transaction->didSendAPartToServer()) {
546                                                                         transaction->setSequenceNumber(-1);
547                                                                 }
548                                                         }
549                                                 }
550                                         }
551                                 }
552
553                                 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
554                                         transaction->resetServerFailure();
555                                         // Set the transaction sequence number back to nothing
556                                         if (!transaction->didSendAPartToServer()) {
557                                                 transaction->setSequenceNumber(-1);
558                                         }
559                                 }
560
561                                 if (sendSlotsReturn.getThird()->length() != 0) {
562                                         // insert into the local block chain
563                                         validateAndUpdate(sendSlotsReturn.getThird(), true);
564                                 }
565                                 // continue;
566                         } else {
567                                 bool isInserted = false;
568                                 for (uint si = 0; si < newSlots->length(); si++) {
569                                         Slot *s = newSlots->get(si);
570                                         if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
571                                                 isInserted = true;
572                                                 break;
573                                         }
574                                 }
575
576                                 for (uint si = 0; si < newSlots->length(); si++) {
577                                         Slot *s = newSlots->get(si);
578                                         if (isInserted) {
579                                                 break;
580                                         }
581
582                                         // Process each entry in the slot
583                                         for (Entry *entry : s->getEntries()) {
584
585                                                 if (entry->getType() == TypeLastMessage) {
586                                                         LastMessage *lastMessage = (LastMessage *)entry;
587                                                         if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
588                                                                 isInserted = true;
589                                                                 break;
590                                                         }
591                                                 }
592                                         }
593                                 }
594
595                                 if (isInserted) {
596                                         if (newKey != NULL) {
597                                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
598                                                         newKey = NULL;
599                                                 }
600                                         }
601
602                                         for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
603                                                 transaction->resetServerFailure();
604
605                                                 // Update which transactions parts still need to be sent
606                                                 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
607
608                                                 // Add the transaction status to the outstanding list
609                                                 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
610
611                                                 // Update the transaction status
612                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
613
614                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
615                                                 if (transaction->didSendAllParts()) {
616                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
617                                                         pendingTransactionQueue->remove(transaction);
618                                                 } else {
619                                                         transaction->resetServerFailure();
620                                                         // Set the transaction sequence number back to nothing
621                                                         if (!transaction->didSendAPartToServer()) {
622                                                                 transaction->setSequenceNumber(-1);
623                                                         }
624                                                 }
625                                         }
626                                 } else {
627                                         for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
628                                                 transaction->resetServerFailure();
629                                                 // Set the transaction sequence number back to nothing
630                                                 if (!transaction->didSendAPartToServer()) {
631                                                         transaction->setSequenceNumber(-1);
632                                                 }
633                                         }
634                                 }
635
636                                 // insert into the local block chain
637                                 validateAndUpdate(newSlots, true);
638                         }
639                 }
640         } catch (ServerException *e) {
641                 throw e;
642         }
643
644
645
646         try {
647                 // While we have stuff that needs inserting into the block chain
648                 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
649
650                         fromRetry = false;
651
652                         if (hadPartialSendToServer) {
653                                 throw new Error("Should Be error free");
654                         }
655
656
657
658                         // If there is a new key with same name then end
659                         if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
660                                 return false;
661                         }
662
663                         // Create the slot
664                         Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber);
665                         localSequenceNumber++;
666
667                         // Try to fill the slot with data
668                         ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
669                         bool needsResize = fillSlotsReturn.getFirst();
670                         int newSize = fillSlotsReturn.getSecond();
671                         bool insertedNewKey = fillSlotsReturn.getThird();
672
673                         if (needsResize) {
674                                 // Reset which transaction to send
675                                 for (Transaction *transaction : transactionPartsSent->keySet()) {
676                                         transaction->resetNextPartToSend();
677
678                                         // Set the transaction sequence number back to nothing
679                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
680                                                 transaction->setSequenceNumber(-1);
681                                         }
682                                 }
683
684                                 // Clear the sent data since we are trying again
685                                 pendingSendArbitrationEntriesToDelete->clear();
686                                 transactionPartsSent->clear();
687
688                                 // We needed a resize so try again
689                                 fillSlot(slot, true, newKey);
690                         }
691
692                         lastSlotAttemptedToSend = slot;
693                         lastIsNewKey = (newKey != NULL);
694                         lastInsertedNewKey = insertedNewKey;
695                         lastNewSize = newSize;
696                         lastNewKey = newKey;
697                         lastTransactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> * >(transactionPartsSent);
698                         lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
699
700
701                         ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
702
703                         if (sendSlotsReturn.getFirst()) {
704
705                                 // Did insert into the block chain
706
707                                 if (insertedNewKey) {
708                                         // This slot was what was inserted not a previous slot
709
710                                         // New Key was successfully inserted into the block chain so dont want to insert it again
711                                         newKey = NULL;
712                                 }
713
714                                 // Remove the aborts and commit parts that were sent from the pending to send queue
715                                 uint size = pendingSendArbitrationRounds->size();
716                                 uint oldcount = 0;
717                                 for (uint i=0; i < size; i++)
718                                         ArbitrationRound *round = pendingSendArbitrartionRounds->get(i);
719                                         round->removeParts(pendingSendArbitrationEntriesToDelete);
720
721                                         if (!round->isDoneSending()) {
722                                                 // Sent all the parts
723                                                 pendingSendArbitrartionRounds->set(oldcount++,
724                                                                                                                                                                                          pendingSendArbitrartionRounds->get(i));
725                                         }
726                                 }
727                         pendingSendArbitrationRounds->setSize(oldcount);
728
729                                 for (Transaction *transaction : transactionPartsSent->keySet()) {
730                                         transaction->resetServerFailure();
731
732                                         // Update which transactions parts still need to be sent
733                                         transaction->removeSentParts(transactionPartsSent->get(transaction));
734
735                                         // Add the transaction status to the outstanding list
736                                         outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
737
738                                         // Update the transaction status
739                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
740
741                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
742                                         if (transaction->didSendAllParts()) {
743                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
744                                                 pendingTransactionQueue->remove(transaction);
745                                         }
746                                 }
747                         } else {
748                                 // Reset which transaction to send
749                                 for (Transaction *transaction : transactionPartsSent->keySet()) {
750                                         transaction->resetNextPartToSend();
751
752                                         // Set the transaction sequence number back to nothing
753                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
754                                                 transaction->setSequenceNumber(-1);
755                                         }
756                                 }
757                         }
758
759                         // Clear the sent data in preparation for next send
760                         pendingSendArbitrationEntriesToDelete->clear();
761                         transactionPartsSent->clear();
762
763                         if (sendSlotsReturn.getThird()->length() != 0) {
764                                 // insert into the local block chain
765                                 validateAndUpdate(sendSlotsReturn.getThird(), true);
766                         }
767                 }
768
769         } catch (ServerException *e) {
770
771                 if (e->getType() != ServerException->TypeInputTimeout) {
772                         // Nothing was able to be sent to the server so just clear these data structures
773                         for (Transaction *transaction : transactionPartsSent->keySet()) {
774                                 transaction->resetNextPartToSend();
775
776                                 // Set the transaction sequence number back to nothing
777                                 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
778                                         transaction->setSequenceNumber(-1);
779                                 }
780                         }
781                 } else {
782                         // There was a partial send to the server
783                         hadPartialSendToServer = true;
784
785                         // Nothing was able to be sent to the server so just clear these data structures
786                         for (Transaction *transaction : transactionPartsSent->keySet()) {
787                                 transaction->resetNextPartToSend();
788                                 transaction->setServerFailure();
789                         }
790                 }
791
792                 pendingSendArbitrationEntriesToDelete->clear();
793                 transactionPartsSent->clear();
794
795                 throw e;
796         }
797
798         return newKey == NULL;
799 }
800
801 bool Table::updateFromLocal(int64_t machineId) {
802         Pair<IoTString *, int32_t> localCommunicationInformation = localCommunicationTable->get(machineId);
803         if (localCommunicationInformation == NULL) {
804                 // Cant talk to that device locally so do nothing
805                 return false;
806         }
807
808         // Get the size of the send data
809         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
810
811         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
812         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId) != NULL) {
813                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
814         }
815
816         Array<char> *sendData = new Array<char>(sendDataSize);
817         ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
818
819         // Encode the data
820         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
821         bbEncode->putInt(0);
822
823         // Send by local
824         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
825         localSequenceNumber++;
826
827         if (returnData == NULL) {
828                 // Could not contact server
829                 return false;
830         }
831
832         // Decode the data
833         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
834         int numberOfEntries = bbDecode->getInt();
835
836         for (int i = 0; i < numberOfEntries; i++) {
837                 char type = bbDecode->get();
838                 if (type == TypeAbort) {
839                         Abort *abort = (Abort*)Abort_decode(NULL, bbDecode);
840                         processEntry(abort);
841                 } else if (type == TypeCommitPart) {
842                         CommitPart *commitPart = (CommitPart*)CommitPart_decode(NULL, bbDecode);
843                         processEntry(commitPart);
844                 }
845         }
846
847         updateLiveStateFromLocal();
848
849         return true;
850 }
851
852 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
853
854         // Get the devices local communications
855         Pair<IoTString *, int32_t> localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
856
857         if (localCommunicationInformation == NULL) {
858                 // Cant talk to that device locally so do nothing
859                 return Pair<bool, bool>(true, false);
860         }
861
862         // Get the size of the send data
863         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
864         for (TransactionPart *part : transaction->getParts()->values()) {
865                 sendDataSize += part->getSize();
866         }
867
868         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
869         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator()) != NULL) {
870                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
871         }
872
873         // Make the send data size
874         Array<char> *sendData = new Array<char>(sendDataSize);
875         ByteBuffer *bbEncode = ByteBuffer.wrap(sendData);
876
877         // Encode the data
878         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
879         bbEncode->putInt(transaction->getParts()->size());
880         for (TransactionPart *part : transaction->getParts()->values()) {
881                 part->encode(bbEncode);
882         }
883
884
885         // Send by local
886         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
887         localSequenceNumber++;
888
889         if (returnData == NULL) {
890                 // Could not contact server
891                 return Pair<bool, bool>(true, false);
892         }
893
894         // Decode the data
895         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
896         bool didCommit = bbDecode->get() == 1;
897         bool couldArbitrate = bbDecode->get() == 1;
898         int numberOfEntries = bbDecode->getInt();
899         bool foundAbort = false;
900
901         for (int i = 0; i < numberOfEntries; i++) {
902                 char type = bbDecode->get();
903                 if (type == TypeAbort) {
904                         Abort *abort = (Abort*)Abort_decode(NULL, bbDecode);
905
906                         if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
907                                 foundAbort = true;
908                         }
909
910                         processEntry(abort);
911                 } else if (type == TypeCommitPart) {
912                         CommitPart *commitPart = (CommitPart*)CommitPart_decode(NULL, bbDecode);
913                         processEntry(commitPart);
914                 }
915         }
916
917         updateLiveStateFromLocal();
918
919         if (couldArbitrate) {
920                 TransactionStatus status =  transaction->getTransactionStatus();
921                 if (didCommit) {
922                         status->setStatus(TransactionStatus_StatusCommitted);
923                 } else {
924                         status->setStatus(TransactionStatus_StatusAborted);
925                 }
926         } else {
927                 TransactionStatus status =  transaction->getTransactionStatus();
928                 if (foundAbort) {
929                         status->setStatus(TransactionStatus_StatusAborted);
930                 } else {
931                         status->setStatus(TransactionStatus_StatusCommitted);
932                 }
933         }
934
935         return Pair<bool, bool>(false, true);
936 }
937
938 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
939
940         // Decode the data
941         ByteBuffer *bbDecode = ByteBuffer_wrap(data);
942         int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
943         int numberOfParts = bbDecode->getInt();
944
945         // If we did commit a transaction or not
946         bool didCommit = false;
947         bool couldArbitrate = false;
948
949         if (numberOfParts != 0) {
950
951                 // decode the transaction
952                 Transaction *transaction = new Transaction();
953                 for (int i = 0; i < numberOfParts; i++) {
954                         bbDecode->get();
955                         TransactionPart *newPart = (TransactionPart)TransactionPart.decode(NULL, bbDecode);
956                         transaction->addPartDecode(newPart);
957                 }
958
959                 // Arbitrate on transaction and pull relevant return data
960                 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
961                 couldArbitrate = localArbitrateReturn.getFirst();
962                 didCommit = localArbitrateReturn.getSecond();
963
964                 updateLiveStateFromLocal();
965
966                 // Transaction was sent to the server so keep track of it to prevent double commit
967                 if (transaction->getSequenceNumber() != -1) {
968                         offlineTransactionsCommittedAndAtServer->add(transaction->getId());
969                 }
970         }
971
972         // The data to send back
973         int returnDataSize = 0;
974         Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
975
976         // Get the aborts to send back
977         Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>(liveAbortsGeneratedByLocal->keySet());
978         Collections->sort(abortLocalSequenceNumbers);
979         for (int64_t localSequenceNumber : abortLocalSequenceNumbers) {
980                 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
981                         continue;
982                 }
983
984                 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
985                 unseenArbitrations->add(abort);
986                 returnDataSize += abort->getSize();
987         }
988
989         // Get the commits to send back
990         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
991         if (commitForClientTable != NULL) {
992                 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
993                 Collections->sort(commitLocalSequenceNumbers);
994
995                 for (int64_t localSequenceNumber : commitLocalSequenceNumbers) {
996                         Commit *commit = commitForClientTable->get(localSequenceNumber);
997
998                         if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
999                                 continue;
1000                         }
1001
1002                         unseenArbitrations->addAll(commit->getParts()->values());
1003
1004                         for (CommitPart *commitPart : commit->getParts()->values()) {
1005                                 returnDataSize += commitPart->getSize();
1006                         }
1007                 }
1008         }
1009
1010         // Number of arbitration entries to decode
1011         returnDataSize += 2 * sizeof(int32_t);
1012
1013         // bool of did commit or not
1014         if (numberOfParts != 0) {
1015                 returnDataSize += sizeof(char);
1016         }
1017
1018         // Data to send Back
1019         Array<char> *returnData = new Array<char>(returnDataSize);
1020         ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
1021
1022         if (numberOfParts != 0) {
1023                 if (didCommit) {
1024                         bbEncode->put((char)1);
1025                 } else {
1026                         bbEncode->put((char)0);
1027                 }
1028                 if (couldArbitrate) {
1029                         bbEncode->put((char)1);
1030                 } else {
1031                         bbEncode->put((char)0);
1032                 }
1033         }
1034
1035         bbEncode->putInt(unseenArbitrations->size());
1036         uint size = unseenArbitrations->size();
1037         for(uint i = 0; i< size; i++) {
1038                 Entry * entry = unseenArbitrations->get(i);
1039                 entry->encode(bbEncode);
1040         }
1041
1042         localSequenceNumber++;
1043         return returnData;
1044 }
1045
1046 ThreeTuple<bool, bool, Array<Slot *> *> Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey) {
1047         bool attemptedToSendToServerTmp = attemptedToSendToServer;
1048         attemptedToSendToServer = true;
1049
1050         bool inserted = false;
1051         bool lastTryInserted = false;
1052
1053         Array<Slot *> *array = cloud->putSlot(slot, newSize);
1054         if (array == NULL) {
1055                 array = new Array<Slot *>();
1056                 array->set(0, slot);
1057                 rejectedSlotVector->clear();
1058                 inserted = true;
1059         } else {
1060                 if (array->length() == 0) {
1061                         throw new Error("Server Error: Did not send any slots");
1062                 }
1063
1064                 // if (attemptedToSendToServerTmp) {
1065                 if (hadPartialSendToServer) {
1066
1067                         bool isInserted = false;
1068                         uint size = array->size();
1069                         for(uint i=0; i < size; i++) {
1070                                 Slot *s = array->get(i);
1071                                 if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1072                                         isInserted = true;
1073                                         break;
1074                                 }
1075                         }
1076
1077                         for(uint i=0; i < size; i++) {
1078                                 Slot *s = array->get(i);
1079                                 if (isInserted) {
1080                                         break;
1081                                 }
1082
1083                                 // Process each entry in the slot
1084                                 for (Entry *entry : s->getEntries()) {
1085
1086                                         if (entry->getType() == TypeLastMessage) {
1087                                                 LastMessage *lastMessage = (LastMessage *)entry;
1088
1089                                                 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) {
1090                                                         isInserted = true;
1091                                                         break;
1092                                                 }
1093                                         }
1094                                 }
1095                         }
1096
1097                         if (!isInserted) {
1098                                 rejectedSlotVector->add(slot->getSequenceNumber());
1099                                 lastTryInserted = false;
1100                         } else {
1101                                 lastTryInserted = true;
1102                         }
1103                 } else {
1104                         rejectedSlotVector->add(slot->getSequenceNumber());
1105                         lastTryInserted = false;
1106                 }
1107         }
1108
1109         return ThreeTuple<bool, bool, Array<Slot *> *>(inserted, lastTryInserted, array);
1110 }
1111
1112 /**
1113  * Returns false if a resize was needed
1114  */
1115 ThreeTuple<bool, int32_t, bool> Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry) {
1116         int newSize = 0;
1117         if (liveSlotCount > bufferResizeThreshold) {
1118                 resize = true;  //Resize is forced
1119         }
1120
1121         if (resize) {
1122                 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1123                 TableStatus *status = new TableStatus(slot, newSize);
1124                 slot->addEntry(status);
1125         }
1126
1127         // Fill with rejected slots first before doing anything else
1128         doRejectedMessages(slot);
1129
1130         // Do mandatory rescue of entries
1131         ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1132
1133         // Extract working variables
1134         bool needsResize = mandatoryRescueReturn.getFirst();
1135         bool seenLiveSlot = mandatoryRescueReturn.getSecond();
1136         int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1137
1138         if (needsResize && !resize) {
1139                 // We need to resize but we are not resizing so return false
1140                 return ThreeTuple<bool, int32_t, bool>(true, NULL, NULL);
1141         }
1142
1143         bool inserted = false;
1144         if (newKeyEntry != NULL) {
1145                 newKeyEntry->setSlot(slot);
1146                 if (slot->hasSpace(newKeyEntry)) {
1147                         slot->addEntry(newKeyEntry);
1148                         inserted = true;
1149                 }
1150         }
1151
1152         // Clear the transactions, aborts and commits that were sent previously
1153         transactionPartsSent->clear();
1154         pendingSendArbitrationEntriesToDelete->clear();
1155         uint size = pendingSendArbitrationRounds->size();
1156         for (uint i=0; i<size; i++)
1157                 ArbitrartionRound *round = pendingSendArbitrationRounds->get(i);
1158                 bool isFull = false;
1159                 round->generateParts();
1160                 Vector<Entry *> *parts = round->getParts();
1161
1162                 // Insert pending arbitration data
1163                 uint vsize = parts->size();
1164                 for (uint vi=0; vi<vsize; vi++) {
1165                         Entry *arbitrationData = parts->get(vi);
1166                         
1167                         // If it is an abort then we need to set some information
1168                         if (arbitrationData->getType() == TypeAbort) {
1169                                 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1170                         }
1171
1172                         if (!slot->hasSpace(arbitrationData)) {
1173                                 // No space so cant do anything else with these data entries
1174                                 isFull = true;
1175                                 break;
1176                         }
1177
1178                         // Add to this current slot and add it to entries to delete
1179                         slot->addEntry(arbitrationData);
1180                         pendingSendArbitrationEntriesToDelete->add(arbitrationData);
1181                 }
1182
1183                 if (isFull) {
1184                         break;
1185                 }
1186         }
1187
1188         if (pendingTransactionQueue->size() > 0) {
1189                 Transaction *transaction = pendingTransactionQueue->get(0);
1190                 // Set the transaction sequence number if it has yet to be inserted into the block chain
1191                 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1192                         transaction->setSequenceNumber(slot->getSequenceNumber());
1193                 }
1194
1195                 while (true) {
1196                         TransactionPart *part = transaction->getNextPartToSend();
1197                         if (part == NULL) {
1198                                 // Ran out of parts to send for this transaction so move on
1199                                 break;
1200                         }
1201
1202                         if (slot->hasSpace(part)) {
1203                                 slot->addEntry(part);
1204                                 Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1205                                 if (partsSent == NULL) {
1206                                         partsSent = new Vector<int32_t>();
1207                                         transactionPartsSent->put(transaction, partsSent);
1208                                 }
1209                                 partsSent->add(part->getPartNumber());
1210                                 transactionPartsSent->put(transaction, partsSent);
1211                         } else {
1212                                 break;
1213                         }
1214                 }
1215         }
1216
1217         // Fill the remainder of the slot with rescue data
1218         doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1219
1220         return ThreeTuple<bool, int32_t, bool>(false, newSize, inserted);
1221 }
1222
1223 void Table::doRejectedMessages(Slot *s) {
1224         if (!rejectedSlotVector->isEmpty()) {
1225                 /* TODO: We should avoid generating a rejected message entry if
1226                  * there is already a sufficient entry in the queue (e->g->,
1227                  * equalsto value of true and same sequence number)->  */
1228
1229                 int64_t old_seqn = rejectedSlotVector->firstElement();
1230                 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1231                         int64_t new_seqn = rejectedSlotVector->lastElement();
1232                         RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1233                         s->addEntry(rm);
1234                 } else {
1235                         int64_t prev_seqn = -1;
1236                         int i = 0;
1237                         /* Go through list of missing messages */
1238                         for (; i < rejectedSlotVector->size(); i++) {
1239                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1240                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1241                                 if (s_msg != NULL)
1242                                         break;
1243                                 prev_seqn = curr_seqn;
1244                         }
1245                         /* Generate rejected message entry for missing messages */
1246                         if (prev_seqn != -1) {
1247                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1248                                 s->addEntry(rm);
1249                         }
1250                         /* Generate rejected message entries for present messages */
1251                         for (; i < rejectedSlotVector->size(); i++) {
1252                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1253                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1254                                 int64_t machineid = s_msg->getMachineID();
1255                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1256                                 s->addEntry(rm);
1257                         }
1258                 }
1259         }
1260 }
1261
1262 ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize) {
1263         int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1264         int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
1265         if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1266                 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1267         }
1268
1269         int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1270         bool seenLiveSlot = false;
1271         int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots;         // smallest seq number in the buffer if it is full
1272         int64_t threshold = firstIfFull + Table_FREE_SLOTS;             // we want the buffer to be clear of live entries up to this point
1273
1274
1275         // Mandatory Rescue
1276         for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1277                 Slot previousSlot = buffer->getSlot(currentSequenceNumber);
1278                 // Push slot number forward
1279                 if (!seenLiveSlot) {
1280                         oldestLiveSlotSequenceNumver = currentSequenceNumber;
1281                 }
1282
1283                 if (!previousSlot->isLive()) {
1284                         continue;
1285                 }
1286
1287                 // We have seen a live slot
1288                 seenLiveSlot = true;
1289
1290                 // Get all the live entries for a slot
1291                 Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1292
1293                 // Iterate over all the live entries and try to rescue them
1294                 for (Entry *liveEntry : liveEntries) {
1295                         if (slot->hasSpace(liveEntry)) {
1296                                 // Enough space to rescue the entry
1297                                 slot->addEntry(liveEntry);
1298                         } else if (currentSequenceNumber == firstIfFull) {
1299                                 //if there's no space but the entry is about to fall off the queue
1300                                 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
1301                         }
1302                 }
1303         }
1304
1305         // Did not resize
1306         return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
1307 }
1308
1309 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1310         /* now go through live entries from least to greatest sequence number until
1311          * either all live slots added, or the slot doesn't have enough room
1312          * for SKIP_THRESHOLD consecutive entries*/
1313         int skipcount = 0;
1314         int64_t newestseqnum = buffer->getNewestSeqNum();
1315 search:
1316         for (; seqn <= newestseqnum; seqn++) {
1317                 Slot *prevslot = buffer->getSlot(seqn);
1318                 //Push slot number forward
1319                 if (!seenliveslot)
1320                         oldestLiveSlotSequenceNumver = seqn;
1321
1322                 if (!prevslot->isLive())
1323                         continue;
1324                 seenliveslot = true;
1325                 Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
1326                 for (Entry *liveentry : liveentries) {
1327                         if (s->hasSpace(liveentry))
1328                                 s->addEntry(liveentry);
1329                         else {
1330                                 skipcount++;
1331                                 if (skipcount > Table_SKIP_THRESHOLD)
1332                                         break search;
1333                         }
1334                 }
1335         }
1336 }
1337
1338 /**
1339  * Checks for malicious activity and updates the local copy of the block chain->
1340  */
1341 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1342         // The cloud communication layer has checked slot HMACs already
1343         // before decoding
1344         if (newSlots->length() == 0) {
1345                 return;
1346         }
1347
1348         // Make sure all slots are newer than the last largest slot this
1349         // client has seen
1350         int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
1351         if (firstSeqNum <= sequenceNumber) {
1352                 throw new Error("Server Error: Sent older slots!");
1353         }
1354
1355         // Create an object that can access both new slots and slots in our
1356         // local chain without committing slots to our local chain
1357         SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1358
1359         // Check that the HMAC chain is not broken
1360         checkHMACChain(indexer, newSlots);
1361
1362         // Set to keep track of messages from clients
1363         Hashset<int64_t> *machineSet = new Hashset<int64_t>(lastMessageTable->keySet());
1364
1365         // Process each slots data
1366         for (Slot *slot : newSlots) {
1367                 processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1368
1369                 updateExpectedSize();
1370         }
1371
1372         // If there is a gap, check to see if the server sent us
1373         // everything->
1374         if (firstSeqNum != (sequenceNumber + 1)) {
1375
1376                 // Check the size of the slots that were sent down by the server->
1377                 // Can only check the size if there was a gap
1378                 checkNumSlots(newSlots->length);
1379
1380                 // Since there was a gap every machine must have pushed a slot or
1381                 // must have a last message message-> If not then the server is
1382                 // hiding slots
1383                 if (!machineSet->isEmpty()) {
1384                         throw new Error("Missing record for machines: ");
1385                 }
1386         }
1387
1388         // Update the size of our local block chain->
1389         commitNewMaxSize();
1390
1391         // Commit new to slots to the local block chain->
1392         for (Slot *slot : newSlots) {
1393
1394                 // Insert this slot into our local block chain copy->
1395                 buffer->putSlot(slot);
1396
1397                 // Keep track of how many slots are currently live (have live data
1398                 // in them)->
1399                 liveSlotCount++;
1400         }
1401
1402         // Get the sequence number of the latest slot in the system
1403         sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
1404         updateLiveStateFromServer();
1405
1406         // No Need to remember after we pulled from the server
1407         offlineTransactionsCommittedAndAtServer->clear();
1408
1409         // This is invalidated now
1410         hadPartialSendToServer = false;
1411 }
1412
1413 void Table::updateLiveStateFromServer() {
1414         // Process the new transaction parts
1415         processNewTransactionParts();
1416
1417         // Do arbitration on new transactions that were received
1418         arbitrateFromServer();
1419
1420         // Update all the committed keys
1421         bool didCommitOrSpeculate = updateCommittedTable();
1422
1423         // Delete the transactions that are now dead
1424         updateLiveTransactionsAndStatus();
1425
1426         // Do speculations
1427         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1428         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1429 }
1430
1431 void Table::updateLiveStateFromLocal() {
1432         // Update all the committed keys
1433         bool didCommitOrSpeculate = updateCommittedTable();
1434
1435         // Delete the transactions that are now dead
1436         updateLiveTransactionsAndStatus();
1437
1438         // Do speculations
1439         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1440         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1441 }
1442
1443 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1444         int64_t prevslots = firstSequenceNumber;
1445
1446         if (didFindTableStatus) {
1447         } else {
1448                 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1449         }
1450
1451         didFindTableStatus = true;
1452         currMaxSize = numberOfSlots;
1453 }
1454
1455 void Table::updateExpectedSize() {
1456         expectedsize++;
1457
1458         if (expectedsize > currMaxSize) {
1459                 expectedsize = currMaxSize;
1460         }
1461 }
1462
1463
1464 /**
1465  * Check the size of the block chain to make sure there are enough
1466  * slots sent back by the server-> This is only called when we have a
1467  * gap between the slots that we have locally and the slots sent by
1468  * the server therefore in the slots sent by the server there will be
1469  * at least 1 Table status message
1470  */
1471 void Table::checkNumSlots(int numberOfSlots) {
1472         if (numberOfSlots != expectedsize) {
1473                 throw new Error("Server Error: Server did not send all slots->  Expected: ");
1474         }
1475 }
1476
1477 void Table::updateCurrMaxSize(int newmaxsize) {
1478         currMaxSize = newmaxsize;
1479 }
1480
1481
1482 /**
1483  * Update the size of of the local buffer if it is needed->
1484  */
1485 void Table::commitNewMaxSize() {
1486         didFindTableStatus = false;
1487
1488         // Resize the local slot buffer
1489         if (numberOfSlots != currMaxSize) {
1490                 buffer->resize((int32_t)currMaxSize);
1491         }
1492
1493         // Change the number of local slots to the new size
1494         numberOfSlots = (int32_t)currMaxSize;
1495
1496         // Recalculate the resize threshold since the size of the local
1497         // buffer has changed
1498         setResizeThreshold();
1499 }
1500
1501 /**
1502  * Process the new transaction parts from this latest round of slots
1503  * received from the server
1504  */
1505 void Table::processNewTransactionParts() {
1506
1507         if (newTransactionParts->size() == 0) {
1508                 // Nothing new to process
1509                 return;
1510         }
1511
1512         // Iterate through all the machine Ids that we received new parts
1513         // for
1514         for (int64_t machineId : newTransactionParts->keySet()) {
1515                 Hashtable<Pair<int64_t, int32_t>, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
1516
1517                 // Iterate through all the parts for that machine Id
1518                 for (Pair<int64_t, int32_t> partId : parts->keySet()) {
1519                         TransactionPart *part = parts->get(partId);
1520
1521                         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1522                         if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= part->getSequenceNumber())) {
1523                                 // Set dead the transaction part
1524                                 part->setDead();
1525                                 continue;
1526                         }
1527
1528                         // Get the transaction object for that sequence number
1529                         Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1530
1531                         if (transaction == NULL) {
1532                                 // This is a new transaction that we dont have so make a new one
1533                                 transaction = new Transaction();
1534
1535                                 // Insert this new transaction into the live tables
1536                                 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1537                                 liveTransactionByTransactionIdTable->put(part->getTransactionId(), transaction);
1538                         }
1539
1540                         // Add that part to the transaction
1541                         transaction->addPartDecode(part);
1542                 }
1543         }
1544
1545         // Clear all the new transaction parts in preparation for the next
1546         // time the server sends slots
1547         newTransactionParts->clear();
1548 }
1549
1550 void Table::arbitrateFromServer() {
1551
1552         if (liveTransactionBySequenceNumberTable->size() == 0) {
1553                 // Nothing to arbitrate on so move on
1554                 return;
1555         }
1556
1557         // Get the transaction sequence numbers and sort from oldest to newest
1558         Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
1559         Collections->sort(transactionSequenceNumbers);
1560
1561         // Collection of key value pairs that are
1562         Hashtable<IoTString *, KeyValue *> speculativeTableTmp = new Hashtable<IoTString *, KeyValue *>();
1563
1564         // The last transaction arbitrated on
1565         int64_t lastTransactionCommitted = -1;
1566         Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1567
1568         for (int64_t transactionSequenceNumber : transactionSequenceNumbers) {
1569                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1570
1571
1572
1573                 // Check if this machine arbitrates for this transaction if not
1574                 // then we cant arbitrate this transaction
1575                 if (transaction->getArbitrator() != localMachineId) {
1576                         continue;
1577                 }
1578
1579                 if (transactionSequenceNumber < lastSeqNumArbOn) {
1580                         continue;
1581                 }
1582
1583                 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1584                         // We have seen this already locally so dont commit again
1585                         continue;
1586                 }
1587
1588
1589                 if (!transaction->isComplete()) {
1590                         // Will arbitrate in incorrect order if we continue so just break
1591                         // Most likely this
1592                         break;
1593                 }
1594
1595
1596                 // update the largest transaction seen by arbitrator from server
1597                 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) == NULL) {
1598                         lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1599                 } else {
1600                         int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1601                         if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1602                                 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1603                         }
1604                 }
1605
1606                 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1607                         // Guard evaluated as true
1608
1609                         // Update the local changes so we can make the commit
1610                         for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
1611                                 speculativeTableTmp->put(kv->getKey(), kv);
1612                         }
1613
1614                         // Update what the last transaction committed was for use in batch commit
1615                         lastTransactionCommitted = transactionSequenceNumber;
1616                 } else {
1617                         // Guard evaluated was false so create abort
1618                         // Create the abort
1619                         Abort *newAbort = new Abort(NULL,
1620                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1621                                                                                                                                         transaction->getSequenceNumber(),
1622                                                                                                                                         transaction->getMachineId(),
1623                                                                                                                                         transaction->getArbitrator(),
1624                                                                                                                                         localArbitrationSequenceNumber);
1625                         localArbitrationSequenceNumber++;
1626                         generatedAborts->add(newAbort);
1627
1628                         // Insert the abort so we can process
1629                         processEntry(newAbort);
1630                 }
1631
1632                 lastSeqNumArbOn = transactionSequenceNumber;
1633         }
1634
1635         Commit *newCommit = NULL;
1636
1637         // If there is something to commit
1638         if (speculativeTableTmp->size() != 0) {
1639                 // Create the commit and increment the commit sequence number
1640                 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1641                 localArbitrationSequenceNumber++;
1642
1643                 // Add all the new keys to the commit
1644                 for (KeyValue *kv : speculativeTableTmp->values()) {
1645                         newCommit->addKV(kv);
1646                 }
1647
1648                 // create the commit parts
1649                 newCommit->createCommitParts();
1650
1651                 // Append all the commit parts to the end of the pending queue
1652                 // waiting for sending to the server
1653                 // Insert the commit so we can process it
1654                 for (CommitPart *commitPart : newCommit->getParts()->values()) {
1655                         processEntry(commitPart);
1656                 }
1657         }
1658
1659         if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1660                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1661                 pendingSendArbitrationRounds->add(arbitrationRound);
1662
1663                 if (compactArbitrationData()) {
1664                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1665                         if (newArbitrationRound->getCommit() != NULL) {
1666                                 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1667                                         processEntry(commitPart);
1668                                 }
1669                         }
1670                 }
1671         }
1672 }
1673
1674 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1675
1676         // Check if this machine arbitrates for this transaction if not then
1677         // we cant arbitrate this transaction
1678         if (transaction->getArbitrator() != localMachineId) {
1679                 return Pair<bool, bool>(false, false);
1680         }
1681
1682         if (!transaction->isComplete()) {
1683                 // Will arbitrate in incorrect order if we continue so just break
1684                 // Most likely this
1685                 return Pair<bool, bool>(false, false);
1686         }
1687
1688         if (transaction->getMachineId() != localMachineId) {
1689                 // dont do this check for local transactions
1690                 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) != NULL) {
1691                         if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1692                                 // We've have already seen this from the server
1693                                 return Pair<bool, bool>(false, false);
1694                         }
1695                 }
1696         }
1697
1698         if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1699                 // Guard evaluated as true Create the commit and increment the
1700                 // commit sequence number
1701                 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1702                 localArbitrationSequenceNumber++;
1703
1704                 // Update the local changes so we can make the commit
1705                 for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
1706                         newCommit->addKV(kv);
1707                 }
1708
1709                 // create the commit parts
1710                 newCommit->createCommitParts();
1711
1712                 // Append all the commit parts to the end of the pending queue
1713                 // waiting for sending to the server
1714                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1715                 pendingSendArbitrationRounds->add(arbitrationRound);
1716
1717                 if (compactArbitrationData()) {
1718                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1719                         for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1720                                 processEntry(commitPart);
1721                         }
1722                 } else {
1723                         // Insert the commit so we can process it
1724                         for (CommitPart *commitPart : newCommit->getParts()->values()) {
1725                                 processEntry(commitPart);
1726                         }
1727                 }
1728
1729                 if (transaction->getMachineId() == localMachineId) {
1730                         TransactionStatus *status = transaction->getTransactionStatus();
1731                         if (status != NULL) {
1732                                 status->setStatus(TransactionStatus_StatusCommitted);
1733                         }
1734                 }
1735
1736                 updateLiveStateFromLocal();
1737                 return Pair<bool, bool>(true, true);
1738         } else {
1739                 if (transaction->getMachineId() == localMachineId) {
1740                         // For locally created messages update the status
1741                         // Guard evaluated was false so create abort
1742                         TransactionStatus status = transaction->getTransactionStatus();
1743                         if (status != NULL) {
1744                                 status->setStatus(TransactionStatus_StatusAborted);
1745                         }
1746                 } else {
1747                         Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
1748
1749                         // Create the abort
1750                         Abort *newAbort = new Abort(NULL,
1751                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1752                                                                                                                                         -1,
1753                                                                                                                                         transaction->getMachineId(),
1754                                                                                                                                         transaction->getArbitrator(),
1755                                                                                                                                         localArbitrationSequenceNumber);
1756                         localArbitrationSequenceNumber++;
1757                         addAbortSet->add(newAbort);
1758
1759                         // Append all the commit parts to the end of the pending queue
1760                         // waiting for sending to the server
1761                         ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1762                         pendingSendArbitrationRounds->add(arbitrationRound);
1763
1764                         if (compactArbitrationData()) {
1765                                 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1766                                 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1767                                         processEntry(commitPart);
1768                                 }
1769                         }
1770                 }
1771
1772                 updateLiveStateFromLocal();
1773                 return Pair<bool, bool>(true, false);
1774         }
1775 }
1776
1777 /**
1778  * Compacts the arbitration data my merging commits and aggregating
1779  * aborts so that a single large push of commits can be done instead
1780  * of many small updates
1781  */
1782 bool Table::compactArbitrationData() {
1783         if (pendingSendArbitrationRounds->size() < 2) {
1784                 // Nothing to compact so do nothing
1785                 return false;
1786         }
1787
1788         ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1789         if (lastRound->didSendPart()) {
1790                 return false;
1791         }
1792
1793         bool hadCommit = (lastRound->getCommit() == NULL);
1794         bool gotNewCommit = false;
1795
1796         int numberToDelete = 1;
1797         while (numberToDelete < pendingSendArbitrationRounds->size()) {
1798                 ArbitrationRound *xs round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1799
1800                 if (round->isFull() || round->didSendPart()) {
1801                         // Stop since there is a part that cannot be compacted and we
1802                         // need to compact in order
1803                         break;
1804                 }
1805
1806                 if (round->getCommit() == NULL) {
1807                         // Try compacting aborts only
1808                         int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
1809                         if (newSize > ArbitrationRound->MAX_PARTS) {
1810                                 // Cant compact since it would be too large
1811                                 break;
1812                         }
1813                         lastRound->addAborts(round->getAborts());
1814                 } else {
1815                         // Create a new larger commit
1816                         Commit newCommit = Commit->merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
1817                         localArbitrationSequenceNumber++;
1818
1819                         // Create the commit parts so that we can count them
1820                         newCommit->createCommitParts();
1821
1822                         // Calculate the new size of the parts
1823                         int newSize = newCommit->getNumberOfParts();
1824                         newSize += lastRound->getAbortsCount();
1825                         newSize += round->getAbortsCount();
1826
1827                         if (newSize > ArbitrationRound->MAX_PARTS) {
1828                                 // Cant compact since it would be too large
1829                                 break;
1830                         }
1831
1832                         // Set the new compacted part
1833                         lastRound->setCommit(newCommit);
1834                         lastRound->addAborts(round->getAborts());
1835                         gotNewCommit = true;
1836                 }
1837
1838                 numberToDelete++;
1839         }
1840
1841         if (numberToDelete != 1) {
1842                 // If there is a compaction
1843                 // Delete the previous pieces that are now in the new compacted piece
1844                 if (numberToDelete == pendingSendArbitrationRounds->size()) {
1845                         pendingSendArbitrationRounds->clear();
1846                 } else {
1847                         for (int i = 0; i < numberToDelete; i++) {
1848                                 pendingSendArbitrationRounds->remove(pendingSendArbitrationRounds->size() - 1);
1849                         }
1850                 }
1851
1852                 // Add the new compacted into the pending to send list
1853                 pendingSendArbitrationRounds->add(lastRound);
1854
1855                 // Should reinsert into the commit processor
1856                 if (hadCommit && gotNewCommit) {
1857                         return true;
1858                 }
1859         }
1860
1861         return false;
1862 }
1863
1864 /**
1865  * Update all the commits and the committed tables, sets dead the dead
1866  * transactions
1867  */
1868 bool Table::updateCommittedTable() {
1869
1870         if (newCommitParts->size() == 0) {
1871                 // Nothing new to process
1872                 return false;
1873         }
1874
1875         // Iterate through all the machine Ids that we received new parts for
1876         for (int64_t machineId : newCommitParts->keySet()) {
1877                 Hashtable<Pair<int64_t, int32_t>, CommitPart *> *parts = newCommitParts->get(machineId);
1878
1879                 // Iterate through all the parts for that machine Id
1880                 for (Pair<int64_t, int32_t> partId : parts->keySet()) {
1881                         CommitPart *part = parts->get(partId);
1882
1883                         // Get the transaction object for that sequence number
1884                         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
1885
1886                         if (commitForClientTable == NULL) {
1887                                 // This is the first commit from this device
1888                                 commitForClientTable = new Hashtable<int64_t, Commit *>();
1889                                 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
1890                         }
1891
1892                         Commit *commit = commitForClientTable->get(part->getSequenceNumber());
1893
1894                         if (commit == NULL) {
1895                                 // This is a new commit that we dont have so make a new one
1896                                 commit = new Commit();
1897
1898                                 // Insert this new commit into the live tables
1899                                 commitForClientTable->put(part->getSequenceNumber(), commit);
1900                         }
1901
1902                         // Add that part to the commit
1903                         commit->addPartDecode(part);
1904                 }
1905         }
1906
1907         // Clear all the new commits parts in preparation for the next time
1908         // the server sends slots
1909         newCommitParts->clear();
1910
1911         // If we process a new commit keep track of it for future use
1912         bool didProcessANewCommit = false;
1913
1914         // Process the commits one by one
1915         for (int64_T arbitratorId : liveCommitsTable->keySet()) {
1916
1917                 // Get all the commits for a specific arbitrator
1918                 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
1919
1920                 // Sort the commits in order
1921                 Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
1922                 Collections->sort(commitSequenceNumbers);
1923
1924                 // Get the last commit seen from this arbitrator
1925                 int64_t lastCommitSeenSequenceNumber = -1;
1926                 if (lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId) != NULL) {
1927                         lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
1928                 }
1929
1930                 // Go through each new commit one by one
1931                 for (int i = 0; i < commitSequenceNumbers->size(); i++) {
1932                         int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
1933                         Commit *commit = commitForClientTable->get(commitSequenceNumber);
1934
1935                         // Special processing if a commit is not complete
1936                         if (!commit->isComplete()) {
1937                                 if (i == (commitSequenceNumbers->size() - 1)) {
1938                                         // If there is an incomplete commit and this commit is the
1939                                         // latest one seen then this commit cannot be processed and
1940                                         // there are no other commits
1941                                         break;
1942                                 } else {
1943                                         // This is a commit that was already dead but parts of it
1944                                         // are still in the block chain (not flushed out yet)->
1945                                         // Delete it and move on
1946                                         commit->setDead();
1947                                         commitForClientTable->remove(commit->getSequenceNumber());
1948                                         continue;
1949                                 }
1950                         }
1951
1952                         // Update the last transaction that was updated if we can
1953                         if (commit->getTransactionSequenceNumber() != -1) {
1954                                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
1955
1956                                 // Update the last transaction sequence number that the arbitrator arbitrated on
1957                                 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
1958                                         lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
1959                                 }
1960                         }
1961
1962                         // Update the last arbitration data that we have seen so far
1963                         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId()) != NULL) {
1964
1965                                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
1966                                 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
1967                                         // Is larger
1968                                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
1969                                 }
1970                         } else {
1971                                 // Never seen any data from this arbitrator so record the first one
1972                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
1973                         }
1974
1975                         // We have already seen this commit before so need to do the
1976                         // full processing on this commit
1977                         if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
1978
1979                                 // Update the last transaction that was updated if we can
1980                                 if (commit->getTransactionSequenceNumber() != -1) {
1981                                         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
1982
1983                                         // Update the last transaction sequence number that the arbitrator arbitrated on
1984                                         if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
1985                                                 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
1986                                         }
1987                                 }
1988
1989                                 continue;
1990                         }
1991
1992                         // If we got here then this is a brand new commit and needs full
1993                         // processing
1994                         // Get what commits should be edited, these are the commits that
1995                         // have live values for their keys
1996                         Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
1997                         for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
1998                                 commitsToEdit->add(liveCommitsByKeyTable->get(kv->getKey()));
1999                         }
2000                         commitsToEdit->remove(NULL);            // remove NULL since it could be in this set
2001
2002                         // Update each previous commit that needs to be updated
2003                         for (Commit *previousCommit : commitsToEdit) {
2004
2005                                 // Only bother with live commits (TODO: Maybe remove this check)
2006                                 if (previousCommit->isLive()) {
2007
2008                                         // Update which keys in the old commits are still live
2009                                         for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
2010                                                 previousCommit->invalidateKey(kv->getKey());
2011                                         }
2012
2013                                         // if the commit is now dead then remove it
2014                                         if (!previousCommit->isLive()) {
2015                                                 commitForClientTable->remove(previousCommit);
2016                                         }
2017                                 }
2018                         }
2019
2020                         // Update the last seen sequence number from this arbitrator
2021                         if (lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId()) != NULL) {
2022                                 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2023                                         lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2024                                 }
2025                         } else {
2026                                 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2027                         }
2028
2029                         // We processed a new commit that we havent seen before
2030                         didProcessANewCommit = true;
2031
2032                         // Update the committed table of keys and which commit is using which key
2033                         for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
2034                                 committedKeyValueTable->put(kv->getKey(), kv);
2035                                 liveCommitsByKeyTable->put(kv->getKey(), commit);
2036                         }
2037                 }
2038         }
2039
2040         return didProcessANewCommit;
2041 }
2042
2043 /**
2044  * Create the speculative table from transactions that are still live
2045  * and have come from the cloud
2046  */
2047 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2048         if (liveTransactionBySequenceNumberTable->keySet()->size() == 0) {
2049                 // There is nothing to speculate on
2050                 return false;
2051         }
2052
2053         // Create a list of the transaction sequence numbers and sort them
2054         // from oldest to newest
2055         Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
2056         Collections->sort(transactionSequenceNumbersSorted);
2057
2058         bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2059
2060
2061         if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2062                 // If there is a gap in the transaction sequence numbers then
2063                 // there was a commit or an abort of a transaction OR there was a
2064                 // new commit (Could be from offline commit) so a redo the
2065                 // speculation from scratch
2066
2067                 // Start from scratch
2068                 speculatedKeyValueTable->clear();
2069                 lastTransactionSequenceNumberSpeculatedOn = -1;
2070                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2071
2072         }
2073
2074         // Remember the front of the transaction list
2075         oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2076
2077         // Find where to start arbitration from
2078         int startIndex = transactionSequenceNumbersSorted->indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1;
2079
2080         if (startIndex >= transactionSequenceNumbersSorted->size()) {
2081                 // Make sure we are not out of bounds
2082                 return false;           // did not speculate
2083         }
2084
2085         Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
2086         bool didSkip = true;
2087
2088         for (int i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2089                 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2090                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2091
2092                 if (!transaction->isComplete()) {
2093                         // If there is an incomplete transaction then there is nothing
2094                         // we can do add this transactions arbitrator to the list of
2095                         // arbitrators we should ignore
2096                         incompleteTransactionArbitrator->add(transaction->getArbitrator());
2097                         didSkip = true;
2098                         continue;
2099                 }
2100
2101                 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2102                         continue;
2103                 }
2104
2105                 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2106
2107                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2108                         // Guard evaluated to true so update the speculative table
2109                         for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
2110                                 speculatedKeyValueTable->put(kv->getKey(), kv);
2111                         }
2112                 }
2113         }
2114
2115         if (didSkip) {
2116                 // Since there was a skip we need to redo the speculation next time around
2117                 lastTransactionSequenceNumberSpeculatedOn = -1;
2118                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2119         }
2120
2121         // We did some speculation
2122         return true;
2123 }
2124
2125 /**
2126  * Create the pending transaction speculative table from transactions
2127  * that are still in the pending transaction buffer
2128  */
2129 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2130         if (pendingTransactionQueue->size() == 0) {
2131                 // There is nothing to speculate on
2132                 return;
2133         }
2134
2135         if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2136                 // need to reset on the pending speculation
2137                 lastPendingTransactionSpeculatedOn = NULL;
2138                 firstPendingTransaction = pendingTransactionQueue->get(0);
2139                 pendingTransactionSpeculatedKeyValueTable->clear();
2140         }
2141
2142         // Find where to start arbitration from
2143         int startIndex = pendingTransactionQueue->indexOf(firstPendingTransaction) + 1;
2144
2145         if (startIndex >= pendingTransactionQueue->size()) {
2146                 // Make sure we are not out of bounds
2147                 return;
2148         }
2149
2150         for (int i = startIndex; i < pendingTransactionQueue->size(); i++) {
2151                 Transaction *transaction = pendingTransactionQueue->get(i);
2152
2153                 lastPendingTransactionSpeculatedOn = transaction;
2154
2155                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2156                         // Guard evaluated to true so update the speculative table
2157                         for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
2158                                 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2159                         }
2160                 }
2161         }
2162 }
2163
2164 /**
2165  * Set dead and remove from the live transaction tables the
2166  * transactions that are dead
2167  */
2168 void Table::updateLiveTransactionsAndStatus() {
2169
2170         // Go through each of the transactions
2171         for (Iterator<Map->Entry<int64_t, Transaction> > *iter = liveTransactionBySequenceNumberTable->entrySet()->iterator(); iter->hasNext();) {
2172                 Transaction *transaction = iter->next()->getValue();
2173
2174                 // Check if the transaction is dead
2175                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator());
2176                 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= transaction->getSequenceNumber())) {
2177
2178                         // Set dead the transaction
2179                         transaction->setDead();
2180
2181                         // Remove the transaction from the live table
2182                         iter->remove();
2183                         liveTransactionByTransactionIdTable->remove(transaction->getId());
2184                 }
2185         }
2186
2187         // Go through each of the transactions
2188         for (Iterator<Map->Entry<int64_t, TransactionStatus *> > *iter = outstandingTransactionStatus->entrySet()->iterator(); iter->hasNext();) {
2189                 TransactionStatus *status = iter->next()->getValue();
2190
2191                 // Check if the transaction is dead
2192                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator());
2193                 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= status->getTransactionSequenceNumber())) {
2194
2195                         // Set committed
2196                         status->setStatus(TransactionStatus_StatusCommitted);
2197
2198                         // Remove
2199                         iter->remove();
2200                 }
2201         }
2202 }
2203
2204 /**
2205  * Process this slot, entry by entry->  Also update the latest message sent by slot
2206  */
2207 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2208
2209         // Update the last message seen
2210         updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2211
2212         // Process each entry in the slot
2213         for (Entry *entry : slot->getEntries()) {
2214                 switch (entry->getType()) {
2215                 case TypeCommitPart:
2216                         processEntry((CommitPart *)entry);
2217                         break;
2218                 case TypeAbort:
2219                         processEntry((Abort *)entry);
2220                         break;
2221                 case TypeTransactionPart:
2222                         processEntry((TransactionPart *)entry);
2223                         break;
2224                 case TypeNewKey:
2225                         processEntry((NewKey *)entry);
2226                         break;
2227                 case TypeLastMessage:
2228                         processEntry((LastMessage *)entry, machineSet);
2229                         break;
2230                 case TypeRejectedMessage:
2231                         processEntry((RejectedMessage *)entry, indexer);
2232                         break;
2233                 case TypeTableStatus:
2234                         processEntry((TableStatus *)entry, slot->getSequenceNumber());
2235                         break;
2236                 default:
2237                         throw new Error("Unrecognized type: ");
2238                 }
2239         }
2240 }
2241
2242 /**
2243  * Update the last message that was sent for a machine Id
2244  */
2245 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2246         // Update what the last message received by a machine was
2247         updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2248 }
2249
2250 /**
2251  * Add the new key to the arbitrators table and update the set of live
2252  * new keys (in case of a rescued new key message)
2253  */
2254 void Table::processEntry(NewKey *entry) {
2255         // Update the arbitrator table with the new key information
2256         arbitratorTable->put(entry->getKey(), entry->getMachineID());
2257
2258         // Update what the latest live new key is
2259         NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2260         if (oldNewKey != NULL) {
2261                 // Delete the old new key messages
2262                 oldNewKey->setDead();
2263         }
2264 }
2265
2266 /**
2267  * Process new table status entries and set dead the old ones as new
2268  * ones come in-> keeps track of the largest and smallest table status
2269  * seen in this current round of updating the local copy of the block
2270  * chain
2271  */
2272 void Table::processEntry(TableStatus entry, int64_t seq) {
2273         int newNumSlots = entry->getMaxSlots();
2274         updateCurrMaxSize(newNumSlots);
2275         initExpectedSize(seq, newNumSlots);
2276
2277         if (liveTableStatus != NULL) {
2278                 // We have a larger table status so the old table status is no
2279                 // int64_ter alive
2280                 liveTableStatus->setDead();
2281         }
2282
2283         // Make this new table status the latest alive table status
2284         liveTableStatus = entry;
2285 }
2286
2287 /**
2288  * Check old messages to see if there is a block chain violation->
2289  * Also
2290  */
2291 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
2292         int64_t oldSeqNum = entry->getOldSeqNum();
2293         int64_t newSeqNum = entry->getNewSeqNum();
2294         bool isequal = entry->getEqual();
2295         int64_t machineId = entry->getMachineID();
2296         int64_t seq = entry->getSequenceNumber();
2297
2298         // Check if we have messages that were supposed to be rejected in
2299         // our local block chain
2300         for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2301                 // Get the slot
2302                 Slot *slot = indexer->getSlot(seqNum);
2303
2304                 if (slot != NULL) {
2305                         // If we have this slot make sure that it was not supposed to be
2306                         // a rejected slot
2307                         int64_t slotMachineId = slot->getMachineID();
2308                         if (isequal != (slotMachineId == machineId)) {
2309                                 throw new Error("Server Error: Trying to insert rejected message for slot ");
2310                         }
2311                 }
2312         }
2313
2314         // Create a list of clients to watch until they see this rejected
2315         // message entry->
2316         Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2317         for (Map->Entry<int64_t, Pair<int64_t, Liveness *> > *lastMessageEntry : lastMessageTable->entrySet()) {
2318                 // Machine ID for the last message entry
2319                 int64_t lastMessageEntryMachineId = lastMessageEntry->getKey();
2320
2321                 // We've seen it, don't need to continue to watch->  Our next
2322                 // message will implicitly acknowledge it->
2323                 if (lastMessageEntryMachineId == localMachineId) {
2324                         continue;
2325                 }
2326
2327                 Pair<int64_t, Liveness *> lastMessageValue = lastMessageEntry->getValue();
2328                 int64_t entrySequenceNumber = lastMessageValue.getFirst();
2329
2330                 if (entrySequenceNumber < seq) {
2331                         // Add this rejected message to the set of messages that this
2332                         // machine ID did not see yet
2333                         addWatchVector(lastMessageEntryMachineId, entry);
2334                         // This client did not see this rejected message yet so add it
2335                         // to the watch set to monitor
2336                         deviceWatchSet->add(lastMessageEntryMachineId);
2337                 }
2338         }
2339         if (deviceWatchSet->isEmpty()) {
2340                 // This rejected message has been seen by all the clients so
2341                 entry->setDead();
2342         } else {
2343                 // We need to watch this rejected message
2344                 entry->setWatchSet(deviceWatchSet);
2345         }
2346 }
2347
2348 /**
2349  * Check if this abort is live, if not then save it so we can kill it
2350  * later-> update the last transaction number that was arbitrated on->
2351  */
2352 void Table::processEntry(Abort *entry) {
2353         if (entry->getTransactionSequenceNumber() != -1) {
2354                 // update the transaction status if it was sent to the server
2355                 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2356                 if (status != NULL) {
2357                         status->setStatus(TransactionStatus_StatusAborted);
2358                 }
2359         }
2360
2361         // Abort has not been seen by the client it is for yet so we need to
2362         // keep track of it
2363         Abort *previouslySeenAbort = liveAbortTable->put(entry->getAbortId(), entry);
2364         if (previouslySeenAbort != NULL) {
2365                 previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version
2366         }
2367
2368         if (entry->getTransactionArbitrator() == localMachineId) {
2369                 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
2370         }
2371
2372         if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId()).getFirst() >= entry->getSequenceNumber())) {
2373                 // The machine already saw this so it is dead
2374                 entry->setDead();
2375                 liveAbortTable->remove(entry->getAbortId());
2376
2377                 if (entry->getTransactionArbitrator() == localMachineId) {
2378                         liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2379                 }
2380                 return;
2381         }
2382
2383         // Update the last arbitration data that we have seen so far
2384         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator()) != NULL) {
2385                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2386                 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2387                         // Is larger
2388                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2389                 }
2390         } else {
2391                 // Never seen any data from this arbitrator so record the first one
2392                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2393         }
2394
2395         // Set dead a transaction if we can
2396         Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber()));
2397         if (transactionToSetDead != NULL) {
2398                 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2399         }
2400
2401         // Update the last transaction sequence number that the arbitrator
2402         // arbitrated on
2403         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator());
2404         if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2405                 // Is a valid one
2406                 if (entry->getTransactionSequenceNumber() != -1) {
2407                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2408                 }
2409         }
2410 }
2411
2412 /**
2413  * Set dead the transaction part if that transaction is dead and keep
2414  * track of all new parts
2415  */
2416 void Table::processEntry(TransactionPart *entry) {
2417         // Check if we have already seen this transaction and set it dead OR
2418         // if it is not alive
2419         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId());
2420         if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= entry->getSequenceNumber())) {
2421                 // This transaction is dead, it was already committed or aborted
2422                 entry->setDead();
2423                 return;
2424         }
2425
2426         // This part is still alive
2427         Hashtable<Pair<int64_t, int32_t>, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId());
2428
2429         if (transactionPart == NULL) {
2430                 // Dont have a table for this machine Id yet so make one
2431                 transactionPart = new Hashtable<Pair<int64_t, int32_t>, TransactionPart *>();
2432                 newTransactionParts->put(entry->getMachineId(), transactionPart);
2433         }
2434
2435         // Update the part and set dead ones we have already seen (got a
2436         // rescued version)
2437         TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry);
2438         if (previouslySeenPart != NULL) {
2439                 previouslySeenPart->setDead();
2440         }
2441 }
2442
2443 /**
2444  * Process new commit entries and save them for future use->  Delete duplicates
2445  */
2446 void Table::processEntry(CommitPart *entry) {
2447         // Update the last transaction that was updated if we can
2448         if (entry->getTransactionSequenceNumber() != -1) {
2449                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId());
2450                 // Update the last transaction sequence number that the arbitrator
2451                 // arbitrated on
2452                 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2453                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2454                 }
2455         }
2456
2457         Hashtable<Pair<int64_t, int32_t>, CommitPart *> *commitPart = newCommitParts->get(entry->getMachineId());
2458         if (commitPart == NULL) {
2459                 // Don't have a table for this machine Id yet so make one
2460                 commitPart = new Hashtable<Pair<int64_t, int32_t>, CommitPart *>();
2461                 newCommitParts->put(entry->getMachineId(), commitPart);
2462         }
2463         // Update the part and set dead ones we have already seen (got a
2464         // rescued version)
2465         CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry);
2466         if (previouslySeenPart != NULL) {
2467                 previouslySeenPart->setDead();
2468         }
2469 }
2470
2471 /**
2472  * Update the last message seen table-> Update and set dead the
2473  * appropriate RejectedMessages as clients see them-> Updates the live
2474  * aborts, removes those that are dead and sets them dead-> Check that
2475  * the last message seen is correct and that there is no mismatch of
2476  * our own last message or that other clients have not had a rollback
2477  * on the last message->
2478  */
2479 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<intr64_t> *machineSet) {
2480         // We have seen this machine ID
2481         machineSet->remove(machineId);
2482
2483         // Get the set of rejected messages that this machine Id is has not seen yet
2484         Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2485         // If there is a rejected message that this machine Id has not seen yet
2486         if (watchset != NULL) {
2487                 // Go through each rejected message that this machine Id has not
2488                 // seen yet
2489                 for (Iterator<RejectedMessage *> *rmit = watchset->iterator(); rmit->hasNext(); ) {
2490                         RejectedMessage *rm = rmit->next();
2491                         // If this machine Id has seen this rejected message->->->
2492                         if (rm->getSequenceNumber() <= seqNum) {
2493                                 // Remove it from our watchlist
2494                                 rmit->remove();
2495                                 // Decrement machines that need to see this notification
2496                                 rm->removeWatcher(machineId);
2497                         }
2498                 }
2499         }
2500
2501         // Set dead the abort
2502         for (Iterator<Map->Entry<Pair<int64_t, int64_t>, Abort *> > i = liveAbortTable->entrySet()->iterator(); i->hasNext();) {
2503                 Abort *abort = i->next()->getValue();
2504                 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2505                         abort->setDead();
2506                         i->remove();
2507                         if (abort->getTransactionArbitrator() == localMachineId) {
2508                                 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2509                         }
2510                 }
2511         }
2512         if (machineId == localMachineId) {
2513                 // Our own messages are immediately dead->
2514                 if (liveness instanceof LastMessage) {
2515                         ((LastMessage *)liveness)->setDead();
2516                 } else if (liveness instanceof Slot) {
2517                         ((Slot *)liveness)->setDead();
2518                 } else {
2519                         throw new Error("Unrecognized type");
2520                 }
2521         }
2522         // Get the old last message for this device
2523         Pair<int64_t, Liveness *> lastMessageEntry = lastMessageTable->put(machineId, Pair<int64_t, Liveness *>(seqNum, liveness));
2524         if (lastMessageEntry == NULL) {
2525                 // If no last message then there is nothing else to process
2526                 return;
2527         }
2528
2529         int64_t lastMessageSeqNum = lastMessageEntry.getFirst();
2530         Liveness *lastEntry = lastMessageEntry.getSecond();
2531
2532         // If it is not our machine Id since we already set ours to dead
2533         if (machineId != localMachineId) {
2534                 if (lastEntry instanceof LastMessage) {
2535                         ((LastMessage *)lastEntry)->setDead();
2536                 } else if (lastEntry instanceof Slot) {
2537                         ((Slot *)lastEntry)->setDead();
2538                 } else {
2539                         throw new Error("Unrecognized type");
2540                 }
2541         }
2542         // Make sure the server is not playing any games
2543         if (machineId == localMachineId) {
2544                 if (hadPartialSendToServer) {
2545                         // We were not making any updates and we had a machine mismatch
2546                         if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2547                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: ");
2548                         }
2549                 } else {
2550                         // We were not making any updates and we had a machine mismatch
2551                         if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2552                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed: ");
2553                         }
2554                 }
2555         } else {
2556                 if (lastMessageSeqNum > seqNum) {
2557                         throw new Error("Server Error: Rollback on remote machine sequence number");
2558                 }
2559         }
2560 }
2561
2562 /**
2563  * Add a rejected message entry to the watch set to keep track of
2564  * which clients have seen that rejected message entry and which have
2565  * not.
2566  */
2567 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
2568         Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2569         if (entries == NULL) {
2570                 // There is no set for this machine ID yet so create one
2571                 entries = new Hashset<RejectedMessage *>();
2572                 rejectedMessageWatchVectorTable->put(machineId, entries);
2573         }
2574         entries->add(entry);
2575 }
2576
2577 /**
2578  * Check if the HMAC chain is not violated
2579  */
2580 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
2581         for (int i = 0; i < newSlots->length(); i++) {
2582                 Slot *currSlot = newSlots->get(i);
2583                 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
2584                 if (prevSlot != NULL &&
2585                                 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2586                         throw new Error("Server Error: Invalid HMAC Chain");
2587         }
2588 }