5e69e130c1a106cb19ee0e72669d9ea72d4cb91d
[iotcloud.git] / version2 / src / C / Table.cc
1 #include "Table.h"
2 #include "CloudComm.h"
3 #include "SlotBuffer.h"
4 #include "NewKey.h"
5 #include "Slot.h"
6 #include "KeyValue.h"
7 #include "Error.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
13 #include "Random.h"
14
15
16 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
17         buffer(NULL),
18         cloud(new CloudComm(this, baseurl, password, listeningPort)),
19         random(NULL),
20         liveTableStatus(NULL),
21         pendingTransactionBuilder(NULL),
22         lastPendingTransactionSpeculatedOn(NULL),
23         firstPendingTransaction(NULL),
24         numberOfSlots(0),
25         bufferResizeThreshold(0),
26         liveSlotCount(0),
27         oldestLiveSlotSequenceNumver(1),
28         localMachineId(_localMachineId),
29         sequenceNumber(0),
30         localTransactionSequenceNumber(0),
31         lastTransactionSequenceNumberSpeculatedOn(0),
32         oldestTransactionSequenceNumberSpeculatedOn(0),
33         localArbitrationSequenceNumber(0),
34         hadPartialSendToServer(false),
35         attemptedToSendToServer(false),
36         expectedsize(0),
37         didFindTableStatus(false),
38         currMaxSize(0),
39         lastSlotAttemptedToSend(NULL),
40         lastIsNewKey(false),
41         lastNewSize(0),
42         lastTransactionPartsSent(NULL),
43         lastPendingSendArbitrationEntriesToDelete(NULL),
44         lastNewKey(NULL),
45         committedKeyValueTable(NULL),
46         speculatedKeyValueTable(NULL),
47         pendingTransactionSpeculatedKeyValueTable(NULL),
48         liveNewKeyTable(NULL),
49         lastMessageTable(NULL),
50         rejectedMessageWatchVectorTable(NULL),
51         arbitratorTable(NULL),
52         liveAbortTable(NULL),
53         newTransactionParts(NULL),
54         newCommitParts(NULL),
55         lastArbitratedTransactionNumberByArbitratorTable(NULL),
56         liveTransactionBySequenceNumberTable(NULL),
57         liveTransactionByTransactionIdTable(NULL),
58         liveCommitsTable(NULL),
59         liveCommitsByKeyTable(NULL),
60         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
61         rejectedSlotVector(NULL),
62         pendingTransactionQueue(NULL),
63         pendingSendArbitrationRounds(NULL),
64         pendingSendArbitrationEntriesToDelete(NULL),
65         transactionPartsSent(NULL),
66         outstandingTransactionStatus(NULL),
67         liveAbortsGeneratedByLocal(NULL),
68         offlineTransactionsCommittedAndAtServer(NULL),
69         localCommunicationTable(NULL),
70         lastTransactionSeenFromMachineFromServer(NULL),
71         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
72         lastInsertedNewKey(false),
73         lastSeqNumArbOn(0)
74 {
75         init();
76 }
77
78 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
79         buffer(NULL),
80         cloud(_cloud),
81         random(NULL),
82         liveTableStatus(NULL),
83         pendingTransactionBuilder(NULL),
84         lastPendingTransactionSpeculatedOn(NULL),
85         firstPendingTransaction(NULL),
86         numberOfSlots(0),
87         bufferResizeThreshold(0),
88         liveSlotCount(0),
89         oldestLiveSlotSequenceNumver(1),
90         localMachineId(_localMachineId),
91         sequenceNumber(0),
92         localTransactionSequenceNumber(0),
93         lastTransactionSequenceNumberSpeculatedOn(0),
94         oldestTransactionSequenceNumberSpeculatedOn(0),
95         localArbitrationSequenceNumber(0),
96         hadPartialSendToServer(false),
97         attemptedToSendToServer(false),
98         expectedsize(0),
99         didFindTableStatus(false),
100         currMaxSize(0),
101         lastSlotAttemptedToSend(NULL),
102         lastIsNewKey(false),
103         lastNewSize(0),
104         lastTransactionPartsSent(NULL),
105         lastPendingSendArbitrationEntriesToDelete(NULL),
106         lastNewKey(NULL),
107         committedKeyValueTable(NULL),
108         speculatedKeyValueTable(NULL),
109         pendingTransactionSpeculatedKeyValueTable(NULL),
110         liveNewKeyTable(NULL),
111         lastMessageTable(NULL),
112         rejectedMessageWatchVectorTable(NULL),
113         arbitratorTable(NULL),
114         liveAbortTable(NULL),
115         newTransactionParts(NULL),
116         newCommitParts(NULL),
117         lastArbitratedTransactionNumberByArbitratorTable(NULL),
118         liveTransactionBySequenceNumberTable(NULL),
119         liveTransactionByTransactionIdTable(NULL),
120         liveCommitsTable(NULL),
121         liveCommitsByKeyTable(NULL),
122         lastCommitSeenSequenceNumberByArbitratorTable(NULL),
123         rejectedSlotVector(NULL),
124         pendingTransactionQueue(NULL),
125         pendingSendArbitrationRounds(NULL),
126         pendingSendArbitrationEntriesToDelete(NULL),
127         transactionPartsSent(NULL),
128         outstandingTransactionStatus(NULL),
129         liveAbortsGeneratedByLocal(NULL),
130         offlineTransactionsCommittedAndAtServer(NULL),
131         localCommunicationTable(NULL),
132         lastTransactionSeenFromMachineFromServer(NULL),
133         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
134         lastInsertedNewKey(false),
135         lastSeqNumArbOn(0)
136 {
137         init();
138 }
139
140 /**
141  * Init all the stuff needed for for table usage
142  */
143 void Table::init() {
144         // Init helper objects
145         random = new Random();
146         buffer = new SlotBuffer();
147
148         // init data structs
149         committedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
150         speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
151         pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
152         liveNewKeyTable = new Hashtable<IoTString *, NewKey *>();
153         lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> >();
154         rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
155         arbitratorTable = new Hashtable<IoTString *, int64_t>();
156         liveAbortTable = new Hashtable<Pair<int64_t, int64_t>, Abort *>();
157         newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>, TransactionPart *> *>();
158         newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>, CommitPart *> *>();
159         lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
160         liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
161         liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t>, Transaction *>();
162         liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> >();
163         liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *>();
164         lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
165         rejectedSlotVector = new Vector<int64_t>();
166         pendingTransactionQueue = new Vector<Transaction *>();
167         pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
168         transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
169         outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
170         liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
171         offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> >();
172         localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> >();
173         lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
174         pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
175         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
176
177
178         // Other init stuff
179         numberOfSlots = buffer->capacity();
180         setResizeThreshold();
181 }
182
183 /**
184  * Initialize the table by inserting a table status as the first entry
185  * into the table status also initialize the crypto stuff.
186  */
187 void Table::initTable() {
188         cloud->initSecurity();
189
190         // Create the first insertion into the block chain which is the table status
191         Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
192         localSequenceNumber++;
193         TableStatus *status = new TableStatus(s, numberOfSlots);
194         s->addEntry(status);
195         Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
196
197         if (array == NULL) {
198                 array = new Array<Slot *>(1);
199                 array->set(0, s);
200                 // update local block chain
201                 validateAndUpdate(array, true);
202         } else if (array->length() == 1) {
203                 // in case we did push the slot BUT we failed to init it
204                 validateAndUpdate(array, true);
205         } else {
206                 throw new Error("Error on initialization");
207         }
208 }
209
210 /**
211  * Rebuild the table from scratch by pulling the latest block chain
212  * from the server.
213  */
214 void Table::rebuild() {
215         // Just pull the latest slots from the server
216         Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
217         validateAndUpdate(newslots, true);
218         sendToServer(NULL);
219         updateLiveTransactionsAndStatus();
220 }
221
222 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
223         localCommunicationTable->put(arbitrator, Pair<IoTString *, int32_t>(hostName, portNumber));
224 }
225
226 int64_t Table::getArbitrator(IoTString *key) {
227         return arbitratorTable->get(key);
228 }
229
230 void Table::close() {
231         cloud->close();
232 }
233
234 IoTString *Table::getCommitted(IoTString *key)  {
235         KeyValue *kv = committedKeyValueTable->get(key);
236
237         if (kv != NULL) {
238                 return kv->getValue();
239         } else {
240                 return NULL;
241         }
242 }
243
244 IoTString *Table::getSpeculative(IoTString *key) {
245         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
246
247         if (kv == NULL) {
248                 kv = speculatedKeyValueTable->get(key);
249         }
250
251         if (kv == NULL) {
252                 kv = committedKeyValueTable->get(key);
253         }
254
255         if (kv != NULL) {
256                 return kv->getValue();
257         } else {
258                 return NULL;
259         }
260 }
261
262 IoTString *Table::getCommittedAtomic(IoTString *key) {
263         KeyValue *kv = committedKeyValueTable->get(key);
264
265         if (!arbitratorTable->contains(key)) {
266                 throw new Error("Key not Found.");
267         }
268
269         // Make sure new key value pair matches the current arbitrator
270         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
271                 // TODO: Maybe not throw en error
272                 throw new Error("Not all Key Values Match Arbitrator.");
273         }
274
275         if (kv != NULL) {
276                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
277                 return kv->getValue();
278         } else {
279                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
280                 return NULL;
281         }
282 }
283
284 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
285         if (!arbitratorTable->contains(key)) {
286                 throw new Error("Key not Found.");
287         }
288
289         // Make sure new key value pair matches the current arbitrator
290         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
291                 // TODO: Maybe not throw en error
292                 throw new Error("Not all Key Values Match Arbitrator.");
293         }
294
295         KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
296
297         if (kv == NULL) {
298                 kv = speculatedKeyValueTable->get(key);
299         }
300
301         if (kv == NULL) {
302                 kv = committedKeyValueTable->get(key);
303         }
304
305         if (kv != NULL) {
306                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
307                 return kv->getValue();
308         } else {
309                 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
310                 return NULL;
311         }
312 }
313
314 bool Table::update()  {
315         try {
316                 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
317                 validateAndUpdate(newSlots, false);
318                 sendToServer(NULL);
319                 updateLiveTransactionsAndStatus();
320                 return true;
321         } catch (Exception *e) {
322                 for (int64_t m : localCommunicationTable->keySet()) {
323                         updateFromLocal(m);
324                 }
325         }
326
327         return false;
328 }
329
330 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
331         while (true) {
332                 if (!arbitratorTable->contains(keyName)) {
333                         // There is already an arbitrator
334                         return false;
335                 }
336                 NewKey *newKey = new NewKey(NULL, keyName, machineId);
337
338                 if (sendToServer(newKey)) {
339                         // If successfully inserted
340                         return true;
341                 }
342         }
343 }
344
345 void Table::startTransaction() {
346         // Create a new transaction, invalidates any old pending transactions.
347         pendingTransactionBuilder = new PendingTransaction(localMachineId);
348 }
349
350 void Table::addKV(IoTString *key, IoTString *value) {
351
352         // Make sure it is a valid key
353         if (!arbitratorTable->contains(key)) {
354                 throw new Error("Key not Found.");
355         }
356
357         // Make sure new key value pair matches the current arbitrator
358         if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
359                 // TODO: Maybe not throw en error
360                 throw new Error("Not all Key Values Match Arbitrator.");
361         }
362
363         // Add the key value to this transaction
364         KeyValue *kv = new KeyValue(key, value);
365         pendingTransactionBuilder->addKV(kv);
366 }
367
368 TransactionStatus *Table::commitTransaction() {
369         if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
370                 // transaction with no updates will have no effect on the system
371                 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
372         }
373
374         // Set the local transaction sequence number and increment
375         pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
376         localTransactionSequenceNumber++;
377
378         // Create the transaction status
379         TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
380
381         // Create the new transaction
382         Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
383         newTransaction->setTransactionStatus(transactionStatus);
384
385         if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
386                 // Add it to the queue and invalidate the builder for safety
387                 pendingTransactionQueue->add(newTransaction);
388         } else {
389                 arbitrateOnLocalTransaction(newTransaction);
390                 updateLiveStateFromLocal();
391         }
392
393         pendingTransactionBuilder = new PendingTransaction(localMachineId);
394
395         try {
396                 sendToServer(NULL);
397         } catch (ServerException *e) {
398
399                 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
400                 for (Iterator<Transaction *> *iter = pendingTransactionQueue->iterator(); iter->hasNext(); ) {
401                         Transaction *transaction = iter->next();
402
403                         if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
404                                 // Already contacted this client so ignore all attempts to contact this client
405                                 // to preserve ordering for arbitrator
406                                 continue;
407                         }
408
409                         Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
410
411                         if (sendReturn->getFirst()) {
412                                 // Failed to contact over local
413                                 arbitratorTriedAndFailed->add(transaction->getArbitrator());
414                         } else {
415                                 // Successful contact or should not contact
416
417                                 if (sendReturn->getSecond()) {
418                                         // did arbitrate
419                                         iter->remove();
420                                 }
421                         }
422                 }
423         }
424
425         updateLiveStateFromLocal();
426
427         return transactionStatus;
428 }
429
430 /**
431  * Recalculate the new resize threshold
432  */
433 void Table::setResizeThreshold() {
434         int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
435         bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
436 }
437
438 int64_t Table::getLocalSequenceNumber() {
439         return localSequenceNumber;
440 }
441
442 bool Table::sendToServer(NewKey *newKey) {
443         bool fromRetry = false;
444         try {
445                 if (hadPartialSendToServer) {
446                         Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
447                         if (newSlots->length() == 0) {
448                                 fromRetry = true;
449                                 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
450
451                                 if (sendSlotsReturn->getFirst()) {
452                                         if (newKey != NULL) {
453                                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
454                                                         newKey = NULL;
455                                                 }
456                                         }
457
458                                         for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
459                                                 transaction->resetServerFailure();
460                                                 // Update which transactions parts still need to be sent
461                                                 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
462                                                 // Add the transaction status to the outstanding list
463                                                 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
464
465                                                 // Update the transaction status
466                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
467
468                                                 // Check if all the transaction parts were successfully
469                                                 // sent and if so then remove it from pending
470                                                 if (transaction->didSendAllParts()) {
471                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
472                                                         pendingTransactionQueue->remove(transaction);
473                                                 }
474                                         }
475                                 } else {
476                                         newSlots = sendSlotsReturn->getThird();
477                                         bool isInserted = false;
478                                         for (uint si = 0; si < newSlots->length(); si++) {
479                                                 Slot *s = newSlots->get(si);
480                                                 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
481                                                         isInserted = true;
482                                                         break;
483                                                 }
484                                         }
485
486                                         for (uint si = 0; si < newSlots->length(); si++) {
487                                                 Slot *s = newSlots->get(si);
488                                                 if (isInserted) {
489                                                         break;
490                                                 }
491
492                                                 // Process each entry in the slot
493                                                 for (Entry *entry : s->getEntries()) {
494                                                         if (entry->getType() == TypeLastMessage) {
495                                                                 LastMessage *lastMessage = (LastMessage *)entry;
496                                                                 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
497                                                                         isInserted = true;
498                                                                         break;
499                                                                 }
500                                                         }
501                                                 }
502                                         }
503
504                                         if (isInserted) {
505                                                 if (newKey != NULL) {
506                                                         if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
507                                                                 newKey = NULL;
508                                                         }
509                                                 }
510
511                                                 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
512                                                         transaction->resetServerFailure();
513
514                                                         // Update which transactions parts still need to be sent
515                                                         transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
516
517                                                         // Add the transaction status to the outstanding list
518                                                         outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
519
520                                                         // Update the transaction status
521                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
522
523                                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
524                                                         if (transaction->didSendAllParts()) {
525                                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
526                                                                 pendingTransactionQueue->remove(transaction);
527                                                         } else {
528                                                                 transaction->resetServerFailure();
529                                                                 // Set the transaction sequence number back to nothing
530                                                                 if (!transaction->didSendAPartToServer()) {
531                                                                         transaction->setSequenceNumber(-1);
532                                                                 }
533                                                         }
534                                                 }
535                                         }
536                                 }
537
538                                 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
539                                         transaction->resetServerFailure();
540                                         // Set the transaction sequence number back to nothing
541                                         if (!transaction->didSendAPartToServer()) {
542                                                 transaction->setSequenceNumber(-1);
543                                         }
544                                 }
545
546                                 if (sendSlotsReturn->getThird()->length() != 0) {
547                                         // insert into the local block chain
548                                         validateAndUpdate(sendSlotsReturn->getThird(), true);
549                                 }
550                                 // continue;
551                         } else {
552                                 bool isInserted = false;
553                                 for (uint si = 0; si < newSlots->length(); si++) {
554                                         Slot *s = newSlots->get(si);
555                                         if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
556                                                 isInserted = true;
557                                                 break;
558                                         }
559                                 }
560
561                                 for (uint si = 0; si < newSlots->length(); si++) {
562                                         Slot *s = newSlots->get(si);
563                                         if (isInserted) {
564                                                 break;
565                                         }
566
567                                         // Process each entry in the slot
568                                         for (Entry *entry : s->getEntries()) {
569
570                                                 if (entry->getType() == TypeLastMessage) {
571                                                         LastMessage *lastMessage = (LastMessage *)entry;
572                                                         if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
573                                                                 isInserted = true;
574                                                                 break;
575                                                         }
576                                                 }
577                                         }
578                                 }
579
580                                 if (isInserted) {
581                                         if (newKey != NULL) {
582                                                 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
583                                                         newKey = NULL;
584                                                 }
585                                         }
586
587                                         for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
588                                                 transaction->resetServerFailure();
589
590                                                 // Update which transactions parts still need to be sent
591                                                 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
592
593                                                 // Add the transaction status to the outstanding list
594                                                 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
595
596                                                 // Update the transaction status
597                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
598
599                                                 // Check if all the transaction parts were successfully sent and if so then remove it from pending
600                                                 if (transaction->didSendAllParts()) {
601                                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
602                                                         pendingTransactionQueue->remove(transaction);
603                                                 } else {
604                                                         transaction->resetServerFailure();
605                                                         // Set the transaction sequence number back to nothing
606                                                         if (!transaction->didSendAPartToServer()) {
607                                                                 transaction->setSequenceNumber(-1);
608                                                         }
609                                                 }
610                                         }
611                                 } else {
612                                         for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
613                                                 transaction->resetServerFailure();
614                                                 // Set the transaction sequence number back to nothing
615                                                 if (!transaction->didSendAPartToServer()) {
616                                                         transaction->setSequenceNumber(-1);
617                                                 }
618                                         }
619                                 }
620
621                                 // insert into the local block chain
622                                 validateAndUpdate(newSlots, true);
623                         }
624                 }
625         } catch (ServerException *e) {
626                 throw e;
627         }
628
629
630
631         try {
632                 // While we have stuff that needs inserting into the block chain
633                 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
634
635                         fromRetry = false;
636
637                         if (hadPartialSendToServer) {
638                                 throw new Error("Should Be error free");
639                         }
640
641
642
643                         // If there is a new key with same name then end
644                         if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
645                                 return false;
646                         }
647
648                         // Create the slot
649                         Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber);
650                         localSequenceNumber++;
651
652                         // Try to fill the slot with data
653                         ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
654                         bool needsResize = fillSlotsReturn->getFirst();
655                         int newSize = fillSlotsReturn->getSecond();
656                         bool insertedNewKey = fillSlotsReturn->getThird();
657
658                         if (needsResize) {
659                                 // Reset which transaction to send
660                                 for (Transaction *transaction : transactionPartsSent->keySet()) {
661                                         transaction->resetNextPartToSend();
662
663                                         // Set the transaction sequence number back to nothing
664                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
665                                                 transaction->setSequenceNumber(-1);
666                                         }
667                                 }
668
669                                 // Clear the sent data since we are trying again
670                                 pendingSendArbitrationEntriesToDelete->clear();
671                                 transactionPartsSent->clear();
672
673                                 // We needed a resize so try again
674                                 fillSlot(slot, true, newKey);
675                         }
676
677                         lastSlotAttemptedToSend = slot;
678                         lastIsNewKey = (newKey != NULL);
679                         lastInsertedNewKey = insertedNewKey;
680                         lastNewSize = newSize;
681                         lastNewKey = newKey;
682                         lastTransactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> * >(transactionPartsSent);
683                         lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
684
685
686                         ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
687
688                         if (sendSlotsReturn->getFirst()) {
689
690                                 // Did insert into the block chain
691
692                                 if (insertedNewKey) {
693                                         // This slot was what was inserted not a previous slot
694
695                                         // New Key was successfully inserted into the block chain so dont want to insert it again
696                                         newKey = NULL;
697                                 }
698
699                                 // Remove the aborts and commit parts that were sent from the pending to send queue
700                                 for (Iterator<ArbitrationRound *> *iter = pendingSendArbitrationRounds->iterator(); iter->hasNext(); ) {
701                                         ArbitrationRound *round = iter->next();
702                                         round->removeParts(pendingSendArbitrationEntriesToDelete);
703
704                                         if (round->isDoneSending()) {
705                                                 // Sent all the parts
706                                                 iter->remove();
707                                         }
708                                 }
709
710                                 for (Transaction *transaction : transactionPartsSent->keySet()) {
711                                         transaction->resetServerFailure();
712
713                                         // Update which transactions parts still need to be sent
714                                         transaction->removeSentParts(transactionPartsSent->get(transaction));
715
716                                         // Add the transaction status to the outstanding list
717                                         outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
718
719                                         // Update the transaction status
720                                         transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
721
722                                         // Check if all the transaction parts were successfully sent and if so then remove it from pending
723                                         if (transaction->didSendAllParts()) {
724                                                 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
725                                                 pendingTransactionQueue->remove(transaction);
726                                         }
727                                 }
728                         } else {
729                                 // Reset which transaction to send
730                                 for (Transaction *transaction : transactionPartsSent->keySet()) {
731                                         transaction->resetNextPartToSend();
732
733                                         // Set the transaction sequence number back to nothing
734                                         if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
735                                                 transaction->setSequenceNumber(-1);
736                                         }
737                                 }
738                         }
739
740                         // Clear the sent data in preparation for next send
741                         pendingSendArbitrationEntriesToDelete->clear();
742                         transactionPartsSent->clear();
743
744                         if (sendSlotsReturn->getThird()->length() != 0) {
745                                 // insert into the local block chain
746                                 validateAndUpdate(sendSlotsReturn->getThird(), true);
747                         }
748                 }
749
750         } catch (ServerException *e) {
751
752                 if (e->getType() != ServerException->TypeInputTimeout) {
753                         // Nothing was able to be sent to the server so just clear these data structures
754                         for (Transaction *transaction : transactionPartsSent->keySet()) {
755                                 transaction->resetNextPartToSend();
756
757                                 // Set the transaction sequence number back to nothing
758                                 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
759                                         transaction->setSequenceNumber(-1);
760                                 }
761                         }
762                 } else {
763                         // There was a partial send to the server
764                         hadPartialSendToServer = true;
765
766                         // Nothing was able to be sent to the server so just clear these data structures
767                         for (Transaction *transaction : transactionPartsSent->keySet()) {
768                                 transaction->resetNextPartToSend();
769                                 transaction->setServerFailure();
770                         }
771                 }
772
773                 pendingSendArbitrationEntriesToDelete->clear();
774                 transactionPartsSent->clear();
775
776                 throw e;
777         }
778
779         return newKey == NULL;
780 }
781
782 bool Table::updateFromLocal(int64_t machineId) {
783         Pair<IoTString *, int32_t> localCommunicationInformation = localCommunicationTable->get(machineId);
784         if (localCommunicationInformation == NULL) {
785                 // Cant talk to that device locally so do nothing
786                 return false;
787         }
788
789         // Get the size of the send data
790         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
791
792         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
793         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId) != NULL) {
794                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
795         }
796
797         Array<char> *sendData = new Array<char>(sendDataSize);
798         ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
799
800         // Encode the data
801         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
802         bbEncode->putInt(0);
803
804         // Send by local
805         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
806         localSequenceNumber++;
807
808         if (returnData == NULL) {
809                 // Could not contact server
810                 return false;
811         }
812
813         // Decode the data
814         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
815         int numberOfEntries = bbDecode->getInt();
816
817         for (int i = 0; i < numberOfEntries; i++) {
818                 char type = bbDecode->get();
819                 if (type == TypeAbort) {
820                         Abort *abort = (Abort)Abort_decode(NULL, bbDecode);
821                         processEntry(abort);
822                 } else if (type == TypeCommitPart) {
823                         CommitPart *commitPart = (CommitPart)CommitPart_decode(NULL, bbDecode);
824                         processEntry(commitPart);
825                 }
826         }
827
828         updateLiveStateFromLocal();
829
830         return true;
831 }
832
833 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
834
835         // Get the devices local communications
836         Pair<IoTString *, int32_t> localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
837
838         if (localCommunicationInformation == NULL) {
839                 // Cant talk to that device locally so do nothing
840                 return Pair<bool, bool>(true, false);
841         }
842
843         // Get the size of the send data
844         int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
845         for (TransactionPart *part : transaction->getParts()->values()) {
846                 sendDataSize += part->getSize();
847         }
848
849         int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
850         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator()) != NULL) {
851                 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
852         }
853
854         // Make the send data size
855         Array<char> *sendData = new Array<char>(sendDataSize);
856         ByteBuffer *bbEncode = ByteBuffer.wrap(sendData);
857
858         // Encode the data
859         bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
860         bbEncode->putInt(transaction->getParts()->size());
861         for (TransactionPart *part : transaction->getParts()->values()) {
862                 part->encode(bbEncode);
863         }
864
865
866         // Send by local
867         Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
868         localSequenceNumber++;
869
870         if (returnData == NULL) {
871                 // Could not contact server
872                 return Pair<bool, bool>(true, false);
873         }
874
875         // Decode the data
876         ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
877         bool didCommit = bbDecode->get() == 1;
878         bool couldArbitrate = bbDecode->get() == 1;
879         int numberOfEntries = bbDecode->getInt();
880         bool foundAbort = false;
881
882         for (int i = 0; i < numberOfEntries; i++) {
883                 char type = bbDecode->get();
884                 if (type == TypeAbort) {
885                         Abort *abort = (Abort)Abort_decode(NULL, bbDecode);
886
887                         if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
888                                 foundAbort = true;
889                         }
890
891                         processEntry(abort);
892                 } else if (type == TypeCommitPart) {
893                         CommitPart *commitPart = (CommitPart)CommitPart_decode(NULL, bbDecode);
894                         processEntry(commitPart);
895                 }
896         }
897
898         updateLiveStateFromLocal();
899
900         if (couldArbitrate) {
901                 TransactionStatus status =  transaction->getTransactionStatus();
902                 if (didCommit) {
903                         status->setStatus(TransactionStatus_StatusCommitted);
904                 } else {
905                         status->setStatus(TransactionStatus_StatusAborted);
906                 }
907         } else {
908                 TransactionStatus status =  transaction->getTransactionStatus();
909                 if (foundAbort) {
910                         status->setStatus(TransactionStatus_StatusAborted);
911                 } else {
912                         status->setStatus(TransactionStatus_StatusCommitted);
913                 }
914         }
915
916         return Pair<bool, bool>(false, true);
917 }
918
919 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
920
921         // Decode the data
922         ByteBuffer *bbDecode = ByteBuffer_wrap(data);
923         int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
924         int numberOfParts = bbDecode->getInt();
925
926         // If we did commit a transaction or not
927         bool didCommit = false;
928         bool couldArbitrate = false;
929
930         if (numberOfParts != 0) {
931
932                 // decode the transaction
933                 Transaction *transaction = new Transaction();
934                 for (int i = 0; i < numberOfParts; i++) {
935                         bbDecode->get();
936                         TransactionPart *newPart = (TransactionPart)TransactionPart.decode(NULL, bbDecode);
937                         transaction->addPartDecode(newPart);
938                 }
939
940                 // Arbitrate on transaction and pull relevant return data
941                 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
942                 couldArbitrate = localArbitrateReturn->getFirst();
943                 didCommit = localArbitrateReturn->getSecond();
944
945                 updateLiveStateFromLocal();
946
947                 // Transaction was sent to the server so keep track of it to prevent double commit
948                 if (transaction->getSequenceNumber() != -1) {
949                         offlineTransactionsCommittedAndAtServer->add(transaction->getId());
950                 }
951         }
952
953         // The data to send back
954         int returnDataSize = 0;
955         Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
956
957         // Get the aborts to send back
958         Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>(liveAbortsGeneratedByLocal->keySet());
959         Collections->sort(abortLocalSequenceNumbers);
960         for (int64_t localSequenceNumber : abortLocalSequenceNumbers) {
961                 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
962                         continue;
963                 }
964
965                 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
966                 unseenArbitrations->add(abort);
967                 returnDataSize += abort->getSize();
968         }
969
970         // Get the commits to send back
971         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
972         if (commitForClientTable != NULL) {
973                 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
974                 Collections->sort(commitLocalSequenceNumbers);
975
976                 for (int64_t localSequenceNumber : commitLocalSequenceNumbers) {
977                         Commit *commit = commitForClientTable->get(localSequenceNumber);
978
979                         if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
980                                 continue;
981                         }
982
983                         unseenArbitrations->addAll(commit->getParts()->values());
984
985                         for (CommitPart commitPart : commit->getParts()->values()) {
986                                 returnDataSize += commitPart->getSize();
987                         }
988                 }
989         }
990
991         // Number of arbitration entries to decode
992         returnDataSize += 2 * sizeof(int32_t);
993
994         // bool of did commit or not
995         if (numberOfParts != 0) {
996                 returnDataSize += sizeof(char);
997         }
998
999         // Data to send Back
1000         Array<char> *returnData = new Array<char>(returnDataSize);
1001         ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
1002
1003         if (numberOfParts != 0) {
1004                 if (didCommit) {
1005                         bbEncode->put((char)1);
1006                 } else {
1007                         bbEncode->put((char)0);
1008                 }
1009                 if (couldArbitrate) {
1010                         bbEncode->put((char)1);
1011                 } else {
1012                         bbEncode->put((char)0);
1013                 }
1014         }
1015
1016         bbEncode->putInt(unseenArbitrations->size());
1017         for (Entry *entry : unseenArbitrations) {
1018                 entry->encode(bbEncode);
1019         }
1020
1021
1022         localSequenceNumber++;
1023         return returnData;
1024 }
1025
1026 ThreeTuple<bool, bool, Array<Slot *> *> Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey) {
1027         bool attemptedToSendToServerTmp = attemptedToSendToServer;
1028         attemptedToSendToServer = true;
1029
1030         bool inserted = false;
1031         bool lastTryInserted = false;
1032
1033         Array<Slot *> *array = cloud->putSlot(slot, newSize);
1034         if (array == NULL) {
1035                 array = new Array<Slot *>();
1036                 array->set(0, slot);
1037                 rejectedSlotVector->clear();
1038                 inserted = true;
1039         } else {
1040                 if (array->length() == 0) {
1041                         throw new Error("Server Error: Did not send any slots");
1042                 }
1043
1044                 // if (attemptedToSendToServerTmp) {
1045                 if (hadPartialSendToServer) {
1046
1047                         bool isInserted = false;
1048                         for (Slot *s : array) {
1049                                 if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1050                                         isInserted = true;
1051                                         break;
1052                                 }
1053                         }
1054
1055                         for (Slot *s : array) {
1056                                 if (isInserted) {
1057                                         break;
1058                                 }
1059
1060                                 // Process each entry in the slot
1061                                 for (Entry *entry : s->getEntries()) {
1062
1063                                         if (entry->getType() == TypeLastMessage) {
1064                                                 LastMessage *lastMessage = (LastMessage *)entry;
1065
1066                                                 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) {
1067                                                         isInserted = true;
1068                                                         break;
1069                                                 }
1070                                         }
1071                                 }
1072                         }
1073
1074                         if (!isInserted) {
1075                                 rejectedSlotVector->add(slot->getSequenceNumber());
1076                                 lastTryInserted = false;
1077                         } else {
1078                                 lastTryInserted = true;
1079                         }
1080                 } else {
1081                         rejectedSlotVector->add(slot->getSequenceNumber());
1082                         lastTryInserted = false;
1083                 }
1084         }
1085
1086         return ThreeTuple<bool, bool, Array<Slot *> *>(inserted, lastTryInserted, array);
1087 }
1088
1089 /**
1090  * Returns false if a resize was needed
1091  */
1092 ThreeTuple<bool, int32_t, bool> Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry) {
1093         int newSize = 0;
1094         if (liveSlotCount > bufferResizeThreshold) {
1095                 resize = true;  //Resize is forced
1096         }
1097
1098         if (resize) {
1099                 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1100                 TableStatus *status = new TableStatus(slot, newSize);
1101                 slot->addEntry(status);
1102         }
1103
1104         // Fill with rejected slots first before doing anything else
1105         doRejectedMessages(slot);
1106
1107         // Do mandatory rescue of entries
1108         ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1109
1110         // Extract working variables
1111         bool needsResize = mandatoryRescueReturn->getFirst();
1112         bool seenLiveSlot = mandatoryRescueReturn->getSecond();
1113         int64_t currentRescueSequenceNumber = mandatoryRescueReturn->getThird();
1114
1115         if (needsResize && !resize) {
1116                 // We need to resize but we are not resizing so return false
1117                 return ThreeTuple<bool, int32_t, bool>(true, NULL, NULL);
1118         }
1119
1120         bool inserted = false;
1121         if (newKeyEntry != NULL) {
1122                 newKeyEntry->setSlot(slot);
1123                 if (slot->hasSpace(newKeyEntry)) {
1124                         slot->addEntry(newKeyEntry);
1125                         inserted = true;
1126                 }
1127         }
1128
1129         // Clear the transactions, aborts and commits that were sent previously
1130         transactionPartsSent->clear();
1131         pendingSendArbitrationEntriesToDelete->clear();
1132
1133         for (ArbitrationRound *round : pendingSendArbitrationRounds) {
1134                 bool isFull = false;
1135                 round->generateParts();
1136                 Vector<Entry *> *parts = round->getParts();
1137
1138                 // Insert pending arbitration data
1139                 for (Entry *arbitrationData : parts) {
1140
1141                         // If it is an abort then we need to set some information
1142                         if (arbitrationData instanceof Abort) {
1143                                 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1144                         }
1145
1146                         if (!slot->hasSpace(arbitrationData)) {
1147                                 // No space so cant do anything else with these data entries
1148                                 isFull = true;
1149                                 break;
1150                         }
1151
1152                         // Add to this current slot and add it to entries to delete
1153                         slot->addEntry(arbitrationData);
1154                         pendingSendArbitrationEntriesToDelete->add(arbitrationData);
1155                 }
1156
1157                 if (isFull) {
1158                         break;
1159                 }
1160         }
1161
1162         if (pendingTransactionQueue->size() > 0) {
1163                 Transaction *transaction = pendingTransactionQueue->get(0);
1164                 // Set the transaction sequence number if it has yet to be inserted into the block chain
1165                 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1166                         transaction->setSequenceNumber(slot->getSequenceNumber());
1167                 }
1168
1169                 while (true) {
1170                         TransactionPart *part = transaction->getNextPartToSend();
1171                         if (part == NULL) {
1172                                 // Ran out of parts to send for this transaction so move on
1173                                 break;
1174                         }
1175
1176                         if (slot->hasSpace(part)) {
1177                                 slot->addEntry(part);
1178                                 Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1179                                 if (partsSent == NULL) {
1180                                         partsSent = new Vector<int32_t>();
1181                                         transactionPartsSent->put(transaction, partsSent);
1182                                 }
1183                                 partsSent->add(part->getPartNumber());
1184                                 transactionPartsSent->put(transaction, partsSent);
1185                         } else {
1186                                 break;
1187                         }
1188                 }
1189         }
1190
1191         // Fill the remainder of the slot with rescue data
1192         doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1193
1194         return ThreeTuple<bool, int32_t, bool>(false, newSize, inserted);
1195 }
1196
1197 void Table::doRejectedMessages(Slot *s) {
1198         if (!rejectedSlotVector->isEmpty()) {
1199                 /* TODO: We should avoid generating a rejected message entry if
1200                  * there is already a sufficient entry in the queue (e->g->,
1201                  * equalsto value of true and same sequence number)->  */
1202
1203                 int64_t old_seqn = rejectedSlotVector->firstElement();
1204                 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1205                         int64_t new_seqn = rejectedSlotVector->lastElement();
1206                         RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1207                         s->addEntry(rm);
1208                 } else {
1209                         int64_t prev_seqn = -1;
1210                         int i = 0;
1211                         /* Go through list of missing messages */
1212                         for (; i < rejectedSlotVector->size(); i++) {
1213                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1214                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1215                                 if (s_msg != NULL)
1216                                         break;
1217                                 prev_seqn = curr_seqn;
1218                         }
1219                         /* Generate rejected message entry for missing messages */
1220                         if (prev_seqn != -1) {
1221                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1222                                 s->addEntry(rm);
1223                         }
1224                         /* Generate rejected message entries for present messages */
1225                         for (; i < rejectedSlotVector->size(); i++) {
1226                                 int64_t curr_seqn = rejectedSlotVector->get(i);
1227                                 Slot *s_msg = buffer->getSlot(curr_seqn);
1228                                 int64_t machineid = s_msg->getMachineID();
1229                                 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1230                                 s->addEntry(rm);
1231                         }
1232                 }
1233         }
1234 }
1235
1236 ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize) {
1237         int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1238         int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
1239         if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1240                 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1241         }
1242
1243         int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1244         bool seenLiveSlot = false;
1245         int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots;         // smallest seq number in the buffer if it is full
1246         int64_t threshold = firstIfFull + Table_FREE_SLOTS;             // we want the buffer to be clear of live entries up to this point
1247
1248
1249         // Mandatory Rescue
1250         for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1251                 Slot previousSlot = buffer->getSlot(currentSequenceNumber);
1252                 // Push slot number forward
1253                 if (!seenLiveSlot) {
1254                         oldestLiveSlotSequenceNumver = currentSequenceNumber;
1255                 }
1256
1257                 if (!previousSlot->isLive()) {
1258                         continue;
1259                 }
1260
1261                 // We have seen a live slot
1262                 seenLiveSlot = true;
1263
1264                 // Get all the live entries for a slot
1265                 Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1266
1267                 // Iterate over all the live entries and try to rescue them
1268                 for (Entry *liveEntry : liveEntries) {
1269                         if (slot->hasSpace(liveEntry)) {
1270
1271                                 // Enough space to rescue the entry
1272                                 slot->addEntry(liveEntry);
1273                         } else if (currentSequenceNumber == firstIfFull) {
1274                                 //if there's no space but the entry is about to fall off the queue
1275                                 System->out->println("B");      //?
1276                                 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
1277
1278                         }
1279                 }
1280         }
1281
1282         // Did not resize
1283         return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
1284 }
1285
1286 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1287         /* now go through live entries from least to greatest sequence number until
1288          * either all live slots added, or the slot doesn't have enough room
1289          * for SKIP_THRESHOLD consecutive entries*/
1290         int skipcount = 0;
1291         int64_t newestseqnum = buffer->getNewestSeqNum();
1292 search:
1293         for (; seqn <= newestseqnum; seqn++) {
1294                 Slot *prevslot = buffer->getSlot(seqn);
1295                 //Push slot number forward
1296                 if (!seenliveslot)
1297                         oldestLiveSlotSequenceNumver = seqn;
1298
1299                 if (!prevslot->isLive())
1300                         continue;
1301                 seenliveslot = true;
1302                 Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
1303                 for (Entry *liveentry : liveentries) {
1304                         if (s->hasSpace(liveentry))
1305                                 s->addEntry(liveentry);
1306                         else {
1307                                 skipcount++;
1308                                 if (skipcount > Table_SKIP_THRESHOLD)
1309                                         break search;
1310                         }
1311                 }
1312         }
1313 }
1314
1315 /**
1316  * Checks for malicious activity and updates the local copy of the block chain->
1317  */
1318 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1319
1320         // The cloud communication layer has checked slot HMACs already
1321         // before decoding
1322         if (newSlots->length() == 0) {
1323                 return;
1324         }
1325
1326         // Make sure all slots are newer than the last largest slot this
1327         // client has seen
1328         int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
1329         if (firstSeqNum <= sequenceNumber) {
1330                 throw new Error("Server Error: Sent older slots!");
1331         }
1332
1333         // Create an object that can access both new slots and slots in our
1334         // local chain without committing slots to our local chain
1335         SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1336
1337         // Check that the HMAC chain is not broken
1338         checkHMACChain(indexer, newSlots);
1339
1340         // Set to keep track of messages from clients
1341         Hashset<int64_t> *machineSet = new Hashset<int64_t>(lastMessageTable->keySet());
1342
1343         // Process each slots data
1344         for (Slot *slot : newSlots) {
1345                 processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1346
1347                 updateExpectedSize();
1348         }
1349
1350         // If there is a gap, check to see if the server sent us
1351         // everything->
1352         if (firstSeqNum != (sequenceNumber + 1)) {
1353
1354                 // Check the size of the slots that were sent down by the server->
1355                 // Can only check the size if there was a gap
1356                 checkNumSlots(newSlots->length);
1357
1358                 // Since there was a gap every machine must have pushed a slot or
1359                 // must have a last message message-> If not then the server is
1360                 // hiding slots
1361                 if (!machineSet->isEmpty()) {
1362                         throw new Error("Missing record for machines: " + machineSet);
1363                 }
1364         }
1365
1366         // Update the size of our local block chain->
1367         commitNewMaxSize();
1368
1369         // Commit new to slots to the local block chain->
1370         for (Slot *slot : newSlots) {
1371
1372                 // Insert this slot into our local block chain copy->
1373                 buffer->putSlot(slot);
1374
1375                 // Keep track of how many slots are currently live (have live data
1376                 // in them)->
1377                 liveSlotCount++;
1378         }
1379
1380         // Get the sequence number of the latest slot in the system
1381         sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
1382         updateLiveStateFromServer();
1383
1384         // No Need to remember after we pulled from the server
1385         offlineTransactionsCommittedAndAtServer->clear();
1386
1387         // This is invalidated now
1388         hadPartialSendToServer = false;
1389 }
1390
1391 void Table::updateLiveStateFromServer() {
1392         // Process the new transaction parts
1393         processNewTransactionParts();
1394
1395         // Do arbitration on new transactions that were received
1396         arbitrateFromServer();
1397
1398         // Update all the committed keys
1399         bool didCommitOrSpeculate = updateCommittedTable();
1400
1401         // Delete the transactions that are now dead
1402         updateLiveTransactionsAndStatus();
1403
1404         // Do speculations
1405         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1406         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1407 }
1408
1409 void Table::updateLiveStateFromLocal() {
1410         // Update all the committed keys
1411         bool didCommitOrSpeculate = updateCommittedTable();
1412
1413         // Delete the transactions that are now dead
1414         updateLiveTransactionsAndStatus();
1415
1416         // Do speculations
1417         didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1418         updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1419 }
1420
1421 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1422         int64_t prevslots = firstSequenceNumber;
1423
1424         if (didFindTableStatus) {
1425         } else {
1426                 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1427         }
1428
1429         didFindTableStatus = true;
1430         currMaxSize = numberOfSlots;
1431 }
1432
1433 void Table::updateExpectedSize() {
1434         expectedsize++;
1435
1436         if (expectedsize > currMaxSize) {
1437                 expectedsize = currMaxSize;
1438         }
1439 }
1440
1441
1442 /**
1443  * Check the size of the block chain to make sure there are enough
1444  * slots sent back by the server-> This is only called when we have a
1445  * gap between the slots that we have locally and the slots sent by
1446  * the server therefore in the slots sent by the server there will be
1447  * at least 1 Table status message
1448  */
1449 void Table::checkNumSlots(int numberOfSlots) {
1450         if (numberOfSlots != expectedsize) {
1451                 throw new Error("Server Error: Server did not send all slots->  Expected: " + expectedsize + " Received:" + numberOfSlots);
1452         }
1453 }
1454
1455 void Table::updateCurrMaxSize(int newmaxsize) {
1456         currMaxSize = newmaxsize;
1457 }
1458
1459
1460 /**
1461  * Update the size of of the local buffer if it is needed->
1462  */
1463 void Table::commitNewMaxSize() {
1464         didFindTableStatus = false;
1465
1466         // Resize the local slot buffer
1467         if (numberOfSlots != currMaxSize) {
1468                 buffer->resize((int32_t)currMaxSize);
1469         }
1470
1471         // Change the number of local slots to the new size
1472         numberOfSlots = (int32_t)currMaxSize;
1473
1474         // Recalculate the resize threshold since the size of the local
1475         // buffer has changed
1476         setResizeThreshold();
1477 }
1478
1479 /**
1480  * Process the new transaction parts from this latest round of slots
1481  * received from the server
1482  */
1483 void Table::processNewTransactionParts() {
1484
1485         if (newTransactionParts->size() == 0) {
1486                 // Nothing new to process
1487                 return;
1488         }
1489
1490         // Iterate through all the machine Ids that we received new parts
1491         // for
1492         for (int64_t machineId : newTransactionParts->keySet()) {
1493                 Hashtable<Pair<int64_t int32_t>, TransactionPart *> *parts = newTransactionParts->get(machineId);
1494
1495                 // Iterate through all the parts for that machine Id
1496                 for (Pair<int64_t, int32_t> partId : parts->keySet()) {
1497                         TransactionPart *part = parts->get(partId);
1498
1499                         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1500                         if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= part->getSequenceNumber())) {
1501                                 // Set dead the transaction part
1502                                 part->setDead();
1503                                 continue;
1504                         }
1505
1506                         // Get the transaction object for that sequence number
1507                         Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1508
1509                         if (transaction == NULL) {
1510                                 // This is a new transaction that we dont have so make a new one
1511                                 transaction = new Transaction();
1512
1513                                 // Insert this new transaction into the live tables
1514                                 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1515                                 liveTransactionByTransactionIdTable->put(part->getTransactionId(), transaction);
1516                         }
1517
1518                         // Add that part to the transaction
1519                         transaction->addPartDecode(part);
1520                 }
1521         }
1522
1523         // Clear all the new transaction parts in preparation for the next
1524         // time the server sends slots
1525         newTransactionParts->clear();
1526 }
1527
1528 void Table::arbitrateFromServer() {
1529
1530         if (liveTransactionBySequenceNumberTable->size() == 0) {
1531                 // Nothing to arbitrate on so move on
1532                 return;
1533         }
1534
1535         // Get the transaction sequence numbers and sort from oldest to newest
1536         Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
1537         Collections->sort(transactionSequenceNumbers);
1538
1539         // Collection of key value pairs that are
1540         Hashtable<IoTString *, KeyValue *> speculativeTableTmp = new Hashtable<IoTString *, KeyValue *>();
1541
1542         // The last transaction arbitrated on
1543         int64_t lastTransactionCommitted = -1;
1544         Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1545
1546         for (int64_t transactionSequenceNumber : transactionSequenceNumbers) {
1547                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1548
1549
1550
1551                 // Check if this machine arbitrates for this transaction if not
1552                 // then we cant arbitrate this transaction
1553                 if (transaction->getArbitrator() != localMachineId) {
1554                         continue;
1555                 }
1556
1557                 if (transactionSequenceNumber < lastSeqNumArbOn) {
1558                         continue;
1559                 }
1560
1561                 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1562                         // We have seen this already locally so dont commit again
1563                         continue;
1564                 }
1565
1566
1567                 if (!transaction->isComplete()) {
1568                         // Will arbitrate in incorrect order if we continue so just break
1569                         // Most likely this
1570                         break;
1571                 }
1572
1573
1574                 // update the largest transaction seen by arbitrator from server
1575                 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) == NULL) {
1576                         lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1577                 } else {
1578                         int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1579                         if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1580                                 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1581                         }
1582                 }
1583
1584                 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1585                         // Guard evaluated as true
1586
1587                         // Update the local changes so we can make the commit
1588                         for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
1589                                 speculativeTableTmp->put(kv->getKey(), kv);
1590                         }
1591
1592                         // Update what the last transaction committed was for use in batch commit
1593                         lastTransactionCommitted = transactionSequenceNumber;
1594                 } else {
1595                         // Guard evaluated was false so create abort
1596                         // Create the abort
1597                         Abort *newAbort = new Abort(NULL,
1598                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1599                                                                                                                                         transaction->getSequenceNumber(),
1600                                                                                                                                         transaction->getMachineId(),
1601                                                                                                                                         transaction->getArbitrator(),
1602                                                                                                                                         localArbitrationSequenceNumber);
1603                         localArbitrationSequenceNumber++;
1604                         generatedAborts->add(newAbort);
1605
1606                         // Insert the abort so we can process
1607                         processEntry(newAbort);
1608                 }
1609
1610                 lastSeqNumArbOn = transactionSequenceNumber;
1611         }
1612
1613         Commit *newCommit = NULL;
1614
1615         // If there is something to commit
1616         if (speculativeTableTmp->size() != 0) {
1617                 // Create the commit and increment the commit sequence number
1618                 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1619                 localArbitrationSequenceNumber++;
1620
1621                 // Add all the new keys to the commit
1622                 for (KeyValue *kv : speculativeTableTmp->values()) {
1623                         newCommit->addKV(kv);
1624                 }
1625
1626                 // create the commit parts
1627                 newCommit->createCommitParts();
1628
1629                 // Append all the commit parts to the end of the pending queue
1630                 // waiting for sending to the server
1631                 // Insert the commit so we can process it
1632                 for (CommitPart *commitPart : newCommit->getParts()->values()) {
1633                         processEntry(commitPart);
1634                 }
1635         }
1636
1637         if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1638                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1639                 pendingSendArbitrationRounds->add(arbitrationRound);
1640
1641                 if (compactArbitrationData()) {
1642                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1643                         if (newArbitrationRound->getCommit() != NULL) {
1644                                 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1645                                         processEntry(commitPart);
1646                                 }
1647                         }
1648                 }
1649         }
1650 }
1651
1652 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1653
1654         // Check if this machine arbitrates for this transaction if not then
1655         // we cant arbitrate this transaction
1656         if (transaction->getArbitrator() != localMachineId) {
1657                 return Pair<bool, bool>(false, false);
1658         }
1659
1660         if (!transaction->isComplete()) {
1661                 // Will arbitrate in incorrect order if we continue so just break
1662                 // Most likely this
1663                 return Pair<bool, bool>(false, false);
1664         }
1665
1666         if (transaction->getMachineId() != localMachineId) {
1667                 // dont do this check for local transactions
1668                 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) != NULL) {
1669                         if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1670                                 // We've have already seen this from the server
1671                                 return Pair<bool, bool>(false, false);
1672                         }
1673                 }
1674         }
1675
1676         if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1677                 // Guard evaluated as true Create the commit and increment the
1678                 // commit sequence number
1679                 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1680                 localArbitrationSequenceNumber++;
1681
1682                 // Update the local changes so we can make the commit
1683                 for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
1684                         newCommit->addKV(kv);
1685                 }
1686
1687                 // create the commit parts
1688                 newCommit->createCommitParts();
1689
1690                 // Append all the commit parts to the end of the pending queue
1691                 // waiting for sending to the server
1692                 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1693                 pendingSendArbitrationRounds->add(arbitrationRound);
1694
1695                 if (compactArbitrationData()) {
1696                         ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1697                         for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1698                                 processEntry(commitPart);
1699                         }
1700                 } else {
1701                         // Insert the commit so we can process it
1702                         for (CommitPart *commitPart : newCommit->getParts()->values()) {
1703                                 processEntry(commitPart);
1704                         }
1705                 }
1706
1707                 if (transaction->getMachineId() == localMachineId) {
1708                         TransactionStatus *status = transaction->getTransactionStatus();
1709                         if (status != NULL) {
1710                                 status->setStatus(TransactionStatus_StatusCommitted);
1711                         }
1712                 }
1713
1714                 updateLiveStateFromLocal();
1715                 return Pair<bool, bool>(true, true);
1716         } else {
1717                 if (transaction->getMachineId() == localMachineId) {
1718                         // For locally created messages update the status
1719                         // Guard evaluated was false so create abort
1720                         TransactionStatus status = transaction->getTransactionStatus();
1721                         if (status != NULL) {
1722                                 status->setStatus(TransactionStatus_StatusAborted);
1723                         }
1724                 } else {
1725                         Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
1726
1727                         // Create the abort
1728                         Abort *newAbort = new Abort(NULL,
1729                                                                                                                                         transaction->getClientLocalSequenceNumber(),
1730                                                                                                                                         -1,
1731                                                                                                                                         transaction->getMachineId(),
1732                                                                                                                                         transaction->getArbitrator(),
1733                                                                                                                                         localArbitrationSequenceNumber);
1734                         localArbitrationSequenceNumber++;
1735                         addAbortSet->add(newAbort);
1736
1737                         // Append all the commit parts to the end of the pending queue
1738                         // waiting for sending to the server
1739                         ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1740                         pendingSendArbitrationRounds->add(arbitrationRound);
1741
1742                         if (compactArbitrationData()) {
1743                                 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1744                                 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1745                                         processEntry(commitPart);
1746                                 }
1747                         }
1748                 }
1749
1750                 updateLiveStateFromLocal();
1751                 return Pair<bool, bool>(true, false);
1752         }
1753 }
1754
1755 /**
1756  * Compacts the arbitration data my merging commits and aggregating
1757  * aborts so that a single large push of commits can be done instead
1758  * of many small updates
1759  */
1760 bool Table::compactArbitrationData() {
1761         if (pendingSendArbitrationRounds->size() < 2) {
1762                 // Nothing to compact so do nothing
1763                 return false;
1764         }
1765
1766         ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1767         if (lastRound->didSendPart()) {
1768                 return false;
1769         }
1770
1771         bool hadCommit = (lastRound->getCommit() == NULL);
1772         bool gotNewCommit = false;
1773
1774         int numberToDelete = 1;
1775         while (numberToDelete < pendingSendArbitrationRounds->size()) {
1776                 ArbitrationRound *xs round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1777
1778                 if (round->isFull() || round->didSendPart()) {
1779                         // Stop since there is a part that cannot be compacted and we
1780                         // need to compact in order
1781                         break;
1782                 }
1783
1784                 if (round->getCommit() == NULL) {
1785                         // Try compacting aborts only
1786                         int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
1787                         if (newSize > ArbitrationRound->MAX_PARTS) {
1788                                 // Cant compact since it would be too large
1789                                 break;
1790                         }
1791                         lastRound->addAborts(round->getAborts());
1792                 } else {
1793                         // Create a new larger commit
1794                         Commit newCommit = Commit->merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
1795                         localArbitrationSequenceNumber++;
1796
1797                         // Create the commit parts so that we can count them
1798                         newCommit->createCommitParts();
1799
1800                         // Calculate the new size of the parts
1801                         int newSize = newCommit->getNumberOfParts();
1802                         newSize += lastRound->getAbortsCount();
1803                         newSize += round->getAbortsCount();
1804
1805                         if (newSize > ArbitrationRound->MAX_PARTS) {
1806                                 // Cant compact since it would be too large
1807                                 break;
1808                         }
1809
1810                         // Set the new compacted part
1811                         lastRound->setCommit(newCommit);
1812                         lastRound->addAborts(round->getAborts());
1813                         gotNewCommit = true;
1814                 }
1815
1816                 numberToDelete++;
1817         }
1818
1819         if (numberToDelete != 1) {
1820                 // If there is a compaction
1821                 // Delete the previous pieces that are now in the new compacted piece
1822                 if (numberToDelete == pendingSendArbitrationRounds->size()) {
1823                         pendingSendArbitrationRounds->clear();
1824                 } else {
1825                         for (int i = 0; i < numberToDelete; i++) {
1826                                 pendingSendArbitrationRounds->remove(pendingSendArbitrationRounds->size() - 1);
1827                         }
1828                 }
1829
1830                 // Add the new compacted into the pending to send list
1831                 pendingSendArbitrationRounds->add(lastRound);
1832
1833                 // Should reinsert into the commit processor
1834                 if (hadCommit && gotNewCommit) {
1835                         return true;
1836                 }
1837         }
1838
1839         return false;
1840 }
1841
1842 /**
1843  * Update all the commits and the committed tables, sets dead the dead
1844  * transactions
1845  */
1846 bool Table::updateCommittedTable() {
1847
1848         if (newCommitParts->size() == 0) {
1849                 // Nothing new to process
1850                 return false;
1851         }
1852
1853         // Iterate through all the machine Ids that we received new parts for
1854         for (int64_t machineId : newCommitParts->keySet()) {
1855                 Hashtable<Pair<int64_t, int32_t>, CommitPart *> *parts = newCommitParts->get(machineId);
1856
1857                 // Iterate through all the parts for that machine Id
1858                 for (Pair<int64_t, int32_t> partId : parts->keySet()) {
1859                         CommitPart *part = parts->get(partId);
1860
1861                         // Get the transaction object for that sequence number
1862                         Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
1863
1864                         if (commitForClientTable == NULL) {
1865                                 // This is the first commit from this device
1866                                 commitForClientTable = new Hashtable<int64_t, Commit *>();
1867                                 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
1868                         }
1869
1870                         Commit *commit = commitForClientTable->get(part->getSequenceNumber());
1871
1872                         if (commit == NULL) {
1873                                 // This is a new commit that we dont have so make a new one
1874                                 commit = new Commit();
1875
1876                                 // Insert this new commit into the live tables
1877                                 commitForClientTable->put(part->getSequenceNumber(), commit);
1878                         }
1879
1880                         // Add that part to the commit
1881                         commit->addPartDecode(part);
1882                 }
1883         }
1884
1885         // Clear all the new commits parts in preparation for the next time
1886         // the server sends slots
1887         newCommitParts->clear();
1888
1889         // If we process a new commit keep track of it for future use
1890         bool didProcessANewCommit = false;
1891
1892         // Process the commits one by one
1893         for (int64_T arbitratorId : liveCommitsTable->keySet()) {
1894
1895                 // Get all the commits for a specific arbitrator
1896                 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
1897
1898                 // Sort the commits in order
1899                 Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
1900                 Collections->sort(commitSequenceNumbers);
1901
1902                 // Get the last commit seen from this arbitrator
1903                 int64_t lastCommitSeenSequenceNumber = -1;
1904                 if (lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId) != NULL) {
1905                         lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
1906                 }
1907
1908                 // Go through each new commit one by one
1909                 for (int i = 0; i < commitSequenceNumbers->size(); i++) {
1910                         int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
1911                         Commit *commit = commitForClientTable->get(commitSequenceNumber);
1912
1913                         // Special processing if a commit is not complete
1914                         if (!commit->isComplete()) {
1915                                 if (i == (commitSequenceNumbers->size() - 1)) {
1916                                         // If there is an incomplete commit and this commit is the
1917                                         // latest one seen then this commit cannot be processed and
1918                                         // there are no other commits
1919                                         break;
1920                                 } else {
1921                                         // This is a commit that was already dead but parts of it
1922                                         // are still in the block chain (not flushed out yet)->
1923                                         // Delete it and move on
1924                                         commit->setDead();
1925                                         commitForClientTable->remove(commit->getSequenceNumber());
1926                                         continue;
1927                                 }
1928                         }
1929
1930                         // Update the last transaction that was updated if we can
1931                         if (commit->getTransactionSequenceNumber() != -1) {
1932                                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
1933
1934                                 // Update the last transaction sequence number that the arbitrator arbitrated on
1935                                 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
1936                                         lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
1937                                 }
1938                         }
1939
1940                         // Update the last arbitration data that we have seen so far
1941                         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId()) != NULL) {
1942
1943                                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
1944                                 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
1945                                         // Is larger
1946                                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
1947                                 }
1948                         } else {
1949                                 // Never seen any data from this arbitrator so record the first one
1950                                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
1951                         }
1952
1953                         // We have already seen this commit before so need to do the
1954                         // full processing on this commit
1955                         if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
1956
1957                                 // Update the last transaction that was updated if we can
1958                                 if (commit->getTransactionSequenceNumber() != -1) {
1959                                         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
1960
1961                                         // Update the last transaction sequence number that the arbitrator arbitrated on
1962                                         if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
1963                                                 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
1964                                         }
1965                                 }
1966
1967                                 continue;
1968                         }
1969
1970                         // If we got here then this is a brand new commit and needs full
1971                         // processing
1972                         // Get what commits should be edited, these are the commits that
1973                         // have live values for their keys
1974                         Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
1975                         for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
1976                                 commitsToEdit->add(liveCommitsByKeyTable->get(kv->getKey()));
1977                         }
1978                         commitsToEdit->remove(NULL);            // remove NULL since it could be in this set
1979
1980                         // Update each previous commit that needs to be updated
1981                         for (Commit *previousCommit : commitsToEdit) {
1982
1983                                 // Only bother with live commits (TODO: Maybe remove this check)
1984                                 if (previousCommit->isLive()) {
1985
1986                                         // Update which keys in the old commits are still live
1987                                         for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
1988                                                 previousCommit->invalidateKey(kv->getKey());
1989                                         }
1990
1991                                         // if the commit is now dead then remove it
1992                                         if (!previousCommit->isLive()) {
1993                                                 commitForClientTable->remove(previousCommit);
1994                                         }
1995                                 }
1996                         }
1997
1998                         // Update the last seen sequence number from this arbitrator
1999                         if (lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId()) != NULL) {
2000                                 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2001                                         lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2002                                 }
2003                         } else {
2004                                 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2005                         }
2006
2007                         // We processed a new commit that we havent seen before
2008                         didProcessANewCommit = true;
2009
2010                         // Update the committed table of keys and which commit is using which key
2011                         for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
2012                                 committedKeyValueTable->put(kv->getKey(), kv);
2013                                 liveCommitsByKeyTable->put(kv->getKey(), commit);
2014                         }
2015                 }
2016         }
2017
2018         return didProcessANewCommit;
2019 }
2020
2021 /**
2022  * Create the speculative table from transactions that are still live
2023  * and have come from the cloud
2024  */
2025 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2026         if (liveTransactionBySequenceNumberTable->keySet()->size() == 0) {
2027                 // There is nothing to speculate on
2028                 return false;
2029         }
2030
2031         // Create a list of the transaction sequence numbers and sort them
2032         // from oldest to newest
2033         Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
2034         Collections->sort(transactionSequenceNumbersSorted);
2035
2036         bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2037
2038
2039         if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2040                 // If there is a gap in the transaction sequence numbers then
2041                 // there was a commit or an abort of a transaction OR there was a
2042                 // new commit (Could be from offline commit) so a redo the
2043                 // speculation from scratch
2044
2045                 // Start from scratch
2046                 speculatedKeyValueTable->clear();
2047                 lastTransactionSequenceNumberSpeculatedOn = -1;
2048                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2049
2050         }
2051
2052         // Remember the front of the transaction list
2053         oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2054
2055         // Find where to start arbitration from
2056         int startIndex = transactionSequenceNumbersSorted->indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1;
2057
2058         if (startIndex >= transactionSequenceNumbersSorted->size()) {
2059                 // Make sure we are not out of bounds
2060                 return false;           // did not speculate
2061         }
2062
2063         Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
2064         bool didSkip = true;
2065
2066         for (int i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2067                 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2068                 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2069
2070                 if (!transaction->isComplete()) {
2071                         // If there is an incomplete transaction then there is nothing
2072                         // we can do add this transactions arbitrator to the list of
2073                         // arbitrators we should ignore
2074                         incompleteTransactionArbitrator->add(transaction->getArbitrator());
2075                         didSkip = true;
2076                         continue;
2077                 }
2078
2079                 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2080                         continue;
2081                 }
2082
2083                 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2084
2085                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2086                         // Guard evaluated to true so update the speculative table
2087                         for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
2088                                 speculatedKeyValueTable->put(kv->getKey(), kv);
2089                         }
2090                 }
2091         }
2092
2093         if (didSkip) {
2094                 // Since there was a skip we need to redo the speculation next time around
2095                 lastTransactionSequenceNumberSpeculatedOn = -1;
2096                 oldestTransactionSequenceNumberSpeculatedOn = -1;
2097         }
2098
2099         // We did some speculation
2100         return true;
2101 }
2102
2103 /**
2104  * Create the pending transaction speculative table from transactions
2105  * that are still in the pending transaction buffer
2106  */
2107 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2108         if (pendingTransactionQueue->size() == 0) {
2109                 // There is nothing to speculate on
2110                 return;
2111         }
2112
2113         if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2114                 // need to reset on the pending speculation
2115                 lastPendingTransactionSpeculatedOn = NULL;
2116                 firstPendingTransaction = pendingTransactionQueue->get(0);
2117                 pendingTransactionSpeculatedKeyValueTable->clear();
2118         }
2119
2120         // Find where to start arbitration from
2121         int startIndex = pendingTransactionQueue->indexOf(firstPendingTransaction) + 1;
2122
2123         if (startIndex >= pendingTransactionQueue->size()) {
2124                 // Make sure we are not out of bounds
2125                 return;
2126         }
2127
2128         for (int i = startIndex; i < pendingTransactionQueue->size(); i++) {
2129                 Transaction *transaction = pendingTransactionQueue->get(i);
2130
2131                 lastPendingTransactionSpeculatedOn = transaction;
2132
2133                 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2134                         // Guard evaluated to true so update the speculative table
2135                         for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
2136                                 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2137                         }
2138                 }
2139         }
2140 }
2141
2142 /**
2143  * Set dead and remove from the live transaction tables the
2144  * transactions that are dead
2145  */
2146 void Table::updateLiveTransactionsAndStatus() {
2147
2148         // Go through each of the transactions
2149         for (Iterator<Map->Entry<int64_t, Transaction> > *iter = liveTransactionBySequenceNumberTable->entrySet()->iterator(); iter->hasNext();) {
2150                 Transaction *transaction = iter->next()->getValue();
2151
2152                 // Check if the transaction is dead
2153                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator());
2154                 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= transaction->getSequenceNumber())) {
2155
2156                         // Set dead the transaction
2157                         transaction->setDead();
2158
2159                         // Remove the transaction from the live table
2160                         iter->remove();
2161                         liveTransactionByTransactionIdTable->remove(transaction->getId());
2162                 }
2163         }
2164
2165         // Go through each of the transactions
2166         for (Iterator<Map->Entry<int64_t, TransactionStatus *> > *iter = outstandingTransactionStatus->entrySet()->iterator(); iter->hasNext();) {
2167                 TransactionStatus *status = iter->next()->getValue();
2168
2169                 // Check if the transaction is dead
2170                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator());
2171                 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= status->getTransactionSequenceNumber())) {
2172
2173                         // Set committed
2174                         status->setStatus(TransactionStatus_StatusCommitted);
2175
2176                         // Remove
2177                         iter->remove();
2178                 }
2179         }
2180 }
2181
2182 /**
2183  * Process this slot, entry by entry->  Also update the latest message sent by slot
2184  */
2185 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2186
2187         // Update the last message seen
2188         updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2189
2190         // Process each entry in the slot
2191         for (Entry *entry : slot->getEntries()) {
2192                 switch (entry->getType()) {
2193                 case TypeCommitPart:
2194                         processEntry((CommitPart *)entry);
2195                         break;
2196                 case TypeAbort:
2197                         processEntry((Abort *)entry);
2198                         break;
2199                 case TypeTransactionPart:
2200                         processEntry((TransactionPart *)entry);
2201                         break;
2202                 case TypeNewKey:
2203                         processEntry((NewKey *)entry);
2204                         break;
2205                 case TypeLastMessage:
2206                         processEntry((LastMessage *)entry, machineSet);
2207                         break;
2208                 case TypeRejectedMessage:
2209                         processEntry((RejectedMessage *)entry, indexer);
2210                         break;
2211                 case TypeTableStatus:
2212                         processEntry((TableStatus *)entry, slot->getSequenceNumber());
2213                         break;
2214                 default:
2215                         throw new Error("Unrecognized type: " + entry->getType());
2216                 }
2217         }
2218 }
2219
2220 /**
2221  * Update the last message that was sent for a machine Id
2222  */
2223 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2224         // Update what the last message received by a machine was
2225         updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2226 }
2227
2228 /**
2229  * Add the new key to the arbitrators table and update the set of live
2230  * new keys (in case of a rescued new key message)
2231  */
2232 void Table::processEntry(NewKey *entry) {
2233         // Update the arbitrator table with the new key information
2234         arbitratorTable->put(entry->getKey(), entry->getMachineID());
2235
2236         // Update what the latest live new key is
2237         NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2238         if (oldNewKey != NULL) {
2239                 // Delete the old new key messages
2240                 oldNewKey->setDead();
2241         }
2242 }
2243
2244 /**
2245  * Process new table status entries and set dead the old ones as new
2246  * ones come in-> keeps track of the largest and smallest table status
2247  * seen in this current round of updating the local copy of the block
2248  * chain
2249  */
2250 void Table::processEntry(TableStatus entry, int64_t seq) {
2251         int newNumSlots = entry->getMaxSlots();
2252         updateCurrMaxSize(newNumSlots);
2253         initExpectedSize(seq, newNumSlots);
2254
2255         if (liveTableStatus != NULL) {
2256                 // We have a larger table status so the old table status is no
2257                 // int64_ter alive
2258                 liveTableStatus->setDead();
2259         }
2260
2261         // Make this new table status the latest alive table status
2262         liveTableStatus = entry;
2263 }
2264
2265 /**
2266  * Check old messages to see if there is a block chain violation->
2267  * Also
2268  */
2269 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
2270         int64_t oldSeqNum = entry->getOldSeqNum();
2271         int64_t newSeqNum = entry->getNewSeqNum();
2272         bool isequal = entry->getEqual();
2273         int64_t machineId = entry->getMachineID();
2274         int64_t seq = entry->getSequenceNumber();
2275
2276         // Check if we have messages that were supposed to be rejected in
2277         // our local block chain
2278         for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2279                 // Get the slot
2280                 Slot *slot = indexer->getSlot(seqNum);
2281
2282                 if (slot != NULL) {
2283                         // If we have this slot make sure that it was not supposed to be
2284                         // a rejected slot
2285                         int64_t slotMachineId = slot->getMachineID();
2286                         if (isequal != (slotMachineId == machineId)) {
2287                                 throw new Error("Server Error: Trying to insert rejected message for slot " + seqNum);
2288                         }
2289                 }
2290         }
2291
2292         // Create a list of clients to watch until they see this rejected
2293         // message entry->
2294         Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2295         for (Map->Entry<int64_t, Pair<int64_t, Liveness *> > *lastMessageEntry : lastMessageTable->entrySet()) {
2296                 // Machine ID for the last message entry
2297                 int64_t lastMessageEntryMachineId = lastMessageEntry->getKey();
2298
2299                 // We've seen it, don't need to continue to watch->  Our next
2300                 // message will implicitly acknowledge it->
2301                 if (lastMessageEntryMachineId == localMachineId) {
2302                         continue;
2303                 }
2304
2305                 Pair<int64_t, Liveness *> lastMessageValue = lastMessageEntry->getValue();
2306                 int64_t entrySequenceNumber = lastMessageValue->getFirst();
2307
2308                 if (entrySequenceNumber < seq) {
2309                         // Add this rejected message to the set of messages that this
2310                         // machine ID did not see yet
2311                         addWatchVector(lastMessageEntryMachineId, entry);
2312                         // This client did not see this rejected message yet so add it
2313                         // to the watch set to monitor
2314                         deviceWatchSet->add(lastMessageEntryMachineId);
2315                 }
2316         }
2317         if (deviceWatchSet->isEmpty()) {
2318                 // This rejected message has been seen by all the clients so
2319                 entry->setDead();
2320         } else {
2321                 // We need to watch this rejected message
2322                 entry->setWatchSet(deviceWatchSet);
2323         }
2324 }
2325
2326 /**
2327  * Check if this abort is live, if not then save it so we can kill it
2328  * later-> update the last transaction number that was arbitrated on->
2329  */
2330 void Table::processEntry(Abort *entry) {
2331         if (entry->getTransactionSequenceNumber() != -1) {
2332                 // update the transaction status if it was sent to the server
2333                 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2334                 if (status != NULL) {
2335                         status->setStatus(TransactionStatus_StatusAborted);
2336                 }
2337         }
2338
2339         // Abort has not been seen by the client it is for yet so we need to
2340         // keep track of it
2341         Abort *previouslySeenAbort = liveAbortTable->put(entry->getAbortId(), entry);
2342         if (previouslySeenAbort != NULL) {
2343                 previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version
2344         }
2345
2346         if (entry->getTransactionArbitrator() == localMachineId) {
2347                 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
2348         }
2349
2350         if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) {
2351                 // The machine already saw this so it is dead
2352                 entry->setDead();
2353                 liveAbortTable->remove(entry->getAbortId());
2354
2355                 if (entry->getTransactionArbitrator() == localMachineId) {
2356                         liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2357                 }
2358                 return;
2359         }
2360
2361         // Update the last arbitration data that we have seen so far
2362         if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator()) != NULL) {
2363                 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2364                 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2365                         // Is larger
2366                         lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2367                 }
2368         } else {
2369                 // Never seen any data from this arbitrator so record the first one
2370                 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2371         }
2372
2373         // Set dead a transaction if we can
2374         Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber()));
2375         if (transactionToSetDead != NULL) {
2376                 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2377         }
2378
2379         // Update the last transaction sequence number that the arbitrator
2380         // arbitrated on
2381         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator());
2382         if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2383                 // Is a valid one
2384                 if (entry->getTransactionSequenceNumber() != -1) {
2385                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2386                 }
2387         }
2388 }
2389
2390 /**
2391  * Set dead the transaction part if that transaction is dead and keep
2392  * track of all new parts
2393  */
2394 void Table::processEntry(TransactionPart *entry) {
2395         // Check if we have already seen this transaction and set it dead OR
2396         // if it is not alive
2397         int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId());
2398         if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= entry->getSequenceNumber())) {
2399                 // This transaction is dead, it was already committed or aborted
2400                 entry->setDead();
2401                 return;
2402         }
2403
2404         // This part is still alive
2405         Hashtable<Pair<int64_t, int32_t>, TransactionPart *> *transactionPart = newTransactionParts->get(entry->getMachineId());
2406
2407         if (transactionPart == NULL) {
2408                 // Dont have a table for this machine Id yet so make one
2409                 transactionPart = new Hashtable<Pair<int64_t, int32_t>, TransactionPart *>();
2410                 newTransactionParts->put(entry->getMachineId(), transactionPart);
2411         }
2412
2413         // Update the part and set dead ones we have already seen (got a
2414         // rescued version)
2415         TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry);
2416         if (previouslySeenPart != NULL) {
2417                 previouslySeenPart->setDead();
2418         }
2419 }
2420
2421 /**
2422  * Process new commit entries and save them for future use->  Delete duplicates
2423  */
2424 void Table::processEntry(CommitPart *entry) {
2425         // Update the last transaction that was updated if we can
2426         if (entry->getTransactionSequenceNumber() != -1) {
2427                 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId());
2428                 // Update the last transaction sequence number that the arbitrator
2429                 // arbitrated on
2430                 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2431                         lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2432                 }
2433         }
2434
2435         Hashtable<Pair<int64_t, int32_t>, CommitPart *> *commitPart = newCommitParts->get(entry->getMachineId());
2436         if (commitPart == NULL) {
2437                 // Don't have a table for this machine Id yet so make one
2438                 commitPart = new Hashtable<Pair<int64_t, int32_t>, CommitPart *>();
2439                 newCommitParts->put(entry->getMachineId(), commitPart);
2440         }
2441         // Update the part and set dead ones we have already seen (got a
2442         // rescued version)
2443         CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry);
2444         if (previouslySeenPart != NULL) {
2445                 previouslySeenPart->setDead();
2446         }
2447 }
2448
2449 /**
2450  * Update the last message seen table-> Update and set dead the
2451  * appropriate RejectedMessages as clients see them-> Updates the live
2452  * aborts, removes those that are dead and sets them dead-> Check that
2453  * the last message seen is correct and that there is no mismatch of
2454  * our own last message or that other clients have not had a rollback
2455  * on the last message->
2456  */
2457 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<intr64_t> *machineSet) {
2458         // We have seen this machine ID
2459         machineSet->remove(machineId);
2460
2461         // Get the set of rejected messages that this machine Id is has not seen yet
2462         Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2463         // If there is a rejected message that this machine Id has not seen yet
2464         if (watchset != NULL) {
2465                 // Go through each rejected message that this machine Id has not
2466                 // seen yet
2467                 for (Iterator<RejectedMessage *> *rmit = watchset->iterator(); rmit->hasNext(); ) {
2468                         RejectedMessage *rm = rmit->next();
2469                         // If this machine Id has seen this rejected message->->->
2470                         if (rm->getSequenceNumber() <= seqNum) {
2471                                 // Remove it from our watchlist
2472                                 rmit->remove();
2473                                 // Decrement machines that need to see this notification
2474                                 rm->removeWatcher(machineId);
2475                         }
2476                 }
2477         }
2478
2479         // Set dead the abort
2480         for (Iterator<Map->Entry<Pair<int64_t, int64_t>, Abort *> > i = liveAbortTable->entrySet()->iterator(); i->hasNext();) {
2481                 Abort *abort = i->next()->getValue();
2482                 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2483                         abort->setDead();
2484                         i->remove();
2485                         if (abort->getTransactionArbitrator() == localMachineId) {
2486                                 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2487                         }
2488                 }
2489         }
2490         if (machineId == localMachineId) {
2491                 // Our own messages are immediately dead->
2492                 if (liveness instanceof LastMessage) {
2493                         ((LastMessage *)liveness)->setDead();
2494                 } else if (liveness instanceof Slot) {
2495                         ((Slot *)liveness)->setDead();
2496                 } else {
2497                         throw new Error("Unrecognized type");
2498                 }
2499         }
2500         // Get the old last message for this device
2501         Pair<int64_t, Liveness *> lastMessageEntry = lastMessageTable->put(machineId, Pair<int64_t, Liveness *>(seqNum, liveness));
2502         if (lastMessageEntry == NULL) {
2503                 // If no last message then there is nothing else to process
2504                 return;
2505         }
2506
2507         int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
2508         Liveness *lastEntry = lastMessageEntry->getSecond();
2509
2510         // If it is not our machine Id since we already set ours to dead
2511         if (machineId != localMachineId) {
2512                 if (lastEntry instanceof LastMessage) {
2513                         ((LastMessage *)lastEntry)->setDead();
2514                 } else if (lastEntry instanceof Slot) {
2515                         ((Slot *)lastEntry)->setDead();
2516                 } else {
2517                         throw new Error("Unrecognized type");
2518                 }
2519         }
2520         // Make sure the server is not playing any games
2521         if (machineId == localMachineId) {
2522                 if (hadPartialSendToServer) {
2523                         // We were not making any updates and we had a machine mismatch
2524                         if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2525                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: " +  lastMessageSeqNum  + " got: " + seqNum);
2526                         }
2527                 } else {
2528                         // We were not making any updates and we had a machine mismatch
2529                         if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2530                                 throw new Error("Server Error: Mismatch on local machine sequence number, needed: " +  lastMessageSeqNum + " got: " + seqNum);
2531                         }
2532                 }
2533         } else {
2534                 if (lastMessageSeqNum > seqNum) {
2535                         throw new Error("Server Error: Rollback on remote machine sequence number");
2536                 }
2537         }
2538 }
2539
2540 /**
2541  * Add a rejected message entry to the watch set to keep track of
2542  * which clients have seen that rejected message entry and which have
2543  * not.
2544  */
2545 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
2546         Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2547         if (entries == NULL) {
2548                 // There is no set for this machine ID yet so create one
2549                 entries = new Hashset<RejectedMessage *>();
2550                 rejectedMessageWatchVectorTable->put(machineId, entries);
2551         }
2552         entries->add(entry);
2553 }
2554
2555 /**
2556  * Check if the HMAC chain is not violated
2557  */
2558 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
2559         for (int i = 0; i < newSlots->length(); i++) {
2560                 Slot *currSlot = newSlots->get(i);
2561                 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
2562                 if (prevSlot != NULL &&
2563                                 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2564                         throw new Error("Server Error: Invalid HMAC Chain");
2565         }
2566 }