3 #include "SlotBuffer.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
13 #include "SecureRandom.h"
14 #include "ByteBuffer.h"
16 #include "CommitPart.h"
17 #include "ArbitrationRound.h"
18 #include "TransactionPart.h"
20 #include "RejectedMessage.h"
21 #include "SlotIndexer.h"
24 int compareInt64(const void *a, const void *b) {
25 const int64_t *pa = (const int64_t *) a;
26 const int64_t *pb = (const int64_t *) b;
35 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
37 cloud(new CloudComm(this, baseurl, password, listeningPort)),
39 liveTableStatus(NULL),
40 pendingTransactionBuilder(NULL),
41 lastPendingTransactionSpeculatedOn(NULL),
42 firstPendingTransaction(NULL),
44 bufferResizeThreshold(0),
46 oldestLiveSlotSequenceNumver(1),
47 localMachineId(_localMachineId),
49 localSequenceNumber(0),
50 localTransactionSequenceNumber(0),
51 lastTransactionSequenceNumberSpeculatedOn(0),
52 oldestTransactionSequenceNumberSpeculatedOn(0),
53 localArbitrationSequenceNumber(0),
54 hadPartialSendToServer(false),
55 attemptedToSendToServer(false),
57 didFindTableStatus(false),
59 lastSlotAttemptedToSend(NULL),
62 lastTransactionPartsSent(NULL),
64 committedKeyValueTable(NULL),
65 speculatedKeyValueTable(NULL),
66 pendingTransactionSpeculatedKeyValueTable(NULL),
67 liveNewKeyTable(NULL),
68 lastMessageTable(NULL),
69 rejectedMessageWatchVectorTable(NULL),
70 arbitratorTable(NULL),
72 newTransactionParts(NULL),
74 lastArbitratedTransactionNumberByArbitratorTable(NULL),
75 liveTransactionBySequenceNumberTable(NULL),
76 liveTransactionByTransactionIdTable(NULL),
77 liveCommitsTable(NULL),
78 liveCommitsByKeyTable(NULL),
79 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
80 rejectedSlotVector(NULL),
81 pendingTransactionQueue(NULL),
82 pendingSendArbitrationRounds(NULL),
83 pendingSendArbitrationEntriesToDelete(NULL),
84 transactionPartsSent(NULL),
85 outstandingTransactionStatus(NULL),
86 liveAbortsGeneratedByLocal(NULL),
87 offlineTransactionsCommittedAndAtServer(NULL),
88 localCommunicationTable(NULL),
89 lastTransactionSeenFromMachineFromServer(NULL),
90 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
91 lastInsertedNewKey(false),
97 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
101 liveTableStatus(NULL),
102 pendingTransactionBuilder(NULL),
103 lastPendingTransactionSpeculatedOn(NULL),
104 firstPendingTransaction(NULL),
106 bufferResizeThreshold(0),
108 oldestLiveSlotSequenceNumver(1),
109 localMachineId(_localMachineId),
111 localSequenceNumber(0),
112 localTransactionSequenceNumber(0),
113 lastTransactionSequenceNumberSpeculatedOn(0),
114 oldestTransactionSequenceNumberSpeculatedOn(0),
115 localArbitrationSequenceNumber(0),
116 hadPartialSendToServer(false),
117 attemptedToSendToServer(false),
119 didFindTableStatus(false),
121 lastSlotAttemptedToSend(NULL),
124 lastTransactionPartsSent(NULL),
126 committedKeyValueTable(NULL),
127 speculatedKeyValueTable(NULL),
128 pendingTransactionSpeculatedKeyValueTable(NULL),
129 liveNewKeyTable(NULL),
130 lastMessageTable(NULL),
131 rejectedMessageWatchVectorTable(NULL),
132 arbitratorTable(NULL),
133 liveAbortTable(NULL),
134 newTransactionParts(NULL),
135 newCommitParts(NULL),
136 lastArbitratedTransactionNumberByArbitratorTable(NULL),
137 liveTransactionBySequenceNumberTable(NULL),
138 liveTransactionByTransactionIdTable(NULL),
139 liveCommitsTable(NULL),
140 liveCommitsByKeyTable(NULL),
141 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
142 rejectedSlotVector(NULL),
143 pendingTransactionQueue(NULL),
144 pendingSendArbitrationRounds(NULL),
145 pendingSendArbitrationEntriesToDelete(NULL),
146 transactionPartsSent(NULL),
147 outstandingTransactionStatus(NULL),
148 liveAbortsGeneratedByLocal(NULL),
149 offlineTransactionsCommittedAndAtServer(NULL),
150 localCommunicationTable(NULL),
151 lastTransactionSeenFromMachineFromServer(NULL),
152 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
153 lastInsertedNewKey(false),
164 delete committedKeyValueTable;
165 delete speculatedKeyValueTable;
166 delete pendingTransactionSpeculatedKeyValueTable;
167 delete liveNewKeyTable;
169 SetIterator<int64_t, Pair<int64_t, Liveness *> *> *lmit = getKeyIterator(lastMessageTable);
170 while (lmit->hasNext()) {
171 Pair<int64_t, Liveness *> * pair = lastMessageTable->get(lmit->next());
175 delete lastMessageTable;
177 if (pendingTransactionBuilder != NULL)
178 delete pendingTransactionBuilder;
180 SetIterator<int64_t, Hashset<RejectedMessage *> *> *rmit = getKeyIterator(rejectedMessageWatchVectorTable);
181 while(rmit->hasNext()) {
182 int64_t machineid = rmit->next();
183 Hashset<RejectedMessage *> * rmset = rejectedMessageWatchVectorTable->get(machineid);
184 SetIterator<RejectedMessage *, RejectedMessage *> * mit = rmset->iterator();
185 while (mit->hasNext()) {
186 RejectedMessage * rm = mit->next();
193 delete rejectedMessageWatchVectorTable;
195 delete arbitratorTable;
196 delete liveAbortTable;
198 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts);
199 while (partsit->hasNext()) {
200 int64_t machineId = partsit->next();
201 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = partsit->currVal();
205 delete newTransactionParts;
208 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts);
209 while (partsit->hasNext()) {
210 int64_t machineId = partsit->next();
211 Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = partsit->currVal();
215 delete newCommitParts;
217 delete lastArbitratedTransactionNumberByArbitratorTable;
218 delete liveTransactionBySequenceNumberTable;
219 delete liveTransactionByTransactionIdTable;
221 SetIterator<int64_t, Hashtable<int64_t, Commit *> *> *liveit = getKeyIterator(liveCommitsTable);
222 while (liveit->hasNext()) {
223 int64_t arbitratorId = liveit->next();
225 // Get all the commits for a specific arbitrator
226 Hashtable<int64_t, Commit *> *commitForClientTable = liveit->currVal();
228 SetIterator<int64_t, Commit *> *clientit = getKeyIterator(commitForClientTable);
229 while (clientit->hasNext()) {
230 int64_t id = clientit->next();
231 delete commitForClientTable->get(id);
236 delete commitForClientTable;
239 delete liveCommitsTable;
241 delete liveCommitsByKeyTable;
242 delete lastCommitSeenSequenceNumberByArbitratorTable;
243 delete rejectedSlotVector;
245 uint size = pendingTransactionQueue->size();
246 for (uint iter = 0; iter < size; iter++) {
247 delete pendingTransactionQueue->get(iter);
249 delete pendingTransactionQueue;
251 delete pendingSendArbitrationEntriesToDelete;
253 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
254 while (trit->hasNext()) {
255 Transaction *transaction = trit->next();
256 delete trit->currVal();
259 delete transactionPartsSent;
261 delete outstandingTransactionStatus;
262 delete liveAbortsGeneratedByLocal;
263 delete offlineTransactionsCommittedAndAtServer;
264 delete localCommunicationTable;
265 delete lastTransactionSeenFromMachineFromServer;
267 for(uint i = 0; i < pendingSendArbitrationRounds->size(); i++) {
268 delete pendingSendArbitrationRounds->get(i);
270 delete pendingSendArbitrationRounds;
272 if (lastTransactionPartsSent != NULL)
273 delete lastTransactionPartsSent;
274 delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
280 * Init all the stuff needed for for table usage
283 // Init helper objects
284 random = new SecureRandom();
285 buffer = new SlotBuffer();
288 committedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
289 speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
290 pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
291 liveNewKeyTable = new Hashtable<IoTString *, NewKey *, uintptr_t, 0, hashString, StringEquals >();
292 lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> * >();
293 rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
294 arbitratorTable = new Hashtable<IoTString *, int64_t, uintptr_t, 0, hashString, StringEquals>();
295 liveAbortTable = new Hashtable<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>();
296 newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
297 newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
298 lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
299 liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
300 liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t> *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>();
301 liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> * >();
302 liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *, uintptr_t, 0, hashString, StringEquals>();
303 lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
304 rejectedSlotVector = new Vector<int64_t>();
305 pendingTransactionQueue = new Vector<Transaction *>();
306 pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
307 transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
308 outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
309 liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
310 offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> *, uintptr_t, 0, pairHashFunction, pairEquals>();
311 localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> *>();
312 lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
313 pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
314 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
317 numberOfSlots = buffer->capacity();
318 setResizeThreshold();
322 * Initialize the table by inserting a table status as the first entry
323 * into the table status also initialize the crypto stuff.
325 void Table::initTable() {
326 cloud->initSecurity();
328 // Create the first insertion into the block chain which is the table status
329 Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
330 localSequenceNumber++;
331 TableStatus *status = new TableStatus(s, numberOfSlots);
332 s->addShallowEntry(status);
333 Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
336 array = new Array<Slot *>(1);
338 // update local block chain
339 validateAndUpdate(array, true);
341 } else if (array->length() == 1) {
342 // in case we did push the slot BUT we failed to init it
343 validateAndUpdate(array, true);
349 throw new Error("Error on initialization");
354 * Rebuild the table from scratch by pulling the latest block chain
357 void Table::rebuild() {
358 // Just pull the latest slots from the server
359 Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
360 validateAndUpdate(newslots, true);
363 updateLiveTransactionsAndStatus();
366 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
367 localCommunicationTable->put(arbitrator, new Pair<IoTString *, int32_t>(hostName, portNumber));
370 int64_t Table::getArbitrator(IoTString *key) {
371 return arbitratorTable->get(key);
374 void Table::close() {
378 IoTString *Table::getCommitted(IoTString *key) {
379 KeyValue *kv = committedKeyValueTable->get(key);
382 return new IoTString(kv->getValue());
388 IoTString *Table::getSpeculative(IoTString *key) {
389 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
392 kv = speculatedKeyValueTable->get(key);
396 kv = committedKeyValueTable->get(key);
400 return new IoTString(kv->getValue());
406 IoTString *Table::getCommittedAtomic(IoTString *key) {
407 KeyValue *kv = committedKeyValueTable->get(key);
409 if (!arbitratorTable->contains(key)) {
410 throw new Error("Key not Found.");
413 // Make sure new key value pair matches the current arbitrator
414 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
415 // TODO: Maybe not throw en error
416 throw new Error("Not all Key Values Match Arbitrator.");
420 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
421 return new IoTString(kv->getValue());
423 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
428 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
429 if (!arbitratorTable->contains(key)) {
430 throw new Error("Key not Found.");
433 // Make sure new key value pair matches the current arbitrator
434 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
435 // TODO: Maybe not throw en error
436 throw new Error("Not all Key Values Match Arbitrator.");
439 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
442 kv = speculatedKeyValueTable->get(key);
446 kv = committedKeyValueTable->get(key);
450 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
451 return new IoTString(kv->getValue());
453 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
458 bool Table::update() {
460 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
461 validateAndUpdate(newSlots, false);
464 updateLiveTransactionsAndStatus();
466 } catch (Exception *e) {
467 SetIterator<int64_t, Pair<IoTString *, int32_t> *> *kit = getKeyIterator(localCommunicationTable);
468 while (kit->hasNext()) {
469 int64_t m = kit->next();
478 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
480 if (arbitratorTable->contains(keyName)) {
481 // There is already an arbitrator
484 NewKey *newKey = new NewKey(NULL, keyName, machineId);
486 if (sendToServer(newKey)) {
487 // If successfully inserted
493 void Table::startTransaction() {
494 // Create a new transaction, invalidates any old pending transactions.
495 if (pendingTransactionBuilder != NULL)
496 delete pendingTransactionBuilder;
497 pendingTransactionBuilder = new PendingTransaction(localMachineId);
500 void Table::put(IoTString *key, IoTString *value) {
501 // Make sure it is a valid key
502 if (!arbitratorTable->contains(key)) {
503 throw new Error("Key not Found.");
506 // Make sure new key value pair matches the current arbitrator
507 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
508 // TODO: Maybe not throw en error
509 throw new Error("Not all Key Values Match Arbitrator.");
512 // Add the key value to this transaction
513 KeyValue *kv = new KeyValue(new IoTString(key), new IoTString(value));
514 pendingTransactionBuilder->addKV(kv);
517 TransactionStatus *Table::commitTransaction() {
518 if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
519 // transaction with no updates will have no effect on the system
520 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
523 // Set the local transaction sequence number and increment
524 pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
525 localTransactionSequenceNumber++;
527 // Create the transaction status
528 TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
530 // Create the new transaction
531 Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
532 newTransaction->setTransactionStatus(transactionStatus);
534 if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
535 // Add it to the queue and invalidate the builder for safety
536 pendingTransactionQueue->add(newTransaction);
538 arbitrateOnLocalTransaction(newTransaction);
539 delete newTransaction;
540 updateLiveStateFromLocal();
542 if (pendingTransactionBuilder != NULL)
543 delete pendingTransactionBuilder;
545 pendingTransactionBuilder = new PendingTransaction(localMachineId);
549 } catch (ServerException *e) {
551 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
552 uint size = pendingTransactionQueue->size();
554 for (uint iter = 0; iter < size; iter++) {
555 Transaction *transaction = pendingTransactionQueue->get(iter);
556 pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter));
558 if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
559 // Already contacted this client so ignore all attempts to contact this client
560 // to preserve ordering for arbitrator
564 Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
566 if (sendReturn.getFirst()) {
567 // Failed to contact over local
568 arbitratorTriedAndFailed->add(transaction->getArbitrator());
570 // Successful contact or should not contact
572 if (sendReturn.getSecond()) {
579 pendingTransactionQueue->setSize(oldindex);
582 updateLiveStateFromLocal();
584 return transactionStatus;
588 * Recalculate the new resize threshold
590 void Table::setResizeThreshold() {
591 int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
592 bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
595 int64_t Table::getLocalSequenceNumber() {
596 return localSequenceNumber;
599 void Table::processTransactionList(bool handlePartial) {
600 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
601 while (trit->hasNext()) {
602 Transaction *transaction = trit->next();
603 transaction->resetServerFailure();
604 // Update which transactions parts still need to be sent
605 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
606 // Add the transaction status to the outstanding list
607 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
609 // Update the transaction status
610 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
612 // Check if all the transaction parts were successfully
613 // sent and if so then remove it from pending
614 if (transaction->didSendAllParts()) {
615 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
616 pendingTransactionQueue->remove(transaction);
618 } else if (handlePartial) {
619 transaction->resetServerFailure();
620 // Set the transaction sequence number back to nothing
621 if (!transaction->didSendAPartToServer()) {
622 transaction->setSequenceNumber(-1);
629 NewKey * Table::handlePartialSend(NewKey * newKey) {
630 //Didn't receive acknowledgement for last send
631 //See if the server has received a newer slot
633 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
634 if (newSlots->length() == 0) {
635 //Retry sending old slot
636 bool wasInserted = false;
637 bool sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey, &wasInserted, &newSlots);
639 if (sendSlotsReturn) {
640 lastSlotAttemptedToSend = NULL;
641 if (newKey != NULL) {
642 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
647 processTransactionList(false);
649 if (checkSend(newSlots, lastSlotAttemptedToSend)) {
650 if (newKey != NULL) {
651 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
656 processTransactionList(true);
660 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
661 while (trit->hasNext()) {
662 Transaction *transaction = trit->next();
663 transaction->resetServerFailure();
664 // Set the transaction sequence number back to nothing
665 if (!transaction->didSendAPartToServer()) {
666 transaction->setSequenceNumber(-1);
671 if (newSlots->length() != 0) {
672 // insert into the local block chain
673 validateAndUpdate(newSlots, true);
676 if (checkSend(newSlots, lastSlotAttemptedToSend)) {
677 if (newKey != NULL) {
678 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
684 processTransactionList(true);
686 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
687 while (trit->hasNext()) {
688 Transaction *transaction = trit->next();
689 transaction->resetServerFailure();
690 // Set the transaction sequence number back to nothing
691 if (!transaction->didSendAPartToServer()) {
692 transaction->setSequenceNumber(-1);
698 // insert into the local block chain
699 validateAndUpdate(newSlots, true);
705 void Table::clearSentParts() {
706 // Clear the sent data since we are trying again
707 pendingSendArbitrationEntriesToDelete->clear();
708 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
709 while (trit->hasNext()) {
710 Transaction *transaction = trit->next();
711 delete trit->currVal();
714 transactionPartsSent->clear();
717 bool Table::sendToServer(NewKey *newKey) {
718 if (hadPartialSendToServer) {
719 newKey = handlePartialSend(newKey);
723 // While we have stuff that needs inserting into the block chain
724 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
725 if (hadPartialSendToServer) {
726 throw new Error("Should Be error free");
729 // If there is a new key with same name then end
730 if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
736 Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, new Array<char>(buffer->getSlot(sequenceNumber)->getHMAC()), localSequenceNumber);
737 localSequenceNumber++;
739 // Try to fill the slot with data
741 bool insertedNewKey = false;
742 bool needsResize = fillSlot(slot, false, newKey, newSize, insertedNewKey);
745 // Reset which transaction to send
746 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
747 while (trit->hasNext()) {
748 Transaction *transaction = trit->next();
749 transaction->resetNextPartToSend();
751 // Set the transaction sequence number back to nothing
752 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
753 transaction->setSequenceNumber(-1);
758 // Clear the sent data since we are trying again
761 // We needed a resize so try again
762 fillSlot(slot, true, newKey, newSize, insertedNewKey);
764 if (lastSlotAttemptedToSend != NULL)
765 delete lastSlotAttemptedToSend;
767 lastSlotAttemptedToSend = slot;
768 lastIsNewKey = (newKey != NULL);
769 lastInsertedNewKey = insertedNewKey;
770 lastNewSize = newSize;
771 if (( newKey != lastNewKey) && (lastNewKey != NULL))
774 if (lastTransactionPartsSent != NULL)
775 delete lastTransactionPartsSent;
776 lastTransactionPartsSent = transactionPartsSent->clone();
778 Array<Slot *> * newSlots = NULL;
779 bool wasInserted = false;
780 bool sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL, &wasInserted, &newSlots);
782 if (sendSlotsReturn) {
783 lastSlotAttemptedToSend = NULL;
784 // Did insert into the block chain
785 if (insertedNewKey) {
786 // This slot was what was inserted not a previous slot
787 // New Key was successfully inserted into the block chain so dont want to insert it again
791 // Remove the aborts and commit parts that were sent from the pending to send queue
792 uint size = pendingSendArbitrationRounds->size();
794 for (uint i = 0; i < size; i++) {
795 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
796 round->removeParts(pendingSendArbitrationEntriesToDelete);
798 if (!round->isDoneSending()) {
800 pendingSendArbitrationRounds->set(oldcount++,
801 pendingSendArbitrationRounds->get(i));
803 delete pendingSendArbitrationRounds->get(i);
805 pendingSendArbitrationRounds->setSize(oldcount);
806 processTransactionList(false);
808 // Reset which transaction to send
809 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
810 while (trit->hasNext()) {
811 Transaction *transaction = trit->next();
812 transaction->resetNextPartToSend();
814 // Set the transaction sequence number back to nothing
815 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
816 transaction->setSequenceNumber(-1);
822 // Clear the sent data in preparation for next send
825 if (newSlots->length() != 0) {
826 // insert into the local block chain
827 validateAndUpdate(newSlots, true);
831 } catch (ServerException *e) {
832 if (e->getType() != ServerException_TypeInputTimeout) {
833 // Nothing was able to be sent to the server so just clear these data structures
834 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
835 while (trit->hasNext()) {
836 Transaction *transaction = trit->next();
837 transaction->resetNextPartToSend();
839 // Set the transaction sequence number back to nothing
840 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
841 transaction->setSequenceNumber(-1);
846 // There was a partial send to the server
847 hadPartialSendToServer = true;
849 // Nothing was able to be sent to the server so just clear these data structures
850 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
851 while (trit->hasNext()) {
852 Transaction *transaction = trit->next();
853 transaction->resetNextPartToSend();
854 transaction->setServerFailure();
864 return newKey == NULL;
867 bool Table::updateFromLocal(int64_t machineId) {
868 if (!localCommunicationTable->contains(machineId))
871 Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(machineId);
873 // Get the size of the send data
874 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
876 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
877 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(machineId)) {
878 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
881 Array<char> *sendData = new Array<char>(sendDataSize);
882 ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
885 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
889 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
890 localSequenceNumber++;
892 if (returnData == NULL) {
893 // Could not contact server
898 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
899 int numberOfEntries = bbDecode->getInt();
901 for (int i = 0; i < numberOfEntries; i++) {
902 char type = bbDecode->get();
903 if (type == TypeAbort) {
904 Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
906 } else if (type == TypeCommitPart) {
907 CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
908 processEntry(commitPart);
912 updateLiveStateFromLocal();
917 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
919 // Get the devices local communications
920 if (!localCommunicationTable->contains(transaction->getArbitrator()))
921 return Pair<bool, bool>(true, false);
923 Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
925 // Get the size of the send data
926 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
928 Vector<TransactionPart *> *tParts = transaction->getParts();
929 uint tPartsSize = tParts->size();
930 for (uint i = 0; i < tPartsSize; i++) {
931 TransactionPart *part = tParts->get(i);
932 sendDataSize += part->getSize();
936 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
937 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(transaction->getArbitrator())) {
938 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
941 // Make the send data size
942 Array<char> *sendData = new Array<char>(sendDataSize);
943 ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
946 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
947 bbEncode->putInt(transaction->getParts()->size());
949 Vector<TransactionPart *> *tParts = transaction->getParts();
950 uint tPartsSize = tParts->size();
951 for (uint i = 0; i < tPartsSize; i++) {
952 TransactionPart *part = tParts->get(i);
953 part->encode(bbEncode);
958 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
959 localSequenceNumber++;
961 if (returnData == NULL) {
962 // Could not contact server
963 return Pair<bool, bool>(true, false);
967 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
968 bool didCommit = bbDecode->get() == 1;
969 bool couldArbitrate = bbDecode->get() == 1;
970 int numberOfEntries = bbDecode->getInt();
971 bool foundAbort = false;
973 for (int i = 0; i < numberOfEntries; i++) {
974 char type = bbDecode->get();
975 if (type == TypeAbort) {
976 Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
978 if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
983 } else if (type == TypeCommitPart) {
984 CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
985 processEntry(commitPart);
989 updateLiveStateFromLocal();
991 if (couldArbitrate) {
992 TransactionStatus *status = transaction->getTransactionStatus();
994 status->setStatus(TransactionStatus_StatusCommitted);
996 status->setStatus(TransactionStatus_StatusAborted);
999 TransactionStatus *status = transaction->getTransactionStatus();
1001 status->setStatus(TransactionStatus_StatusAborted);
1003 status->setStatus(TransactionStatus_StatusCommitted);
1007 return Pair<bool, bool>(false, true);
1010 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
1012 ByteBuffer *bbDecode = ByteBuffer_wrap(data);
1013 int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
1014 int numberOfParts = bbDecode->getInt();
1016 // If we did commit a transaction or not
1017 bool didCommit = false;
1018 bool couldArbitrate = false;
1020 if (numberOfParts != 0) {
1022 // decode the transaction
1023 Transaction *transaction = new Transaction();
1024 for (int i = 0; i < numberOfParts; i++) {
1026 TransactionPart *newPart = (TransactionPart *)TransactionPart_decode(NULL, bbDecode);
1027 transaction->addPartDecode(newPart);
1030 // Arbitrate on transaction and pull relevant return data
1031 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
1032 couldArbitrate = localArbitrateReturn.getFirst();
1033 didCommit = localArbitrateReturn.getSecond();
1035 updateLiveStateFromLocal();
1037 // Transaction was sent to the server so keep track of it to prevent double commit
1038 if (transaction->getSequenceNumber() != -1) {
1039 offlineTransactionsCommittedAndAtServer->add(new Pair<int64_t, int64_t>(transaction->getId()));
1043 // The data to send back
1044 int returnDataSize = 0;
1045 Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
1047 // Get the aborts to send back
1048 Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>();
1050 SetIterator<int64_t, Abort *> *abortit = getKeyIterator(liveAbortsGeneratedByLocal);
1051 while (abortit->hasNext())
1052 abortLocalSequenceNumbers->add(abortit->next());
1056 qsort(abortLocalSequenceNumbers->expose(), abortLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1058 uint asize = abortLocalSequenceNumbers->size();
1059 for (uint i = 0; i < asize; i++) {
1060 int64_t localSequenceNumber = abortLocalSequenceNumbers->get(i);
1061 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1065 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
1066 unseenArbitrations->add(abort);
1067 returnDataSize += abort->getSize();
1070 // Get the commits to send back
1071 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
1072 if (commitForClientTable != NULL) {
1073 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>();
1075 SetIterator<int64_t, Commit *> *commitit = getKeyIterator(commitForClientTable);
1076 while (commitit->hasNext())
1077 commitLocalSequenceNumbers->add(commitit->next());
1080 qsort(commitLocalSequenceNumbers->expose(), commitLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1082 uint clsSize = commitLocalSequenceNumbers->size();
1083 for (uint clsi = 0; clsi < clsSize; clsi++) {
1084 int64_t localSequenceNumber = commitLocalSequenceNumbers->get(clsi);
1085 Commit *commit = commitForClientTable->get(localSequenceNumber);
1087 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1092 Vector<CommitPart *> *parts = commit->getParts();
1093 uint nParts = parts->size();
1094 for (uint i = 0; i < nParts; i++) {
1095 CommitPart *commitPart = parts->get(i);
1096 unseenArbitrations->add(commitPart);
1097 returnDataSize += commitPart->getSize();
1103 // Number of arbitration entries to decode
1104 returnDataSize += 2 * sizeof(int32_t);
1106 // bool of did commit or not
1107 if (numberOfParts != 0) {
1108 returnDataSize += sizeof(char);
1111 // Data to send Back
1112 Array<char> *returnData = new Array<char>(returnDataSize);
1113 ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
1115 if (numberOfParts != 0) {
1117 bbEncode->put((char)1);
1119 bbEncode->put((char)0);
1121 if (couldArbitrate) {
1122 bbEncode->put((char)1);
1124 bbEncode->put((char)0);
1128 bbEncode->putInt(unseenArbitrations->size());
1129 uint size = unseenArbitrations->size();
1130 for (uint i = 0; i < size; i++) {
1131 Entry *entry = unseenArbitrations->get(i);
1132 entry->encode(bbEncode);
1135 localSequenceNumber++;
1139 /** Checks whether a given slot was sent using new slots in
1140 array. Returns true if sent and false otherwise. */
1142 bool Table::checkSend(Array<Slot *> * array, Slot *checkSlot) {
1143 uint size = array->length();
1144 for (uint i = 0; i < size; i++) {
1145 Slot *s = array->get(i);
1146 if ((s->getSequenceNumber() == checkSlot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1151 //Also need to see if other machines acknowledged our message
1152 for (uint i = 0; i < size; i++) {
1153 Slot *s = array->get(i);
1155 // Process each entry in the slot
1156 Vector<Entry *> *entries = s->getEntries();
1157 uint eSize = entries->size();
1158 for (uint ei = 0; ei < eSize; ei++) {
1159 Entry *entry = entries->get(ei);
1161 if (entry->getType() == TypeLastMessage) {
1162 LastMessage *lastMessage = (LastMessage *)entry;
1164 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == checkSlot->getSequenceNumber())) {
1174 /** Method tries to send slot to server. Returns status in tuple.
1175 isInserted returns whether last un-acked send (if any) was
1176 successful. Returns whether send was confirmed.x
1179 bool Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey, bool *isInserted, Array<Slot *> **array) {
1180 attemptedToSendToServer = true;
1182 *array = cloud->putSlot(slot, newSize);
1183 if (*array == NULL) {
1184 *array = new Array<Slot *>(1);
1185 (*array)->set(0, slot);
1186 rejectedSlotVector->clear();
1187 *isInserted = false;
1190 if ((*array)->length() == 0) {
1191 throw new Error("Server Error: Did not send any slots");
1194 if (hadPartialSendToServer) {
1195 *isInserted = checkSend(*array, slot);
1197 if (!(*isInserted)) {
1198 rejectedSlotVector->add(slot->getSequenceNumber());
1203 rejectedSlotVector->add(slot->getSequenceNumber());
1204 *isInserted = false;
1211 * Returns true if a resize was needed but not done.
1213 bool Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry, int & newSize, bool & insertedKey) {
1214 newSize = 0;//special value to indicate no resize
1215 if (liveSlotCount > bufferResizeThreshold) {
1216 resize = true;//Resize is forced
1220 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1221 TableStatus *status = new TableStatus(slot, newSize);
1222 slot->addShallowEntry(status);
1225 // Fill with rejected slots first before doing anything else
1226 doRejectedMessages(slot);
1228 // Do mandatory rescue of entries
1229 ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryRescue(slot, resize);
1231 // Extract working variables
1232 bool needsResize = mandatoryRescueReturn.getFirst();
1233 bool seenLiveSlot = mandatoryRescueReturn.getSecond();
1234 int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1236 if (needsResize && !resize) {
1237 // We need to resize but we are not resizing so return true to force on retry
1241 insertedKey = false;
1242 if (newKeyEntry != NULL) {
1243 newKeyEntry->setSlot(slot);
1244 if (slot->hasSpace(newKeyEntry)) {
1245 slot->addEntry(newKeyEntry);
1250 // Clear the transactions, aborts and commits that were sent previously
1252 uint size = pendingSendArbitrationRounds->size();
1253 for (uint i = 0; i < size; i++) {
1254 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
1255 bool isFull = false;
1256 round->generateParts();
1257 Vector<Entry *> *parts = round->getParts();
1259 // Insert pending arbitration data
1260 uint vsize = parts->size();
1261 for (uint vi = 0; vi < vsize; vi++) {
1262 Entry *arbitrationData = parts->get(vi);
1264 // If it is an abort then we need to set some information
1265 if (arbitrationData->getType() == TypeAbort) {
1266 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1269 if (!slot->hasSpace(arbitrationData)) {
1270 // No space so cant do anything else with these data entries
1275 // Add to this current slot and add it to entries to delete
1276 slot->addEntry(arbitrationData);
1277 pendingSendArbitrationEntriesToDelete->add(arbitrationData);
1285 if (pendingTransactionQueue->size() > 0) {
1286 Transaction *transaction = pendingTransactionQueue->get(0);
1287 // Set the transaction sequence number if it has yet to be inserted into the block chain
1288 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1289 transaction->setSequenceNumber(slot->getSequenceNumber());
1293 TransactionPart *part = transaction->getNextPartToSend();
1295 // Ran out of parts to send for this transaction so move on
1299 if (slot->hasSpace(part)) {
1300 slot->addEntry(part);
1301 Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1302 if (partsSent == NULL) {
1303 partsSent = new Vector<int32_t>();
1304 transactionPartsSent->put(transaction, partsSent);
1306 partsSent->add(part->getPartNumber());
1307 transactionPartsSent->put(transaction, partsSent);
1314 // Fill the remainder of the slot with rescue data
1315 doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1320 void Table::doRejectedMessages(Slot *s) {
1321 if (!rejectedSlotVector->isEmpty()) {
1322 /* TODO: We should avoid generating a rejected message entry if
1323 * there is already a sufficient entry in the queue (e->g->,
1324 * equalsto value of true and same sequence number)-> */
1326 int64_t old_seqn = rejectedSlotVector->get(0);
1327 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1328 int64_t new_seqn = rejectedSlotVector->lastElement();
1329 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1330 s->addShallowEntry(rm);
1332 int64_t prev_seqn = -1;
1334 /* Go through list of missing messages */
1335 for (; i < rejectedSlotVector->size(); i++) {
1336 int64_t curr_seqn = rejectedSlotVector->get(i);
1337 Slot *s_msg = buffer->getSlot(curr_seqn);
1340 prev_seqn = curr_seqn;
1342 /* Generate rejected message entry for missing messages */
1343 if (prev_seqn != -1) {
1344 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1345 s->addShallowEntry(rm);
1347 /* Generate rejected message entries for present messages */
1348 for (; i < rejectedSlotVector->size(); i++) {
1349 int64_t curr_seqn = rejectedSlotVector->get(i);
1350 Slot *s_msg = buffer->getSlot(curr_seqn);
1351 int64_t machineid = s_msg->getMachineID();
1352 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1353 s->addShallowEntry(rm);
1359 ThreeTuple<bool, bool, int64_t> Table::doMandatoryRescue(Slot *slot, bool resize) {
1360 int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1361 int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
1362 if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1363 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1366 int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1367 bool seenLiveSlot = false;
1368 int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots; // smallest seq number in the buffer if it is full
1369 int64_t threshold = firstIfFull + Table_FREE_SLOTS; // we want the buffer to be clear of live entries up to this point
1373 for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1374 Slot *previousSlot = buffer->getSlot(currentSequenceNumber);
1375 // Push slot number forward
1376 if (!seenLiveSlot) {
1377 oldestLiveSlotSequenceNumver = currentSequenceNumber;
1380 if (!previousSlot->isLive()) {
1384 // We have seen a live slot
1385 seenLiveSlot = true;
1387 // Get all the live entries for a slot
1388 Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1390 // Iterate over all the live entries and try to rescue them
1391 uint lESize = liveEntries->size();
1392 for (uint i = 0; i < lESize; i++) {
1393 Entry *liveEntry = liveEntries->get(i);
1394 if (slot->hasSpace(liveEntry)) {
1395 // Enough space to rescue the entry
1396 slot->addEntry(liveEntry);
1397 } else if (currentSequenceNumber == firstIfFull) {
1398 //if there's no space but the entry is about to fall off the queue
1399 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
1405 return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
1408 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1409 /* now go through live entries from least to greatest sequence number until
1410 * either all live slots added, or the slot doesn't have enough room
1411 * for SKIP_THRESHOLD consecutive entries*/
1413 int64_t newestseqnum = buffer->getNewestSeqNum();
1414 for (; seqn <= newestseqnum; seqn++) {
1415 Slot *prevslot = buffer->getSlot(seqn);
1416 //Push slot number forward
1418 oldestLiveSlotSequenceNumver = seqn;
1420 if (!prevslot->isLive())
1422 seenliveslot = true;
1423 Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
1424 uint lESize = liveentries->size();
1425 for (uint i = 0; i < lESize; i++) {
1426 Entry *liveentry = liveentries->get(i);
1427 if (s->hasSpace(liveentry))
1428 s->addEntry(liveentry);
1431 if (skipcount > Table_SKIP_THRESHOLD) {
1444 * Checks for malicious activity and updates the local copy of the block chain->
1446 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1447 // The cloud communication layer has checked slot HMACs already
1449 if (newSlots->length() == 0) {
1453 // Make sure all slots are newer than the last largest slot this
1455 int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
1456 if (firstSeqNum <= sequenceNumber) {
1457 throw new Error("Server Error: Sent older slots!");
1460 // Create an object that can access both new slots and slots in our
1461 // local chain without committing slots to our local chain
1462 SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1464 // Check that the HMAC chain is not broken
1465 checkHMACChain(indexer, newSlots);
1467 // Set to keep track of messages from clients
1468 Hashset<int64_t> *machineSet = new Hashset<int64_t>();
1470 SetIterator<int64_t, Pair<int64_t, Liveness *> *> *lmit = getKeyIterator(lastMessageTable);
1471 while (lmit->hasNext())
1472 machineSet->add(lmit->next());
1476 // Process each slots data
1478 uint numSlots = newSlots->length();
1479 for (uint i = 0; i < numSlots; i++) {
1480 Slot *slot = newSlots->get(i);
1481 processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1482 updateExpectedSize();
1487 // If there is a gap, check to see if the server sent us
1489 if (firstSeqNum != (sequenceNumber + 1)) {
1491 // Check the size of the slots that were sent down by the server->
1492 // Can only check the size if there was a gap
1493 checkNumSlots(newSlots->length());
1495 // Since there was a gap every machine must have pushed a slot or
1496 // must have a last message message-> If not then the server is
1498 if (!machineSet->isEmpty()) {
1500 throw new Error("Missing record for machines: ");
1504 // Update the size of our local block chain->
1507 // Commit new to slots to the local block chain->
1509 uint numSlots = newSlots->length();
1510 for (uint i = 0; i < numSlots; i++) {
1511 Slot *slot = newSlots->get(i);
1513 // Insert this slot into our local block chain copy->
1514 buffer->putSlot(slot);
1516 // Keep track of how many slots are currently live (have live data
1521 // Get the sequence number of the latest slot in the system
1522 sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
1523 updateLiveStateFromServer();
1525 // No Need to remember after we pulled from the server
1526 offlineTransactionsCommittedAndAtServer->clear();
1528 // This is invalidated now
1529 hadPartialSendToServer = false;
1532 void Table::updateLiveStateFromServer() {
1533 // Process the new transaction parts
1534 processNewTransactionParts();
1536 // Do arbitration on new transactions that were received
1537 arbitrateFromServer();
1539 // Update all the committed keys
1540 bool didCommitOrSpeculate = updateCommittedTable();
1542 // Delete the transactions that are now dead
1543 updateLiveTransactionsAndStatus();
1546 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1547 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1550 void Table::updateLiveStateFromLocal() {
1551 // Update all the committed keys
1552 bool didCommitOrSpeculate = updateCommittedTable();
1554 // Delete the transactions that are now dead
1555 updateLiveTransactionsAndStatus();
1558 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1559 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1562 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1563 int64_t prevslots = firstSequenceNumber;
1565 if (didFindTableStatus) {
1567 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1570 didFindTableStatus = true;
1571 currMaxSize = numberOfSlots;
1574 void Table::updateExpectedSize() {
1577 if (expectedsize > currMaxSize) {
1578 expectedsize = currMaxSize;
1584 * Check the size of the block chain to make sure there are enough
1585 * slots sent back by the server-> This is only called when we have a
1586 * gap between the slots that we have locally and the slots sent by
1587 * the server therefore in the slots sent by the server there will be
1588 * at least 1 Table status message
1590 void Table::checkNumSlots(int numberOfSlots) {
1591 if (numberOfSlots != expectedsize) {
1592 throw new Error("Server Error: Server did not send all slots-> Expected: ");
1597 * Update the size of of the local buffer if it is needed->
1599 void Table::commitNewMaxSize() {
1600 didFindTableStatus = false;
1602 // Resize the local slot buffer
1603 if (numberOfSlots != currMaxSize) {
1604 buffer->resize((int32_t)currMaxSize);
1607 // Change the number of local slots to the new size
1608 numberOfSlots = (int32_t)currMaxSize;
1610 // Recalculate the resize threshold since the size of the local
1611 // buffer has changed
1612 setResizeThreshold();
1616 * Process the new transaction parts from this latest round of slots
1617 * received from the server
1619 void Table::processNewTransactionParts() {
1621 if (newTransactionParts->size() == 0) {
1622 // Nothing new to process
1626 // Iterate through all the machine Ids that we received new parts
1628 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *tpit = getKeyIterator(newTransactionParts);
1629 while (tpit->hasNext()) {
1630 int64_t machineId = tpit->next();
1631 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
1633 SetIterator<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *ptit = getKeyIterator(parts);
1634 // Iterate through all the parts for that machine Id
1635 while (ptit->hasNext()) {
1636 Pair<int64_t, int32_t> *partId = ptit->next();
1637 TransactionPart *part = parts->get(partId);
1639 if (lastArbitratedTransactionNumberByArbitratorTable->contains(part->getArbitratorId())) {
1640 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1641 if (lastTransactionNumber >= part->getSequenceNumber()) {
1642 // Set dead the transaction part
1648 // Get the transaction object for that sequence number
1649 Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1651 if (transaction == NULL) {
1652 // This is a new transaction that we dont have so make a new one
1653 transaction = new Transaction();
1655 // Add that part to the transaction
1656 transaction->addPartDecode(part);
1658 // Insert this new transaction into the live tables
1659 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1660 liveTransactionByTransactionIdTable->put(transaction->getId(), transaction);
1666 // Clear all the new transaction parts in preparation for the next
1667 // time the server sends slots
1669 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts);
1670 while (partsit->hasNext()) {
1671 int64_t machineId = partsit->next();
1672 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
1676 newTransactionParts->clear();
1680 void Table::arbitrateFromServer() {
1681 if (liveTransactionBySequenceNumberTable->size() == 0) {
1682 // Nothing to arbitrate on so move on
1686 // Get the transaction sequence numbers and sort from oldest to newest
1687 Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>();
1689 SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
1690 while (trit->hasNext())
1691 transactionSequenceNumbers->add(trit->next());
1694 qsort(transactionSequenceNumbers->expose(), transactionSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1696 // Collection of key value pairs that are
1697 Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *speculativeTableTmp = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
1699 // The last transaction arbitrated on
1700 int64_t lastTransactionCommitted = -1;
1701 Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1702 uint tsnSize = transactionSequenceNumbers->size();
1703 for (uint i = 0; i < tsnSize; i++) {
1704 int64_t transactionSequenceNumber = transactionSequenceNumbers->get(i);
1705 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1707 // Check if this machine arbitrates for this transaction if not
1708 // then we cant arbitrate this transaction
1709 if (transaction->getArbitrator() != localMachineId) {
1713 if (transactionSequenceNumber < lastSeqNumArbOn) {
1717 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1718 // We have seen this already locally so dont commit again
1722 if (!transaction->isComplete()) {
1723 // Will arbitrate in incorrect order if we continue so just break
1728 // update the largest transaction seen by arbitrator from server
1729 if (!lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1730 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1732 int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1733 if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1734 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1738 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1739 // Guard evaluated as true
1740 // Update the local changes so we can make the commit
1741 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1742 while (kvit->hasNext()) {
1743 KeyValue *kv = kvit->next();
1744 speculativeTableTmp->put(kv->getKey(), kv);
1748 // Update what the last transaction committed was for use in batch commit
1749 lastTransactionCommitted = transactionSequenceNumber;
1751 // Guard evaluated was false so create abort
1753 Abort *newAbort = new Abort(NULL,
1754 transaction->getClientLocalSequenceNumber(),
1755 transaction->getSequenceNumber(),
1756 transaction->getMachineId(),
1757 transaction->getArbitrator(),
1758 localArbitrationSequenceNumber);
1759 localArbitrationSequenceNumber++;
1760 generatedAborts->add(newAbort);
1762 // Insert the abort so we can process
1763 processEntry(newAbort);
1766 lastSeqNumArbOn = transactionSequenceNumber;
1769 delete transactionSequenceNumbers;
1771 Commit *newCommit = NULL;
1773 // If there is something to commit
1774 if (speculativeTableTmp->size() != 0) {
1775 // Create the commit and increment the commit sequence number
1776 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1777 localArbitrationSequenceNumber++;
1779 // Add all the new keys to the commit
1780 SetIterator<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *spit = getKeyIterator(speculativeTableTmp);
1781 while (spit->hasNext()) {
1782 IoTString *string = spit->next();
1783 KeyValue *kv = speculativeTableTmp->get(string);
1784 newCommit->addKV(kv);
1788 // create the commit parts
1789 newCommit->createCommitParts();
1791 // Append all the commit parts to the end of the pending queue
1792 // waiting for sending to the server
1793 // Insert the commit so we can process it
1794 Vector<CommitPart *> *parts = newCommit->getParts();
1795 uint partsSize = parts->size();
1796 for (uint i = 0; i < partsSize; i++) {
1797 CommitPart *commitPart = parts->get(i);
1798 processEntry(commitPart);
1801 delete speculativeTableTmp;
1803 if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1804 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1805 pendingSendArbitrationRounds->add(arbitrationRound);
1807 if (compactArbitrationData()) {
1808 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1809 if (newArbitrationRound->getCommit() != NULL) {
1810 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1811 uint partsSize = parts->size();
1812 for (uint i = 0; i < partsSize; i++) {
1813 CommitPart *commitPart = parts->get(i);
1814 processEntry(commitPart);
1819 delete generatedAborts;
1823 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1825 // Check if this machine arbitrates for this transaction if not then
1826 // we cant arbitrate this transaction
1827 if (transaction->getArbitrator() != localMachineId) {
1828 return Pair<bool, bool>(false, false);
1831 if (!transaction->isComplete()) {
1832 // Will arbitrate in incorrect order if we continue so just break
1834 return Pair<bool, bool>(false, false);
1837 if (transaction->getMachineId() != localMachineId) {
1838 // dont do this check for local transactions
1839 if (lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1840 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1841 // We've have already seen this from the server
1842 return Pair<bool, bool>(false, false);
1847 if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1848 // Guard evaluated as true Create the commit and increment the
1849 // commit sequence number
1850 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1851 localArbitrationSequenceNumber++;
1853 // Update the local changes so we can make the commit
1854 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1855 while (kvit->hasNext()) {
1856 KeyValue *kv = kvit->next();
1857 newCommit->addKV(kv->getCopy());
1861 // create the commit parts
1862 newCommit->createCommitParts();
1864 // Append all the commit parts to the end of the pending queue
1865 // waiting for sending to the server
1866 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1867 pendingSendArbitrationRounds->add(arbitrationRound);
1869 if (compactArbitrationData()) {
1870 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1871 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1872 uint partsSize = parts->size();
1873 for (uint i = 0; i < partsSize; i++) {
1874 CommitPart *commitPart = parts->get(i);
1875 processEntry(commitPart);
1878 // Insert the commit so we can process it
1879 Vector<CommitPart *> *parts = newCommit->getParts();
1880 uint partsSize = parts->size();
1881 for (uint i = 0; i < partsSize; i++) {
1882 CommitPart *commitPart = parts->get(i);
1883 processEntry(commitPart);
1887 if (transaction->getMachineId() == localMachineId) {
1888 TransactionStatus *status = transaction->getTransactionStatus();
1889 if (status != NULL) {
1890 status->setStatus(TransactionStatus_StatusCommitted);
1894 updateLiveStateFromLocal();
1895 return Pair<bool, bool>(true, true);
1897 if (transaction->getMachineId() == localMachineId) {
1898 // For locally created messages update the status
1899 // Guard evaluated was false so create abort
1900 TransactionStatus *status = transaction->getTransactionStatus();
1901 if (status != NULL) {
1902 status->setStatus(TransactionStatus_StatusAborted);
1905 Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
1908 Abort *newAbort = new Abort(NULL,
1909 transaction->getClientLocalSequenceNumber(),
1911 transaction->getMachineId(),
1912 transaction->getArbitrator(),
1913 localArbitrationSequenceNumber);
1914 localArbitrationSequenceNumber++;
1915 addAbortSet->add(newAbort);
1917 // Append all the commit parts to the end of the pending queue
1918 // waiting for sending to the server
1919 ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1920 pendingSendArbitrationRounds->add(arbitrationRound);
1922 if (compactArbitrationData()) {
1923 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1925 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1926 uint partsSize = parts->size();
1927 for (uint i = 0; i < partsSize; i++) {
1928 CommitPart *commitPart = parts->get(i);
1929 processEntry(commitPart);
1934 updateLiveStateFromLocal();
1935 return Pair<bool, bool>(true, false);
1940 * Compacts the arbitration data by merging commits and aggregating
1941 * aborts so that a single large push of commits can be done instead
1942 * of many small updates
1944 bool Table::compactArbitrationData() {
1945 if (pendingSendArbitrationRounds->size() < 2) {
1946 // Nothing to compact so do nothing
1950 ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1951 if (lastRound->getDidSendPart()) {
1955 bool hadCommit = (lastRound->getCommit() == NULL);
1956 bool gotNewCommit = false;
1958 uint numberToDelete = 1;
1960 while (numberToDelete < pendingSendArbitrationRounds->size()) {
1961 ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1963 if (round->isFull() || round->getDidSendPart()) {
1964 // Stop since there is a part that cannot be compacted and we
1965 // need to compact in order
1969 if (round->getCommit() == NULL) {
1970 // Try compacting aborts only
1971 int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
1972 if (newSize > ArbitrationRound_MAX_PARTS) {
1973 // Cant compact since it would be too large
1976 lastRound->addAborts(round->getAborts());
1978 // Create a new larger commit
1979 Commit *newCommit = Commit_merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
1980 localArbitrationSequenceNumber++;
1982 // Create the commit parts so that we can count them
1983 newCommit->createCommitParts();
1985 // Calculate the new size of the parts
1986 int newSize = newCommit->getNumberOfParts();
1987 newSize += lastRound->getAbortsCount();
1988 newSize += round->getAbortsCount();
1990 if (newSize > ArbitrationRound_MAX_PARTS) {
1991 // Can't compact since it would be too large
1992 if (lastRound->getCommit() != newCommit &&
1993 round->getCommit() != newCommit)
1997 // Set the new compacted part
1998 if (lastRound->getCommit() == newCommit)
1999 lastRound->setCommit(NULL);
2000 if (round->getCommit() == newCommit)
2001 round->setCommit(NULL);
2003 if (lastRound->getCommit() != NULL) {
2004 Commit * oldcommit = lastRound->getCommit();
2005 lastRound->setCommit(NULL);
2008 lastRound->setCommit(newCommit);
2009 lastRound->addAborts(round->getAborts());
2010 gotNewCommit = true;
2016 if (numberToDelete != 1) {
2017 // If there is a compaction
2018 // Delete the previous pieces that are now in the new compacted piece
2019 for (uint i = 2; i <= numberToDelete; i++) {
2020 delete pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size()-i);
2022 pendingSendArbitrationRounds->setSize(pendingSendArbitrationRounds->size() - numberToDelete);
2024 pendingSendArbitrationRounds->add(lastRound);
2026 // Should reinsert into the commit processor
2027 if (hadCommit && gotNewCommit) {
2036 * Update all the commits and the committed tables, sets dead the dead
2039 bool Table::updateCommittedTable() {
2040 if (newCommitParts->size() == 0) {
2041 // Nothing new to process
2045 // Iterate through all the machine Ids that we received new parts for
2046 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts);
2047 while (partsit->hasNext()) {
2048 int64_t machineId = partsit->next();
2049 Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId);
2051 // Iterate through all the parts for that machine Id
2052 SetIterator<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pairit = getKeyIterator(parts);
2053 while (pairit->hasNext()) {
2054 Pair<int64_t, int32_t> *partId = pairit->next();
2055 CommitPart *part = parts->get(partId);
2057 // Get the transaction object for that sequence number
2058 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
2060 if (commitForClientTable == NULL) {
2061 // This is the first commit from this device
2062 commitForClientTable = new Hashtable<int64_t, Commit *>();
2063 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
2066 Commit *commit = commitForClientTable->get(part->getSequenceNumber());
2068 if (commit == NULL) {
2069 // This is a new commit that we dont have so make a new one
2070 commit = new Commit();
2072 // Insert this new commit into the live tables
2073 commitForClientTable->put(part->getSequenceNumber(), commit);
2076 // Add that part to the commit
2077 commit->addPartDecode(part);
2084 // Clear all the new commits parts in preparation for the next time
2085 // the server sends slots
2086 newCommitParts->clear();
2088 // If we process a new commit keep track of it for future use
2089 bool didProcessANewCommit = false;
2091 // Process the commits one by one
2092 SetIterator<int64_t, Hashtable<int64_t, Commit *> *> *liveit = getKeyIterator(liveCommitsTable);
2093 while (liveit->hasNext()) {
2094 int64_t arbitratorId = liveit->next();
2095 // Get all the commits for a specific arbitrator
2096 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
2098 // Sort the commits in order
2099 Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>();
2101 SetIterator<int64_t, Commit *> *clientit = getKeyIterator(commitForClientTable);
2102 while (clientit->hasNext())
2103 commitSequenceNumbers->add(clientit->next());
2107 qsort(commitSequenceNumbers->expose(), commitSequenceNumbers->size(), sizeof(int64_t), compareInt64);
2109 // Get the last commit seen from this arbitrator
2110 int64_t lastCommitSeenSequenceNumber = -1;
2111 if (lastCommitSeenSequenceNumberByArbitratorTable->contains(arbitratorId)) {
2112 lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
2115 // Go through each new commit one by one
2116 for (uint i = 0; i < commitSequenceNumbers->size(); i++) {
2117 int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
2118 Commit *commit = commitForClientTable->get(commitSequenceNumber);
2119 // Special processing if a commit is not complete
2120 if (!commit->isComplete()) {
2121 if (i == (commitSequenceNumbers->size() - 1)) {
2122 // If there is an incomplete commit and this commit is the
2123 // latest one seen then this commit cannot be processed and
2124 // there are no other commits
2127 // This is a commit that was already dead but parts of it
2128 // are still in the block chain (not flushed out yet)->
2129 // Delete it and move on
2131 commitForClientTable->remove(commit->getSequenceNumber());
2137 // Update the last transaction that was updated if we can
2138 if (commit->getTransactionSequenceNumber() != -1) {
2139 // Update the last transaction sequence number that the arbitrator arbitrated on1
2140 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2141 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2145 // Update the last arbitration data that we have seen so far
2146 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(commit->getMachineId())) {
2147 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
2148 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
2150 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2153 // Never seen any data from this arbitrator so record the first one
2154 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2157 // We have already seen this commit before so need to do the
2158 // full processing on this commit
2159 if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2160 // Update the last transaction that was updated if we can
2161 if (commit->getTransactionSequenceNumber() != -1) {
2162 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
2163 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) ||
2164 lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2165 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2171 // If we got here then this is a brand new commit and needs full
2173 // Get what commits should be edited, these are the commits that
2174 // have live values for their keys
2175 Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
2177 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2178 while (kvit->hasNext()) {
2179 KeyValue *kv = kvit->next();
2180 Commit *commit = liveCommitsByKeyTable->get(kv->getKey());
2182 commitsToEdit->add(commit);
2187 // Update each previous commit that needs to be updated
2188 SetIterator<Commit *, Commit *> *commitit = commitsToEdit->iterator();
2189 while (commitit->hasNext()) {
2190 Commit *previousCommit = commitit->next();
2192 // Only bother with live commits (TODO: Maybe remove this check)
2193 if (previousCommit->isLive()) {
2195 // Update which keys in the old commits are still live
2197 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2198 while (kvit->hasNext()) {
2199 KeyValue *kv = kvit->next();
2200 previousCommit->invalidateKey(kv->getKey());
2205 // if the commit is now dead then remove it
2206 if (!previousCommit->isLive()) {
2207 commitForClientTable->remove(previousCommit->getSequenceNumber());
2208 delete previousCommit;
2213 delete commitsToEdit;
2215 // Update the last seen sequence number from this arbitrator
2216 if (lastCommitSeenSequenceNumberByArbitratorTable->contains(commit->getMachineId())) {
2217 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2218 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2221 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2224 // We processed a new commit that we havent seen before
2225 didProcessANewCommit = true;
2227 // Update the committed table of keys and which commit is using which key
2229 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2230 while (kvit->hasNext()) {
2231 KeyValue *kv = kvit->next();
2232 committedKeyValueTable->put(kv->getKey(), kv);
2233 liveCommitsByKeyTable->put(kv->getKey(), commit);
2238 delete commitSequenceNumbers;
2242 return didProcessANewCommit;
2246 * Create the speculative table from transactions that are still live
2247 * and have come from the cloud
2249 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2250 if (liveTransactionBySequenceNumberTable->size() == 0) {
2251 // There is nothing to speculate on
2255 // Create a list of the transaction sequence numbers and sort them
2256 // from oldest to newest
2257 Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>();
2259 SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
2260 while (trit->hasNext())
2261 transactionSequenceNumbersSorted->add(trit->next());
2265 qsort(transactionSequenceNumbersSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64);
2267 bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2270 if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2271 // If there is a gap in the transaction sequence numbers then
2272 // there was a commit or an abort of a transaction OR there was a
2273 // new commit (Could be from offline commit) so a redo the
2274 // speculation from scratch
2276 // Start from scratch
2277 speculatedKeyValueTable->clear();
2278 lastTransactionSequenceNumberSpeculatedOn = -1;
2279 oldestTransactionSequenceNumberSpeculatedOn = -1;
2282 // Remember the front of the transaction list
2283 oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2285 // Find where to start arbitration from
2286 uint startIndex = 0;
2288 for (; startIndex < transactionSequenceNumbersSorted->size(); startIndex++)
2289 if (transactionSequenceNumbersSorted->get(startIndex) == lastTransactionSequenceNumberSpeculatedOn)
2293 if (startIndex >= transactionSequenceNumbersSorted->size()) {
2294 // Make sure we are not out of bounds
2295 delete transactionSequenceNumbersSorted;
2296 return false; // did not speculate
2299 Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
2300 bool didSkip = true;
2302 for (uint i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2303 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2304 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2306 if (!transaction->isComplete()) {
2307 // If there is an incomplete transaction then there is nothing
2308 // we can do add this transactions arbitrator to the list of
2309 // arbitrators we should ignore
2310 incompleteTransactionArbitrator->add(transaction->getArbitrator());
2315 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2319 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2321 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2322 // Guard evaluated to true so update the speculative table
2324 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2325 while (kvit->hasNext()) {
2326 KeyValue *kv = kvit->next();
2327 speculatedKeyValueTable->put(kv->getKey(), kv);
2334 delete transactionSequenceNumbersSorted;
2337 // Since there was a skip we need to redo the speculation next time around
2338 lastTransactionSequenceNumberSpeculatedOn = -1;
2339 oldestTransactionSequenceNumberSpeculatedOn = -1;
2342 // We did some speculation
2347 * Create the pending transaction speculative table from transactions
2348 * that are still in the pending transaction buffer
2350 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2351 if (pendingTransactionQueue->size() == 0) {
2352 // There is nothing to speculate on
2356 if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2357 // need to reset on the pending speculation
2358 lastPendingTransactionSpeculatedOn = NULL;
2359 firstPendingTransaction = pendingTransactionQueue->get(0);
2360 pendingTransactionSpeculatedKeyValueTable->clear();
2363 // Find where to start arbitration from
2364 uint startIndex = 0;
2366 for (; startIndex < pendingTransactionQueue->size(); startIndex++)
2367 if (pendingTransactionQueue->get(startIndex) == firstPendingTransaction)
2370 if (startIndex >= pendingTransactionQueue->size()) {
2371 // Make sure we are not out of bounds
2375 for (uint i = startIndex; i < pendingTransactionQueue->size(); i++) {
2376 Transaction *transaction = pendingTransactionQueue->get(i);
2378 lastPendingTransactionSpeculatedOn = transaction;
2380 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2381 // Guard evaluated to true so update the speculative table
2382 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2383 while (kvit->hasNext()) {
2384 KeyValue *kv = kvit->next();
2385 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2393 * Set dead and remove from the live transaction tables the
2394 * transactions that are dead
2396 void Table::updateLiveTransactionsAndStatus() {
2397 // Go through each of the transactions
2399 SetIterator<int64_t, Transaction *> *iter = getKeyIterator(liveTransactionBySequenceNumberTable);
2400 while (iter->hasNext()) {
2401 int64_t key = iter->next();
2402 Transaction *transaction = liveTransactionBySequenceNumberTable->get(key);
2404 // Check if the transaction is dead
2405 if (lastArbitratedTransactionNumberByArbitratorTable->contains(transaction->getArbitrator())
2406 && lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator()) >= transaction->getSequenceNumber()) {
2407 // Set dead the transaction
2408 transaction->setDead();
2410 // Remove the transaction from the live table
2412 liveTransactionByTransactionIdTable->remove(transaction->getId());
2419 // Go through each of the transactions
2421 SetIterator<int64_t, TransactionStatus *> *iter = getKeyIterator(outstandingTransactionStatus);
2422 while (iter->hasNext()) {
2423 int64_t key = iter->next();
2424 TransactionStatus *status = outstandingTransactionStatus->get(key);
2426 // Check if the transaction is dead
2427 if (lastArbitratedTransactionNumberByArbitratorTable->contains(status->getTransactionArbitrator())
2428 && (lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()) >= status->getTransactionSequenceNumber())) {
2430 status->setStatus(TransactionStatus_StatusCommitted);
2441 * Process this slot, entry by entry-> Also update the latest message sent by slot
2443 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2445 // Update the last message seen
2446 updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2448 // Process each entry in the slot
2449 Vector<Entry *> *entries = slot->getEntries();
2450 uint eSize = entries->size();
2451 for (uint ei = 0; ei < eSize; ei++) {
2452 Entry *entry = entries->get(ei);
2453 switch (entry->getType()) {
2454 case TypeCommitPart:
2455 processEntry((CommitPart *)entry);
2458 processEntry((Abort *)entry);
2460 case TypeTransactionPart:
2461 processEntry((TransactionPart *)entry);
2464 processEntry((NewKey *)entry);
2466 case TypeLastMessage:
2467 processEntry((LastMessage *)entry, machineSet);
2469 case TypeRejectedMessage:
2470 processEntry((RejectedMessage *)entry, indexer);
2472 case TypeTableStatus:
2473 processEntry((TableStatus *)entry, slot->getSequenceNumber());
2476 throw new Error("Unrecognized type: ");
2482 * Update the last message that was sent for a machine Id
2484 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2485 // Update what the last message received by a machine was
2486 updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2490 * Add the new key to the arbitrators table and update the set of live
2491 * new keys (in case of a rescued new key message)
2493 void Table::processEntry(NewKey *entry) {
2494 // Update the arbitrator table with the new key information
2495 arbitratorTable->put(entry->getKey(), entry->getMachineID());
2497 // Update what the latest live new key is
2498 NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2499 if (oldNewKey != NULL) {
2500 // Delete the old new key messages
2501 oldNewKey->setDead();
2506 * Process new table status entries and set dead the old ones as new
2507 * ones come in-> keeps track of the largest and smallest table status
2508 * seen in this current round of updating the local copy of the block
2511 void Table::processEntry(TableStatus *entry, int64_t seq) {
2512 int newNumSlots = entry->getMaxSlots();
2513 updateCurrMaxSize(newNumSlots);
2514 initExpectedSize(seq, newNumSlots);
2516 if (liveTableStatus != NULL) {
2517 // We have a larger table status so the old table status is no
2519 liveTableStatus->setDead();
2522 // Make this new table status the latest alive table status
2523 liveTableStatus = entry;
2527 * Check old messages to see if there is a block chain violation->
2530 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
2531 int64_t oldSeqNum = entry->getOldSeqNum();
2532 int64_t newSeqNum = entry->getNewSeqNum();
2533 bool isequal = entry->getEqual();
2534 int64_t machineId = entry->getMachineID();
2535 int64_t seq = entry->getSequenceNumber();
2537 // Check if we have messages that were supposed to be rejected in
2538 // our local block chain
2539 for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2541 Slot *slot = indexer->getSlot(seqNum);
2544 // If we have this slot make sure that it was not supposed to be
2546 int64_t slotMachineId = slot->getMachineID();
2547 if (isequal != (slotMachineId == machineId)) {
2548 throw new Error("Server Error: Trying to insert rejected message for slot ");
2553 // Create a list of clients to watch until they see this rejected
2555 Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2556 SetIterator<int64_t, Pair<int64_t, Liveness *> *> *iter = getKeyIterator(lastMessageTable);
2557 while (iter->hasNext()) {
2558 // Machine ID for the last message entry
2559 int64_t lastMessageEntryMachineId = iter->next();
2561 // We've seen it, don't need to continue to watch-> Our next
2562 // message will implicitly acknowledge it->
2563 if (lastMessageEntryMachineId == localMachineId) {
2567 Pair<int64_t, Liveness *> *lastMessageValue = lastMessageTable->get(lastMessageEntryMachineId);
2568 int64_t entrySequenceNumber = lastMessageValue->getFirst();
2570 if (entrySequenceNumber < seq) {
2571 // Add this rejected message to the set of messages that this
2572 // machine ID did not see yet
2573 addWatchVector(lastMessageEntryMachineId, entry);
2574 // This client did not see this rejected message yet so add it
2575 // to the watch set to monitor
2576 deviceWatchSet->add(lastMessageEntryMachineId);
2581 if (deviceWatchSet->isEmpty()) {
2582 // This rejected message has been seen by all the clients so
2584 delete deviceWatchSet;
2586 // We need to watch this rejected message
2587 entry->setWatchSet(deviceWatchSet);
2592 * Check if this abort is live, if not then save it so we can kill it
2593 * later-> update the last transaction number that was arbitrated on->
2595 void Table::processEntry(Abort *entry) {
2596 if (entry->getTransactionSequenceNumber() != -1) {
2597 // update the transaction status if it was sent to the server
2598 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2599 if (status != NULL) {
2600 status->setStatus(TransactionStatus_StatusAborted);
2604 // Abort has not been seen by the client it is for yet so we need to
2607 Abort *previouslySeenAbort = liveAbortTable->put(new Pair<int64_t, int64_t>(entry->getAbortId()), entry);
2608 if (previouslySeenAbort != NULL) {
2609 previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version
2612 if (entry->getTransactionArbitrator() == localMachineId) {
2613 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
2616 if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) {
2617 // The machine already saw this so it is dead
2619 Pair<int64_t, int64_t> abortid = entry->getAbortId();
2620 liveAbortTable->remove(&abortid);
2622 if (entry->getTransactionArbitrator() == localMachineId) {
2623 liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2628 // Update the last arbitration data that we have seen so far
2629 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(entry->getTransactionArbitrator())) {
2630 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2631 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2633 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2636 // Never seen any data from this arbitrator so record the first one
2637 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2640 // Set dead a transaction if we can
2641 Pair<int64_t, int64_t> deadPair = Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber());
2643 Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(&deadPair);
2644 if (transactionToSetDead != NULL) {
2645 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2648 // Update the last transaction sequence number that the arbitrator
2650 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getTransactionArbitrator()) ||
2651 (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator()) < entry->getTransactionSequenceNumber())) {
2653 if (entry->getTransactionSequenceNumber() != -1) {
2654 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2660 * Set dead the transaction part if that transaction is dead and keep
2661 * track of all new parts
2663 void Table::processEntry(TransactionPart *entry) {
2664 // Check if we have already seen this transaction and set it dead OR
2665 // if it is not alive
2666 if (lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getArbitratorId()) && (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId()) >= entry->getSequenceNumber())) {
2667 // This transaction is dead, it was already committed or aborted
2672 // This part is still alive
2673 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId());
2675 if (transactionPart == NULL) {
2676 // Dont have a table for this machine Id yet so make one
2677 transactionPart = new Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2678 newTransactionParts->put(entry->getMachineId(), transactionPart);
2681 // Update the part and set dead ones we have already seen (got a
2683 TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry);
2684 if (previouslySeenPart != NULL) {
2685 previouslySeenPart->setDead();
2690 * Process new commit entries and save them for future use-> Delete duplicates
2692 void Table::processEntry(CommitPart *entry) {
2693 // Update the last transaction that was updated if we can
2694 if (entry->getTransactionSequenceNumber() != -1) {
2695 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId()) ||
2696 lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber()) {
2697 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2701 Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *commitPart = newCommitParts->get(entry->getMachineId());
2702 if (commitPart == NULL) {
2703 // Don't have a table for this machine Id yet so make one
2704 commitPart = new Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2705 newCommitParts->put(entry->getMachineId(), commitPart);
2707 // Update the part and set dead ones we have already seen (got a
2709 CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry);
2710 if (previouslySeenPart != NULL) {
2711 previouslySeenPart->setDead();
2716 * Update the last message seen table-> Update and set dead the
2717 * appropriate RejectedMessages as clients see them-> Updates the live
2718 * aborts, removes those that are dead and sets them dead-> Check that
2719 * the last message seen is correct and that there is no mismatch of
2720 * our own last message or that other clients have not had a rollback
2721 * on the last message->
2723 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2724 // We have seen this machine ID
2725 machineSet->remove(machineId);
2727 // Get the set of rejected messages that this machine Id is has not seen yet
2728 Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2729 // If there is a rejected message that this machine Id has not seen yet
2730 if (watchset != NULL) {
2731 // Go through each rejected message that this machine Id has not
2734 SetIterator<RejectedMessage *, RejectedMessage *> *rmit = watchset->iterator();
2735 while (rmit->hasNext()) {
2736 RejectedMessage *rm = rmit->next();
2737 // If this machine Id has seen this rejected message->->->
2738 if (rm->getSequenceNumber() <= seqNum) {
2739 // Remove it from our watchlist
2741 // Decrement machines that need to see this notification
2742 rm->removeWatcher(machineId);
2748 // Set dead the abort
2749 SetIterator<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals> *abortit = getKeyIterator(liveAbortTable);
2751 while (abortit->hasNext()) {
2752 Pair<int64_t, int64_t> *key = abortit->next();
2753 Abort *abort = liveAbortTable->get(key);
2754 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2757 if (abort->getTransactionArbitrator() == localMachineId) {
2758 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2763 if (machineId == localMachineId) {
2764 // Our own messages are immediately dead->
2765 char livenessType = liveness->getType();
2766 if (livenessType == TypeLastMessage) {
2767 ((LastMessage *)liveness)->setDead();
2768 } else if (livenessType == TypeSlot) {
2769 ((Slot *)liveness)->setDead();
2771 throw new Error("Unrecognized type");
2774 // Get the old last message for this device
2775 Pair<int64_t, Liveness *> *lastMessageEntry = lastMessageTable->put(machineId, new Pair<int64_t, Liveness *>(seqNum, liveness));
2776 if (lastMessageEntry == NULL) {
2777 // If no last message then there is nothing else to process
2781 int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
2782 Liveness *lastEntry = lastMessageEntry->getSecond();
2783 delete lastMessageEntry;
2785 // If it is not our machine Id since we already set ours to dead
2786 if (machineId != localMachineId) {
2787 char lastEntryType = lastEntry->getType();
2789 if (lastEntryType == TypeLastMessage) {
2790 ((LastMessage *)lastEntry)->setDead();
2791 } else if (lastEntryType == TypeSlot) {
2792 ((Slot *)lastEntry)->setDead();
2794 throw new Error("Unrecognized type");
2797 // Make sure the server is not playing any games
2798 if (machineId == localMachineId) {
2799 if (hadPartialSendToServer) {
2800 // We were not making any updates and we had a machine mismatch
2801 if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2802 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: ");
2805 // We were not making any updates and we had a machine mismatch
2806 if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2807 throw new Error("Server Error: Mismatch on local machine sequence number, needed: ");
2811 if (lastMessageSeqNum > seqNum) {
2812 throw new Error("Server Error: Rollback on remote machine sequence number");
2818 * Add a rejected message entry to the watch set to keep track of
2819 * which clients have seen that rejected message entry and which have
2822 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
2823 Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2824 if (entries == NULL) {
2825 // There is no set for this machine ID yet so create one
2826 entries = new Hashset<RejectedMessage *>();
2827 rejectedMessageWatchVectorTable->put(machineId, entries);
2829 entries->add(entry);
2833 * Check if the HMAC chain is not violated
2835 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
2836 for (uint i = 0; i < newSlots->length(); i++) {
2837 Slot *currSlot = newSlots->get(i);
2838 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
2839 if (prevSlot != NULL &&
2840 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2841 throw new Error("Server Error: Invalid HMAC Chain");