3 #include "SlotBuffer.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
13 #include "SecureRandom.h"
14 #include "ByteBuffer.h"
16 #include "CommitPart.h"
17 #include "ArbitrationRound.h"
18 #include "TransactionPart.h"
20 #include "RejectedMessage.h"
21 #include "SlotIndexer.h"
// Comparator with the signature expected by qsort(): both arguments are
// reinterpreted as pointers to int64_t (pa and pb).
// NOTE(review): the comparison and return statements are not visible in this
// excerpt; presumably the pointed-to values are ordered ascending — confirm.
24 int compareInt64(const void *a, const void *b) {
25 const int64_t *pa = (const int64_t *) a;
26 const int64_t *pb = (const int64_t *) b;
// Constructs a Table that builds its own CloudComm from the base URL, the
// password and the listening port (the CloudComm is given `this` as well).
//
// Only scalar state is set up here: every pointer member in the visible
// initializers starts as NULL, the counters start at 0, the flags start at
// false, and oldestLiveSlotSequenceNumver starts at 1. None of the hash
// tables or vectors are allocated by this initializer list.
// NOTE(review): some initializers and the constructor body are not visible
// in this excerpt.
35 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
37 cloud(new CloudComm(this, baseurl, password, listeningPort)),
39 liveTableStatus(NULL),
40 pendingTransactionBuilder(NULL),
41 lastPendingTransactionSpeculatedOn(NULL),
42 firstPendingTransaction(NULL),
44 bufferResizeThreshold(0),
46 oldestLiveSlotSequenceNumver(1),
47 localMachineId(_localMachineId),
49 localSequenceNumber(0),
50 localTransactionSequenceNumber(0),
51 lastTransactionSequenceNumberSpeculatedOn(0),
52 oldestTransactionSequenceNumberSpeculatedOn(0),
53 localArbitrationSequenceNumber(0),
54 hadPartialSendToServer(false),
55 attemptedToSendToServer(false),
57 didFindTableStatus(false),
59 lastSlotAttemptedToSend(NULL),
62 lastTransactionPartsSent(NULL),
63 lastPendingSendArbitrationEntriesToDelete(NULL),
65 committedKeyValueTable(NULL),
66 speculatedKeyValueTable(NULL),
67 pendingTransactionSpeculatedKeyValueTable(NULL),
68 liveNewKeyTable(NULL),
69 lastMessageTable(NULL),
70 rejectedMessageWatchVectorTable(NULL),
71 arbitratorTable(NULL),
73 newTransactionParts(NULL),
75 lastArbitratedTransactionNumberByArbitratorTable(NULL),
76 liveTransactionBySequenceNumberTable(NULL),
77 liveTransactionByTransactionIdTable(NULL),
78 liveCommitsTable(NULL),
79 liveCommitsByKeyTable(NULL),
80 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
81 rejectedSlotVector(NULL),
82 pendingTransactionQueue(NULL),
83 pendingSendArbitrationRounds(NULL),
84 pendingSendArbitrationEntriesToDelete(NULL),
85 transactionPartsSent(NULL),
86 outstandingTransactionStatus(NULL),
87 liveAbortsGeneratedByLocal(NULL),
88 offlineTransactionsCommittedAndAtServer(NULL),
89 localCommunicationTable(NULL),
90 lastTransactionSeenFromMachineFromServer(NULL),
91 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
92 lastInsertedNewKey(false),
// Constructs a Table for a caller-supplied CloudComm (`_cloud`) and machine
// id.
//
// The visible initializers set the same starting state as the
// (baseurl, password, ...) constructor: pointer members NULL, counters 0,
// flags false, oldestLiveSlotSequenceNumver 1. No container is allocated in
// this initializer list.
// NOTE(review): the initializer that stores `_cloud` and the constructor
// body are not visible in this excerpt.
98 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
102 liveTableStatus(NULL),
103 pendingTransactionBuilder(NULL),
104 lastPendingTransactionSpeculatedOn(NULL),
105 firstPendingTransaction(NULL),
107 bufferResizeThreshold(0),
109 oldestLiveSlotSequenceNumver(1),
110 localMachineId(_localMachineId),
112 localSequenceNumber(0),
113 localTransactionSequenceNumber(0),
114 lastTransactionSequenceNumberSpeculatedOn(0),
115 oldestTransactionSequenceNumberSpeculatedOn(0),
116 localArbitrationSequenceNumber(0),
117 hadPartialSendToServer(false),
118 attemptedToSendToServer(false),
120 didFindTableStatus(false),
122 lastSlotAttemptedToSend(NULL),
125 lastTransactionPartsSent(NULL),
126 lastPendingSendArbitrationEntriesToDelete(NULL),
128 committedKeyValueTable(NULL),
129 speculatedKeyValueTable(NULL),
130 pendingTransactionSpeculatedKeyValueTable(NULL),
131 liveNewKeyTable(NULL),
132 lastMessageTable(NULL),
133 rejectedMessageWatchVectorTable(NULL),
134 arbitratorTable(NULL),
135 liveAbortTable(NULL),
136 newTransactionParts(NULL),
137 newCommitParts(NULL),
138 lastArbitratedTransactionNumberByArbitratorTable(NULL),
139 liveTransactionBySequenceNumberTable(NULL),
140 liveTransactionByTransactionIdTable(NULL),
141 liveCommitsTable(NULL),
142 liveCommitsByKeyTable(NULL),
143 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
144 rejectedSlotVector(NULL),
145 pendingTransactionQueue(NULL),
146 pendingSendArbitrationRounds(NULL),
147 pendingSendArbitrationEntriesToDelete(NULL),
148 transactionPartsSent(NULL),
149 outstandingTransactionStatus(NULL),
150 liveAbortsGeneratedByLocal(NULL),
151 offlineTransactionsCommittedAndAtServer(NULL),
152 localCommunicationTable(NULL),
153 lastTransactionSeenFromMachineFromServer(NULL),
154 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
155 lastInsertedNewKey(false),
// NOTE(review): the enclosing function header is not visible in this
// excerpt; presumably these statements are the body of Table::~Table() —
// confirm.
//
// Deletes the container objects held by the table. These lines delete only
// the containers themselves; nothing visible here frees the keys or values
// stored inside them.
166 delete committedKeyValueTable;
167 delete speculatedKeyValueTable;
168 delete pendingTransactionSpeculatedKeyValueTable;
169 delete liveNewKeyTable;
170 delete lastMessageTable;
171 delete rejectedMessageWatchVectorTable;
172 delete arbitratorTable;
173 delete liveAbortTable;
174 delete newTransactionParts;
175 delete newCommitParts;
176 delete lastArbitratedTransactionNumberByArbitratorTable;
177 delete liveTransactionBySequenceNumberTable;
178 delete liveTransactionByTransactionIdTable;
179 delete liveCommitsTable;
180 delete liveCommitsByKeyTable;
181 delete lastCommitSeenSequenceNumberByArbitratorTable;
182 delete rejectedSlotVector;
183 delete pendingTransactionQueue;
184 delete pendingSendArbitrationEntriesToDelete;
185 delete transactionPartsSent;
186 delete outstandingTransactionStatus;
187 delete liveAbortsGeneratedByLocal;
188 delete offlineTransactionsCommittedAndAtServer;
189 delete localCommunicationTable;
190 delete lastTransactionSeenFromMachineFromServer;
191 delete pendingSendArbitrationRounds;
192 delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
196 * Init all the stuff needed for table usage
// NOTE(review): the enclosing function header is not visible in this
// excerpt; presumably these statements are the body of Table::init() —
// confirm.
//
// Allocates the helper objects (random source and slot buffer) and every
// hash table, hash set and vector used by the table, then records the slot
// count and derives the resize threshold.
199 // Init helper objects
200 random = new SecureRandom();
201 buffer = new SlotBuffer();
// Tables keyed by IoTString* or by Pair* are created with explicit hash and
// equality functions (hashString/StringEquals, pairHashFunction/pairEquals)
// where those template arguments are given; the others use the defaults.
204 committedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
205 speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
206 pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
207 liveNewKeyTable = new Hashtable<IoTString *, NewKey *>();
208 lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> * >();
209 rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
210 arbitratorTable = new Hashtable<IoTString *, int64_t, uintptr_t, 0, hashString, StringEquals>();
211 liveAbortTable = new Hashtable<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>();
212 newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
213 newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
214 lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
215 liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
216 liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t> *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>();
217 liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> * >();
218 liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *>();
219 lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
220 rejectedSlotVector = new Vector<int64_t>();
221 pendingTransactionQueue = new Vector<Transaction *>();
222 pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
223 transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
224 outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
225 liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
226 offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> *, uintptr_t, 0, pairHashFunction, pairEquals>();
227 localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> *>();
228 lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
229 pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
230 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
// setResizeThreshold() reads both numberOfSlots and random, so it has to run
// after the buffer and the random source exist.
233 numberOfSlots = buffer->capacity();
234 setResizeThreshold();
238 * Initialize the table by inserting a table status as the first entry
239 * into the table; this also initializes the crypto stuff.
// Bootstraps a new table on the server.
//
// Initializes security on the cloud connection, creates the first slot
// (sequence number 1, stamped with localMachineId and the current
// localSequenceNumber), creates a TableStatus for that slot with the current
// numberOfSlots, and submits the slot with cloud->putSlot().
//
// Throws: Error("Error on initialization") on the failure branch.
// NOTE(review): the first branch condition and the lines that attach the
// status to the slot / fill `array` are not visible in this excerpt.
241 void Table::initTable() {
242 cloud->initSecurity();
244 // Create the first insertion into the block chain which is the table status
245 Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
// The sequence number was consumed by the slot above.
246 localSequenceNumber++;
247 TableStatus *status = new TableStatus(s, numberOfSlots);
249 Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
252 array = new Array<Slot *>(1);
254 // update local block chain
255 validateAndUpdate(array, true);
256 } else if (array->length() == 1) {
257 // in case we did push the slot BUT we failed to init it
258 validateAndUpdate(array, true);
260 throw new Error("Error on initialization");
265 * Rebuild the table from scratch by pulling the latest block chain
// Fetches every slot after the current sequenceNumber from the server,
// validates and applies them, then refreshes live transactions and status.
// Unlike update(), this passes `true` as the second argument of
// validateAndUpdate().
268 void Table::rebuild() {
269 // Just pull the latest slots from the server
270 Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
271 validateAndUpdate(newslots, true);
273 updateLiveTransactionsAndStatus();
// Records how to reach `arbitrator` directly: stores a newly allocated
// (hostName, portNumber) pair in localCommunicationTable under that id.
// The hostName pointer is stored as-is, not copied.
276 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
277 localCommunicationTable->put(arbitrator, new Pair<IoTString *, int32_t>(hostName, portNumber));
// Returns the machine id stored in arbitratorTable for `key`.
// The result for a key that is not in the table is whatever Hashtable::get
// yields in that case (not defined in this file section).
280 int64_t Table::getArbitrator(IoTString *key) {
281 return arbitratorTable->get(key);
// NOTE(review): the body of close() is not visible in this excerpt.
284 void Table::close() {
// Looks `key` up in committedKeyValueTable and returns the value of the
// entry found.
// NOTE(review): the handling of a missing entry is not visible in this
// excerpt.
288 IoTString *Table::getCommitted(IoTString *key) {
289 KeyValue *kv = committedKeyValueTable->get(key);
292 return kv->getValue();
// Returns the value for `key`, consulting three tables in this order:
// pendingTransactionSpeculatedKeyValueTable, speculatedKeyValueTable,
// committedKeyValueTable.
// NOTE(review): the conditions between the lookups are not visible here;
// presumably each later lookup is a fallback used only when the previous one
// found nothing — confirm.
298 IoTString *Table::getSpeculative(IoTString *key) {
299 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
302 kv = speculatedKeyValueTable->get(key);
306 kv = committedKeyValueTable->get(key);
310 return kv->getValue();
// Reads the committed value for `key` as part of the pending transaction:
// the value read is also registered on pendingTransactionBuilder as a guard
// (addKVGuard), and then returned.
//
// Throws: Error("Key not Found.") when `key` has no entry in arbitratorTable;
//         Error("Not all Key Values Match Arbitrator.") when
//         pendingTransactionBuilder->checkArbitrator() rejects the key's
//         arbitrator.
// NOTE(review): the condition separating the two addKVGuard calls is not
// visible; presumably the NULL-valued guard is used when no committed entry
// exists — confirm.
316 IoTString *Table::getCommittedAtomic(IoTString *key) {
317 KeyValue *kv = committedKeyValueTable->get(key);
319 if (!arbitratorTable->contains(key)) {
320 throw new Error("Key not Found.");
323 // Make sure new key value pair matches the current arbitrator
324 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
325 // TODO: Maybe not throw an error
326 throw new Error("Not all Key Values Match Arbitrator.");
330 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
331 return kv->getValue();
333 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
// Speculative counterpart of getCommittedAtomic(): validates the key and its
// arbitrator, looks the key up in pendingTransactionSpeculatedKeyValueTable,
// speculatedKeyValueTable and committedKeyValueTable (in that order),
// registers the value read as a guard on pendingTransactionBuilder, and
// returns it.
//
// Throws: Error("Key not Found.") when `key` has no entry in arbitratorTable;
//         Error("Not all Key Values Match Arbitrator.") when
//         pendingTransactionBuilder->checkArbitrator() rejects the key's
//         arbitrator.
// NOTE(review): the conditions between the lookups and between the two
// addKVGuard calls are not visible; presumably the later lookups are
// fallbacks and the NULL-valued guard covers "no entry found" — confirm.
338 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
339 if (!arbitratorTable->contains(key)) {
340 throw new Error("Key not Found.");
343 // Make sure new key value pair matches the current arbitrator
344 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
345 // TODO: Maybe not throw an error
346 throw new Error("Not all Key Values Match Arbitrator.");
349 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
352 kv = speculatedKeyValueTable->get(key);
356 kv = committedKeyValueTable->get(key);
360 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
361 return kv->getValue();
363 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
// Pulls the slots after the current sequenceNumber from the server, applies
// them (validateAndUpdate with `false` as second argument) and refreshes
// live transactions and status.
//
// If an Exception is caught, the handler walks the keys of
// localCommunicationTable (machine ids registered through
// addLocalCommunication).
// NOTE(review): what is done with each machine id, and the bool values
// returned, are not visible in this excerpt.
368 bool Table::update() {
370 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
371 validateAndUpdate(newSlots, false);
373 updateLiveTransactionsAndStatus();
375 } catch (Exception *e) {
376 SetIterator<int64_t, Pair<IoTString *, int32_t> *> *kit = getKeyIterator(localCommunicationTable);
377 while (kit->hasNext()) {
378 int64_t m = kit->next();
// Tries to create key `keyName` with `machineId` as the value passed to the
// NewKey entry. A key that already has an entry in arbitratorTable is
// handled by the first branch; otherwise a NewKey entry is built and handed
// to sendToServer().
// NOTE(review): the return statements and any surrounding loop are not
// visible in this excerpt.
387 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
389 if (arbitratorTable->contains(keyName)) {
390 // There is already an arbitrator
393 NewKey *newKey = new NewKey(NULL, keyName, machineId);
395 if (sendToServer(newKey)) {
396 // If successfully inserted
// Begins a new transaction by replacing pendingTransactionBuilder with a
// fresh PendingTransaction for localMachineId. The previous builder pointer
// is overwritten; it is not deleted in the visible lines.
402 void Table::startTransaction() {
403 // Create a new transaction, invalidates any old pending transactions.
404 pendingTransactionBuilder = new PendingTransaction(localMachineId);
// Adds the update key -> value to the transaction being built.
//
// Throws: Error("Key not Found.") when `key` has no entry in arbitratorTable;
//         Error("Not all Key Values Match Arbitrator.") when
//         pendingTransactionBuilder->checkArbitrator() rejects the key's
//         arbitrator.
// The key and value pointers are wrapped in a newly allocated KeyValue that
// is handed to the builder.
407 void Table::addKV(IoTString *key, IoTString *value) {
409 // Make sure it is a valid key
410 if (!arbitratorTable->contains(key)) {
411 throw new Error("Key not Found.");
414 // Make sure new key value pair matches the current arbitrator
415 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
416 // TODO: Maybe not throw an error
417 throw new Error("Not all Key Values Match Arbitrator.");
420 // Add the key value to this transaction
421 KeyValue *kv = new KeyValue(key, value);
422 pendingTransactionBuilder->addKV(kv);
// Finalizes the transaction being built and returns its TransactionStatus.
//
// - A transaction with no key/value updates returns a new status
//   (TransactionStatus_StatusNoEffect, -1) immediately.
// - Otherwise the builder is stamped with localTransactionSequenceNumber, a
//   pending status is created, and a Transaction is produced from the
//   builder. The arbitrator is then compared with localMachineId: the
//   visible lines either append the transaction to pendingTransactionQueue
//   or arbitrate on it locally and update live state.
// - pendingTransactionBuilder is replaced with a fresh PendingTransaction.
// - When a ServerException is caught, the queued transactions are offered to
//   their arbitrators directly through sendTransactionToLocal().
//
// NOTE(review): the try block contents, several branch bodies and the
// declaration of `oldindex` are not visible in this excerpt.
425 TransactionStatus *Table::commitTransaction() {
426 if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
427 // transaction with no updates will have no effect on the system
428 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
431 // Set the local transaction sequence number and increment
432 pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
433 localTransactionSequenceNumber++;
435 // Create the transaction status
436 TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
438 // Create the new transaction
439 Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
440 newTransaction->setTransactionStatus(transactionStatus);
442 if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
443 // Add it to the queue and invalidate the builder for safety
444 pendingTransactionQueue->add(newTransaction);
446 arbitrateOnLocalTransaction(newTransaction);
447 updateLiveStateFromLocal();
450 pendingTransactionBuilder = new PendingTransaction(localMachineId);
454 } catch (ServerException *e) {
// Arbitrators that could not be reached during this pass.
456 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
457 uint size = pendingTransactionQueue->size();
// The queue is compacted in place: each transaction is first copied to slot
// `oldindex`, and the queue is truncated to `oldindex` after the loop.
459 for (uint iter = 0; iter < size; iter++) {
460 Transaction *transaction = pendingTransactionQueue->get(iter);
461 pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter));
463 if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
464 // Already contacted this client so ignore all attempts to contact this client
465 // to preserve ordering for arbitrator
469 Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
471 if (sendReturn.getFirst()) {
472 // Failed to contact over local
473 arbitratorTriedAndFailed->add(transaction->getArbitrator());
475 // Successful contact or should not contact
477 if (sendReturn.getSecond()) {
483 pendingTransactionQueue->setSize(oldindex);
486 updateLiveStateFromLocal();
488 return transactionStatus;
492 * Recalculate the new resize threshold
// Recomputes bufferResizeThreshold from numberOfSlots.
//
// resizeLower is Table_RESIZE_THRESHOLD * numberOfSlots truncated to int.
// The threshold is resizeLower - 1 plus the value returned by
// random->nextInt(numberOfSlots - resizeLower), so it is randomized rather
// than fixed. Requires `random` and numberOfSlots to be set beforehand.
494 void Table::setResizeThreshold() {
495 int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
496 bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
// Returns the current value of localSequenceNumber.
499 int64_t Table::getLocalSequenceNumber() {
500 return localSequenceNumber;
// Pushes pending work (queued transactions, pending arbitration rounds and,
// optionally, a new key) to the server.
//
// Parameters:
//   newKey - NewKey entry to insert, or NULL when only transactions and
//            arbitration data are to be sent.
// Returns: the final visible return statement yields `newKey == NULL`.
//
// Structure of the visible code:
//   1. If hadPartialSendToServer is set, slots are re-fetched from the
//      server and the previously attempted slot (lastSlotAttemptedToSend,
//      with lastTransactionPartsSent) is reconciled: either re-sent, or
//      searched for in the slots that came back.
//   2. A loop then runs while there are queued transactions, pending
//      arbitration rounds or a new key. Each pass builds a slot, fills it
//      with fillSlot(), remembers it in the last* members and sends it with
//      sendSlotsToServer().
//   3. A ServerException handler resets per-transaction send state; for any
//      exception type other than ServerException_TypeInputTimeout the
//      sequence numbers are rolled back, otherwise hadPartialSendToServer is
//      set.
//
// Throws: Error("Should Be error free") inside the send loop when
//         hadPartialSendToServer is set.
//
// NOTE(review): many lines of this function (else branches, returns, some
// conditions, closing braces) are not visible in this excerpt. The comments
// below describe only the visible statements.
// NOTE(review): the `lastNewKey->getKey() == newKey->getKey()` tests compare
// the results of getKey() with ==; if getKey() returns an IoTString pointer
// this is an identity comparison, not a content comparison — confirm that is
// intended.
503 bool Table::sendToServer(NewKey *newKey) {
504 bool fromRetry = false;
// --- Recovery from an earlier partial send ---------------------------------
506 if (hadPartialSendToServer) {
507 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
508 if (newSlots->length() == 0) {
// Re-send the slot from the earlier attempt with the size and new-key flag
// that were saved alongside it.
510 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
512 if (sendSlotsReturn.getFirst()) {
513 if (newKey != NULL) {
514 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
// Bookkeeping for every transaction that had parts in the last attempted
// slot.
519 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
520 while (trit->hasNext()) {
521 Transaction *transaction = trit->next();
522 transaction->resetServerFailure();
523 // Update which transactions parts still need to be sent
524 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
525 // Add the transaction status to the outstanding list
526 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
528 // Update the transaction status
529 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
531 // Check if all the transaction parts were successfully
532 // sent and if so then remove it from pending
533 if (transaction->didSendAllParts()) {
534 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
535 pendingTransactionQueue->remove(transaction);
// Look through the slots returned by the send for the slot we attempted:
// first by (sequence number, machine id), then via LastMessage entries that
// name this machine and that sequence number.
540 newSlots = sendSlotsReturn.getThird();
541 bool isInserted = false;
542 for (uint si = 0; si < newSlots->length(); si++) {
543 Slot *s = newSlots->get(si);
544 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
550 for (uint si = 0; si < newSlots->length(); si++) {
551 Slot *s = newSlots->get(si);
556 // Process each entry in the slot
557 Vector<Entry *> *ventries = s->getEntries();
558 uint vesize = ventries->size();
559 for (uint vei = 0; vei < vesize; vei++) {
560 Entry *entry = ventries->get(vei);
561 if (entry->getType() == TypeLastMessage) {
562 LastMessage *lastMessage = (LastMessage *)entry;
563 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
572 if (newKey != NULL) {
573 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
578 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
579 while (trit->hasNext()) {
580 Transaction *transaction = trit->next();
581 transaction->resetServerFailure();
583 // Update which transactions parts still need to be sent
584 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
586 // Add the transaction status to the outstanding list
587 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
589 // Update the transaction status
590 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
592 // Check if all the transaction parts were successfully sent and if so then remove it from pending
593 if (transaction->didSendAllParts()) {
594 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
595 pendingTransactionQueue->remove(transaction);
597 transaction->resetServerFailure();
598 // Set the transaction sequence number back to nothing
599 if (!transaction->didSendAPartToServer()) {
600 transaction->setSequenceNumber(-1);
608 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
609 while (trit->hasNext()) {
610 Transaction *transaction = trit->next();
611 transaction->resetServerFailure();
612 // Set the transaction sequence number back to nothing
613 if (!transaction->didSendAPartToServer()) {
614 transaction->setSequenceNumber(-1);
619 if (sendSlotsReturn.getThird()->length() != 0) {
620 // insert into the local block chain
621 validateAndUpdate(sendSlotsReturn.getThird(), true);
// Same search as above, this time over the slots fetched by getSlots() at
// the top of the recovery path.
625 bool isInserted = false;
626 for (uint si = 0; si < newSlots->length(); si++) {
627 Slot *s = newSlots->get(si);
628 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
634 for (uint si = 0; si < newSlots->length(); si++) {
635 Slot *s = newSlots->get(si);
640 // Process each entry in the slot
641 Vector<Entry *> *entries = s->getEntries();
642 uint eSize = entries->size();
643 for (uint ei = 0; ei < eSize; ei++) {
644 Entry *entry = entries->get(ei);
646 if (entry->getType() == TypeLastMessage) {
647 LastMessage *lastMessage = (LastMessage *)entry;
648 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
657 if (newKey != NULL) {
658 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
663 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
664 while (trit->hasNext()) {
665 Transaction *transaction = trit->next();
666 transaction->resetServerFailure();
668 // Update which transactions parts still need to be sent
669 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
671 // Add the transaction status to the outstanding list
672 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
674 // Update the transaction status
675 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
677 // Check if all the transaction parts were successfully sent and if so then remove it from pending
678 if (transaction->didSendAllParts()) {
679 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
680 pendingTransactionQueue->remove(transaction);
682 transaction->resetServerFailure();
683 // Set the transaction sequence number back to nothing
684 if (!transaction->didSendAPartToServer()) {
685 transaction->setSequenceNumber(-1);
691 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
692 while (trit->hasNext()) {
693 Transaction *transaction = trit->next();
694 transaction->resetServerFailure();
695 // Set the transaction sequence number back to nothing
696 if (!transaction->didSendAPartToServer()) {
697 transaction->setSequenceNumber(-1);
703 // insert into the local block chain
704 validateAndUpdate(newSlots, true);
707 } catch (ServerException *e) {
// --- Main send loop ----------------------------------------------------------
714 // While we have stuff that needs inserting into the block chain
715 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
719 if (hadPartialSendToServer) {
720 throw new Error("Should Be error free");
725 // If there is a new key with same name then end
726 if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
// The new slot follows the current sequenceNumber and is built with the HMAC
// of the slot stored for that sequence number in the buffer.
731 Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber);
732 localSequenceNumber++;
734 // Try to fill the slot with data
735 ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
736 bool needsResize = fillSlotsReturn.getFirst();
737 int newSize = fillSlotsReturn.getSecond();
738 bool insertedNewKey = fillSlotsReturn.getThird();
741 // Reset which transaction to send
742 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
743 while (trit->hasNext()) {
744 Transaction *transaction = trit->next();
745 transaction->resetNextPartToSend();
747 // Set the transaction sequence number back to nothing
748 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
749 transaction->setSequenceNumber(-1);
754 // Clear the sent data since we are trying again
755 pendingSendArbitrationEntriesToDelete->clear();
756 transactionPartsSent->clear();
758 // We needed a resize so try again
759 fillSlot(slot, true, newKey);
// Remember this attempt so it can be reconciled later if the send only
// partially succeeds (see the recovery path at the top of this function).
762 lastSlotAttemptedToSend = slot;
763 lastIsNewKey = (newKey != NULL);
764 lastInsertedNewKey = insertedNewKey;
765 lastNewSize = newSize;
767 lastTransactionPartsSent = transactionPartsSent->clone();
768 lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
770 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
772 if (sendSlotsReturn.getFirst()) {
774 // Did insert into the block chain
776 if (insertedNewKey) {
777 // This slot was what was inserted not a previous slot
779 // New Key was successfully inserted into the block chain so dont want to insert it again
783 // Remove the aborts and commit parts that were sent from the pending to send queue
784 uint size = pendingSendArbitrationRounds->size();
// In-place compaction: rounds that still have parts to send are moved to the
// front, then the vector is truncated to the number kept.
786 for (uint i = 0; i < size; i++) {
787 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
788 round->removeParts(pendingSendArbitrationEntriesToDelete);
790 if (!round->isDoneSending()) {
791 // Not done sending yet: keep this round in the queue
792 pendingSendArbitrationRounds->set(oldcount++,
793 pendingSendArbitrationRounds->get(i));
796 pendingSendArbitrationRounds->setSize(oldcount);
798 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
799 while (trit->hasNext()) {
800 Transaction *transaction = trit->next();
801 transaction->resetServerFailure();
803 // Update which transactions parts still need to be sent
804 transaction->removeSentParts(transactionPartsSent->get(transaction));
806 // Add the transaction status to the outstanding list
807 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
809 // Update the transaction status
810 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
812 // Check if all the transaction parts were successfully sent and if so then remove it from pending
813 if (transaction->didSendAllParts()) {
814 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
815 pendingTransactionQueue->remove(transaction);
820 // Reset which transaction to send
821 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
822 while (trit->hasNext()) {
823 Transaction *transaction = trit->next();
824 transaction->resetNextPartToSend();
826 // Set the transaction sequence number back to nothing
827 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
828 transaction->setSequenceNumber(-1);
834 // Clear the sent data in preparation for next send
835 pendingSendArbitrationEntriesToDelete->clear();
836 transactionPartsSent->clear();
838 if (sendSlotsReturn.getThird()->length() != 0) {
839 // insert into the local block chain
840 validateAndUpdate(sendSlotsReturn.getThird(), true);
// --- Server failure handling -------------------------------------------------
844 } catch (ServerException *e) {
845 if (e->getType() != ServerException_TypeInputTimeout) {
846 // Nothing was able to be sent to the server so just clear these data structures
847 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
848 while (trit->hasNext()) {
849 Transaction *transaction = trit->next();
850 transaction->resetNextPartToSend();
852 // Set the transaction sequence number back to nothing
853 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
854 transaction->setSequenceNumber(-1);
859 // There was a partial send to the server
860 hadPartialSendToServer = true;
862 // Reset each transaction's send cursor and flag it as having hit a server failure
863 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
864 while (trit->hasNext()) {
865 Transaction *transaction = trit->next();
866 transaction->resetNextPartToSend();
867 transaction->setServerFailure();
872 pendingSendArbitrationEntriesToDelete->clear();
873 transactionPartsSent->clear();
878 return newKey == NULL;
// Asks machine `machineId` directly (over the local channel registered with
// addLocalCommunication) for arbitration data this table has not seen yet,
// and applies what comes back.
//
// The request buffer is sized for one int32 plus one int64. The visible
// write is the last arbitration sequence number seen from that machine (-1
// when none is recorded). The reply starts with an entry count, followed by
// that many typed entries; Abort and CommitPart entries are decoded, and
// commit parts are passed to processEntry(). Live state is refreshed
// afterwards.
//
// NOTE(review): the bool return values, the early-exit bodies, the first
// write into the request buffer and the handling of decoded aborts are not
// visible in this excerpt.
881 bool Table::updateFromLocal(int64_t machineId) {
882 if (!localCommunicationTable->contains(machineId))
885 Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(machineId);
887 // Get the size of the send data
888 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
890 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
891 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(machineId)) {
892 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
895 Array<char> *sendData = new Array<char>(sendDataSize);
896 ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
899 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
// The request is stamped with the current localSequenceNumber, which is then
// advanced.
903 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
904 localSequenceNumber++;
906 if (returnData == NULL) {
907 // Could not contact server
912 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
913 int numberOfEntries = bbDecode->getInt();
915 for (int i = 0; i < numberOfEntries; i++) {
916 char type = bbDecode->get();
917 if (type == TypeAbort) {
918 Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
920 } else if (type == TypeCommitPart) {
921 CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
922 processEntry(commitPart);
926 updateLiveStateFromLocal();
// Sends `transaction` straight to its arbitrator over the local channel and
// applies the arbitrator's reply.
//
// Returns a Pair<bool, bool>:
//   (true, false) when no local address is registered for the arbitrator, or
//                 when sendLocalData() returns NULL;
//   (false, true) at the end of the normal path.
// The caller in commitTransaction() treats first == true as "failed to
// contact over local".
//
// Request layout as written by the visible lines: the last arbitration
// sequence number seen from the arbitrator (int64, -1 when none), the number
// of transaction parts (int32), then each part encoded in order. The buffer
// is sized as int32 + int64 + the sum of part->getSize().
// Reply layout as read: a didCommit byte, a couldArbitrate byte, an entry
// count (int32), then that many typed entries (Abort or CommitPart).
//
// NOTE(review): tParts/tPartsSize are declared twice in the visible lines;
// presumably the two loops sit in separate brace scopes that are not visible
// here — confirm.
// NOTE(review): the conditions that choose between StatusCommitted and
// StatusAborted in the final section are not visible in this excerpt.
931 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
933 // Get the devices local communications
934 if (!localCommunicationTable->contains(transaction->getArbitrator()))
935 return Pair<bool, bool>(true, false);
937 Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
939 // Get the size of the send data
940 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
942 Vector<TransactionPart *> *tParts = transaction->getParts();
943 uint tPartsSize = tParts->size();
944 for (uint i = 0; i < tPartsSize; i++) {
945 TransactionPart *part = tParts->get(i);
946 sendDataSize += part->getSize();
950 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
951 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(transaction->getArbitrator())) {
952 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
955 // Make the send data size
956 Array<char> *sendData = new Array<char>(sendDataSize);
957 ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
960 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
961 bbEncode->putInt(transaction->getParts()->size());
963 Vector<TransactionPart *> *tParts = transaction->getParts();
964 uint tPartsSize = tParts->size();
965 for (uint i = 0; i < tPartsSize; i++) {
966 TransactionPart *part = tParts->get(i);
967 part->encode(bbEncode);
// The request is stamped with the current localSequenceNumber, which is then
// advanced.
972 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
973 localSequenceNumber++;
975 if (returnData == NULL) {
976 // Could not contact server
977 return Pair<bool, bool>(true, false);
981 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
982 bool didCommit = bbDecode->get() == 1;
983 bool couldArbitrate = bbDecode->get() == 1;
984 int numberOfEntries = bbDecode->getInt();
985 bool foundAbort = false;
987 for (int i = 0; i < numberOfEntries; i++) {
988 char type = bbDecode->get();
989 if (type == TypeAbort) {
990 Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
// An abort that names this machine and this transaction's client-local
// sequence number refers to the transaction that was just sent.
992 if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
997 } else if (type == TypeCommitPart) {
998 CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
999 processEntry(commitPart);
1003 updateLiveStateFromLocal();
1005 if (couldArbitrate) {
1006 TransactionStatus *status = transaction->getTransactionStatus();
1008 status->setStatus(TransactionStatus_StatusCommitted);
1010 status->setStatus(TransactionStatus_StatusAborted);
1013 TransactionStatus *status = transaction->getTransactionStatus();
1015 status->setStatus(TransactionStatus_StatusAborted);
1017 status->setStatus(TransactionStatus_StatusCommitted);
1021 return Pair<bool, bool>(false, true);
// Decodes an arbitration request from `data`, arbitrates on the transaction
// it carries (if any) and assembles a reply holding arbitration entries.
//
// Request layout, as read below: an int64 (last arbitrated sequence number
// the sender has seen), an int32 part count, then that many encoded
// TransactionPart entries.
//
// Reply layout, as written below: when the request carried parts, two flag
// bytes; then an int32 entry count followed by every collected entry.
//
// NOTE(review): the return statement is not visible here; presumably the
// reply buffer `returnData` is returned -- confirm.
1024 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
1026 ByteBuffer *bbDecode = ByteBuffer_wrap(data);
1027 int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
1028 int numberOfParts = bbDecode->getInt();
1030 // If we did commit a transaction or not
1031 bool didCommit = false;
1032 bool couldArbitrate = false;
1034 if (numberOfParts != 0) {
1036 // decode the transaction
1037 Transaction *transaction = new Transaction();
1038 for (int i = 0; i < numberOfParts; i++) {
1040 TransactionPart *newPart = (TransactionPart *)TransactionPart_decode(NULL, bbDecode);
1041 transaction->addPartDecode(newPart);
1044 // Arbitrate on transaction and pull relevant return data
1045 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
1046 couldArbitrate = localArbitrateReturn.getFirst();
1047 didCommit = localArbitrateReturn.getSecond();
1049 updateLiveStateFromLocal();
1051 // Transaction was sent to the server so keep track of it to prevent double commit
1052 if (transaction->getSequenceNumber() != -1) {
1053 offlineTransactionsCommittedAndAtServer->add(new Pair<int64_t, int64_t>(transaction->getId()));
1057 // The data to send back
1058 int returnDataSize = 0;
1059 Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
1061 // Get the aborts to send back
1062 Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>();
// Copy the keys of liveAbortsGeneratedByLocal so they can be sorted.
1064 SetIterator<int64_t, Abort *> *abortit = getKeyIterator(liveAbortsGeneratedByLocal);
1065 while (abortit->hasNext())
1066 abortLocalSequenceNumbers->add(abortit->next());
// Order the abort sequence numbers using compareInt64.
1070 qsort(abortLocalSequenceNumbers->expose(), abortLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1072 uint asize = abortLocalSequenceNumbers->size();
1073 for (uint i = 0; i < asize; i++) {
1074 int64_t localSequenceNumber = abortLocalSequenceNumbers->get(i);
// NOTE(review): the body of this branch is not visible here; presumably
// entries the sender has already seen are skipped -- confirm.
1075 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
// Queue the abort for the reply and account for its encoded size.
1079 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
1080 unseenArbitrations->add(abort);
1081 returnDataSize += abort->getSize();
1084 // Get the commits to send back
// Only commits recorded under this machine's id are considered.
1085 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
1086 if (commitForClientTable != NULL) {
1087 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>();
1089 SetIterator<int64_t, Commit *> *commitit = getKeyIterator(commitForClientTable);
1090 while (commitit->hasNext())
1091 commitLocalSequenceNumbers->add(commitit->next());
// Order the commit sequence numbers using compareInt64.
1094 qsort(commitLocalSequenceNumbers->expose(), commitLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1096 uint clsSize = commitLocalSequenceNumbers->size();
1097 for (uint clsi = 0; clsi < clsSize; clsi++) {
1098 int64_t localSequenceNumber = commitLocalSequenceNumbers->get(clsi);
1099 Commit *commit = commitForClientTable->get(localSequenceNumber);
// NOTE(review): the body of this branch is not visible here; presumably
// commits the sender has already seen are skipped -- confirm.
1101 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
// Queue every part of the commit for the reply.
1106 Vector<CommitPart *> *parts = commit->getParts();
1107 uint nParts = parts->size();
1108 for (uint i = 0; i < nParts; i++) {
1109 CommitPart *commitPart = parts->get(i);
1110 unseenArbitrations->add(commitPart);
1111 returnDataSize += commitPart->getSize();
1117 // Number of arbitration entries to decode
// NOTE(review): 2 * sizeof(int32_t) is reserved here although only one
// int32 count is written below; together with the extra byte added next
// this still covers the two flag bytes -- confirm the intended accounting.
1118 returnDataSize += 2 * sizeof(int32_t);
1120 // bool of did commit or not
1121 if (numberOfParts != 0) {
1122 returnDataSize += sizeof(char);
1125 // Data to send Back
1126 Array<char> *returnData = new Array<char>(returnDataSize);
1127 ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
// Flag bytes are only written when the request carried a transaction.
// NOTE(review): the condition selecting the first flag's value is not
// visible here; presumably it is didCommit -- confirm.
1129 if (numberOfParts != 0) {
1131 bbEncode->put((char)1);
1133 bbEncode->put((char)0);
1135 if (couldArbitrate) {
1136 bbEncode->put((char)1);
1138 bbEncode->put((char)0);
// Entry count followed by each encoded entry.
1142 bbEncode->putInt(unseenArbitrations->size());
1143 uint size = unseenArbitrations->size();
1144 for (uint i = 0; i < size; i++) {
1145 Entry *entry = unseenArbitrations->get(i);
1146 entry->encode(bbEncode);
// The loop-local variables named localSequenceNumber are out of scope
// here, so this increments the Table member of that name.
1149 localSequenceNumber++;
// Sends `slot` to the cloud through cloud->putSlot(slot, newSize) and
// inspects the response.
//
// Returns the tuple (inserted, lastTryInserted, array), where `array` is
// the slot array handed back by putSlot, or a one-element array wrapping
// `slot` when putSlot returned NULL.
// Throws Error when the server answers with an empty slot array.
//
// NOTE(review): `isNewKey` is not referenced in the lines visible here, and
// attemptedToSendToServerTmp is only used by a commented-out condition.
1153 ThreeTuple<bool, bool, Array<Slot *> *> Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey) {
// Remember the previous flag value, then mark that a send was attempted.
1154 bool attemptedToSendToServerTmp = attemptedToSendToServer;
1155 attemptedToSendToServer = true;
1157 bool inserted = false;
1158 bool lastTryInserted = false;
1160 Array<Slot *> *array = cloud->putSlot(slot, newSize);
// NULL response: wrap the slot we sent and drop the rejected-slot record.
1161 if (array == NULL) {
1162 array = new Array<Slot *>(1);
1163 array->set(0, slot);
1164 rejectedSlotVector->clear();
// A non-NULL response must contain at least one slot.
1167 if (array->length() == 0) {
1168 throw new Error("Server Error: Did not send any slots");
1171 // if (attemptedToSendToServerTmp) {
1172 if (hadPartialSendToServer) {
// Look for our own slot (same sequence number, our machine id) among the
// slots returned by the server.
1174 bool isInserted = false;
1175 uint size = array->length();
1176 for (uint i = 0; i < size; i++) {
1177 Slot *s = array->get(i);
1178 if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
// Second pass over the returned slots, scanning their entries.
1184 for (uint i = 0; i < size; i++) {
1185 Slot *s = array->get(i);
1190 // Process each entry in the slot
1191 Vector<Entry *> *entries = s->getEntries();
1192 uint eSize = entries->size();
1193 for (uint ei = 0; ei < eSize; ei++) {
1194 Entry *entry = entries->get(ei);
// A LastMessage entry naming this machine and this slot's sequence number
// is checked for below.
1196 if (entry->getType() == TypeLastMessage) {
1197 LastMessage *lastMessage = (LastMessage *)entry;
1199 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) {
// NOTE(review): the conditions separating the outcomes below are not
// visible here; rejected attempts record the slot's sequence number in
// rejectedSlotVector and clear lastTryInserted.
1208 rejectedSlotVector->add(slot->getSequenceNumber());
1209 lastTryInserted = false;
1211 lastTryInserted = true;
1214 rejectedSlotVector->add(slot->getSequenceNumber());
1215 lastTryInserted = false;
1219 return ThreeTuple<bool, bool, Array<Slot *> *>(inserted, lastTryInserted, array);
1223 * Returns a tuple whose first element is true if a resize was needed but not requested
// Populates `slot` with entries, in this order: a TableStatus entry (when
// resizing), rejected-message entries, mandatory rescue of old live
// entries, the optional new-key entry, pending arbitration data, parts of
// the first pending transaction, and finally optional rescue data.
//
// Parameters:
//   slot         the slot being filled
//   resize       whether the caller requests a resize; forced to true when
//                liveSlotCount exceeds bufferResizeThreshold
//   newKeyEntry  optional NewKey entry to insert, may be NULL
//
// Returns (needsResize, newSize, inserted). The first element is true only
// on the early return taken when a resize is needed but `resize` is false.
1225 ThreeTuple<bool, int32_t, bool> Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry) {
1227 if (liveSlotCount > bufferResizeThreshold) {
1228 resize = true;//Resize is forced
// Compute the enlarged size and announce it with a TableStatus entry.
// NOTE(review): the declaration of newSize and the condition guarding
// these lines are not visible here.
1232 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1233 TableStatus *status = new TableStatus(slot, newSize);
1234 slot->addEntry(status);
1237 // Fill with rejected slots first before doing anything else
1238 doRejectedMessages(slot);
1240 // Do mandatory rescue of entries
1241 ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1243 // Extract working variables
1244 bool needsResize = mandatoryRescueReturn.getFirst();
1245 bool seenLiveSlot = mandatoryRescueReturn.getSecond();
1246 int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1248 if (needsResize && !resize) {
1249 // We need to resize but we are not resizing, so report it (first element is true)
// NOTE(review): NULL is passed for the int32_t and bool elements; it
// converts to 0 / false, but explicit values would be clearer.
1250 return ThreeTuple<bool, int32_t, bool>(true, NULL, NULL);
1253 bool inserted = false;
// Attach the new key to this slot and add it when it fits.
1254 if (newKeyEntry != NULL) {
1255 newKeyEntry->setSlot(slot);
1256 if (slot->hasSpace(newKeyEntry)) {
1257 slot->addEntry(newKeyEntry);
1262 // Clear the transactions, aborts and commits that were sent previously
1263 transactionPartsSent->clear();
1264 pendingSendArbitrationEntriesToDelete->clear();
1265 uint size = pendingSendArbitrationRounds->size();
1266 for (uint i = 0; i < size; i++) {
1267 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
1268 bool isFull = false;
1269 round->generateParts();
1270 Vector<Entry *> *parts = round->getParts();
1272 // Insert pending arbitration data
1273 uint vsize = parts->size();
1274 for (uint vi = 0; vi < vsize; vi++) {
1275 Entry *arbitrationData = parts->get(vi);
1277 // If it is an abort then we need to set some information
1278 if (arbitrationData->getType() == TypeAbort) {
1279 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1282 if (!slot->hasSpace(arbitrationData)) {
1283 // No space so cant do anything else with these data entries
1288 // Add to this current slot and add it to entries to delete
1289 slot->addEntry(arbitrationData);
1290 pendingSendArbitrationEntriesToDelete->add(arbitrationData);
// Only the transaction at the head of the pending queue is considered.
1298 if (pendingTransactionQueue->size() > 0) {
1299 Transaction *transaction = pendingTransactionQueue->get(0);
1300 // Set the transaction sequence number if it has yet to be inserted into the block chain
1301 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1302 transaction->setSequenceNumber(slot->getSequenceNumber());
1306 TransactionPart *part = transaction->getNextPartToSend();
1308 // Ran out of parts to send for this transaction so move on
// Add the part if it fits and record its part number under the
// transaction in transactionPartsSent.
1312 if (slot->hasSpace(part)) {
1313 slot->addEntry(part);
1314 Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1315 if (partsSent == NULL) {
1316 partsSent = new Vector<int32_t>();
1317 transactionPartsSent->put(transaction, partsSent);
1319 partsSent->add(part->getPartNumber());
1320 transactionPartsSent->put(transaction, partsSent);
1327 // Fill the remainder of the slot with rescue data
1328 doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1330 return ThreeTuple<bool, int32_t, bool>(false, newSize, inserted);
// Builds RejectedMessage entries for slot `s` from the sequence numbers
// recorded in rejectedSlotVector. Does nothing when that vector is empty.
//
// When the vector holds more than Table_REJECTED_THRESHOLD entries, a
// single entry spanning the first through last recorded sequence numbers
// is created. Otherwise the list is walked: one entry covers the leading
// run of sequence numbers, and one entry is created per remaining slot
// using that slot's machine id.
//
// NOTE(review): the statements that attach the created entries to `s`, the
// declaration of `i`, and the test on `s_msg` that ends the first loop are
// not visible here -- confirm against the full file.
1333 void Table::doRejectedMessages(Slot *s) {
1334 if (!rejectedSlotVector->isEmpty()) {
1335 /* TODO: We should avoid generating a rejected message entry if
1336 * there is already a sufficient entry in the queue (e.g.,
1337 * equalsto value of true and same sequence number). */
1339 int64_t old_seqn = rejectedSlotVector->get(0);
1340 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1341 int64_t new_seqn = rejectedSlotVector->lastElement();
1342 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1345 int64_t prev_seqn = -1;
1347 /* Go through list of missing messages */
1348 for (; i < rejectedSlotVector->size(); i++) {
1349 int64_t curr_seqn = rejectedSlotVector->get(i);
1350 Slot *s_msg = buffer->getSlot(curr_seqn);
1353 prev_seqn = curr_seqn;
1355 /* Generate rejected message entry for missing messages */
1356 if (prev_seqn != -1) {
1357 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1360 /* Generate rejected message entries for present messages */
1361 for (; i < rejectedSlotVector->size(); i++) {
1362 int64_t curr_seqn = rejectedSlotVector->get(i);
1363 Slot *s_msg = buffer->getSlot(curr_seqn);
1364 int64_t machineid = s_msg->getMachineID();
1365 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
// Copies live entries from the oldest slots of the local buffer into
// `slot`, covering sequence numbers from oldestLiveSlotSequenceNumver up to
// (but excluding) a threshold derived from the newest sequence number,
// numberOfSlots and Table_FREE_SLOTS.
//
// Parameters:
//   slot    the slot receiving rescued entries
//   resize  forwarded to Slot::getLiveEntries
//
// Returns (needsResize, seenLiveSlot, currentSequenceNumber). The first
// element is true when an entry of the slot at `firstIfFull` does not fit
// into `slot`.
1372 ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize) {
1373 int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1374 int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
// Never start scanning before the oldest slot still in the buffer.
1375 if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1376 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1379 int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1380 bool seenLiveSlot = false;
1381 int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots; // smallest seq number in the buffer if it is full
1382 int64_t threshold = firstIfFull + Table_FREE_SLOTS; // we want the buffer to be clear of live entries up to this point
1386 for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1387 Slot *previousSlot = buffer->getSlot(currentSequenceNumber);
1388 // Push slot number forward
1389 if (!seenLiveSlot) {
1390 oldestLiveSlotSequenceNumver = currentSequenceNumber;
// NOTE(review): the body of this branch is not visible here; presumably
// slots without live entries are skipped -- confirm.
1393 if (!previousSlot->isLive()) {
1397 // We have seen a live slot
1398 seenLiveSlot = true;
1400 // Get all the live entries for a slot
1401 Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1403 // Iterate over all the live entries and try to rescue them
1404 uint lESize = liveEntries->size();
1405 for (uint i = 0; i < lESize; i++) {
1406 Entry *liveEntry = liveEntries->get(i);
1407 if (slot->hasSpace(liveEntry)) {
1408 // Enough space to rescue the entry
1409 slot->addEntry(liveEntry);
1410 } else if (currentSequenceNumber == firstIfFull) {
1411 //if there's no space but the entry is about to fall off the queue
1412 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
1418 return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
// Uses remaining space in `s` to copy live entries from buffered slots,
// scanning sequence numbers from `seqn` through the newest one in the
// buffer.
//
// Parameters:
//   s             the slot receiving rescued entries
//   seenliveslot  whether a live slot was already encountered by the caller
//   seqn          sequence number at which to start scanning
//   resize        forwarded to Slot::getLiveEntries
//
// NOTE(review): the declaration and update of `skipcount`, the condition
// guarding the oldestLiveSlotSequenceNumver update, and the statements
// taken by the bare `if`s are not visible here -- confirm against the full
// file.
1421 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1422 /* now go through live entries from least to greatest sequence number until
1423 * either all live slots added, or the slot doesn't have enough room
1424 * for SKIP_THRESHOLD consecutive entries*/
1426 int64_t newestseqnum = buffer->getNewestSeqNum();
1427 for (; seqn <= newestseqnum; seqn++) {
1428 Slot *prevslot = buffer->getSlot(seqn);
1429 //Push slot number forward
1431 oldestLiveSlotSequenceNumver = seqn;
1433 if (!prevslot->isLive())
1435 seenliveslot = true;
1436 Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
1437 uint lESize = liveentries->size();
1438 for (uint i = 0; i < lESize; i++) {
1439 Entry *liveentry = liveentries->get(i);
// Rescue the entry when it fits.
1440 if (s->hasSpace(liveentry))
1441 s->addEntry(liveentry);
// Stop condition based on Table_SKIP_THRESHOLD.
1444 if (skipcount > Table_SKIP_THRESHOLD)
1454 * Checks for malicious activity and updates the local copy of the block chain.
// Validates a batch of slots received from the server and then commits
// them to the local buffer.
//
// Steps visible below: reject batches that start at or before the locally
// recorded sequenceNumber, verify the HMAC chain, process each slot, and
// (when the batch does not start at sequenceNumber + 1) check the slot
// count and that every known machine was accounted for. Finally the slots
// are stored in `buffer`, sequenceNumber is advanced, live state is
// refreshed and offline bookkeeping is reset.
//
// Parameters:
//   newSlots              slots received from the server
//   acceptUpdatesToLocal  forwarded to processSlot
//
// Throws Error on older slots or on machines with no record after a gap.
1456 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1457 // The cloud communication layer has checked slot HMACs already
// NOTE(review): the body of this branch is not visible here; presumably an
// empty batch returns immediately -- confirm.
1459 if (newSlots->length() == 0) {
1463 // Make sure all slots are newer than the newest sequence number recorded locally
1465 int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
1466 if (firstSeqNum <= sequenceNumber) {
1467 throw new Error("Server Error: Sent older slots!");
1470 // Create an object that can access both new slots and slots in our
1471 // local chain without committing slots to our local chain
1472 SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1474 // Check that the HMAC chain is not broken
1475 checkHMACChain(indexer, newSlots);
1477 // Set to keep track of messages from clients
1478 Hashset<int64_t> *machineSet = new Hashset<int64_t>();
// Seed the set with every machine id present in lastMessageTable.
1480 SetIterator<int64_t, Pair<int64_t, Liveness *> *> *lmit = getKeyIterator(lastMessageTable);
1481 while (lmit->hasNext())
1482 machineSet->add(lmit->next());
1486 // Process each slot's data
1488 uint numSlots = newSlots->length();
1489 for (uint i = 0; i < numSlots; i++) {
1490 Slot *slot = newSlots->get(i);
1491 processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1492 updateExpectedSize();
1496 // If there is a gap, check that the server sent us everything
1498 if (firstSeqNum != (sequenceNumber + 1)) {
1500 // Check the size of the slots that were sent down by the server.
1501 // Can only check the size if there was a gap
1502 checkNumSlots(newSlots->length());
1504 // Since there was a gap every machine must have pushed a slot or
1505 // must have a last message message. If not, an error is raised.
1507 if (!machineSet->isEmpty()) {
1508 throw new Error("Missing record for machines: ");
1512 // Update the size of our local block chain.
1515 // Commit new to slots to the local block chain.
1517 uint numSlots = newSlots->length();
1518 for (uint i = 0; i < numSlots; i++) {
1519 Slot *slot = newSlots->get(i);
1521 // Insert this slot into our local block chain copy.
1522 buffer->putSlot(slot);
1524 // Keep track of how many slots are currently live (have live data)
1529 // Get the sequence number of the latest slot in the system
1530 sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
1531 updateLiveStateFromServer();
1533 // No Need to remember after we pulled from the server
1534 offlineTransactionsCommittedAndAtServer->clear();
1536 // This is invalidated now
1537 hadPartialSendToServer = false;
// Refreshes derived state after slots arrive from the server: processes
// new transaction parts, arbitrates, updates the committed table, prunes
// dead transactions, then refreshes both speculative tables.
1540 void Table::updateLiveStateFromServer() {
1541 // Process the new transaction parts
1542 processNewTransactionParts();
1544 // Do arbitration on new transactions that were received
1545 arbitrateFromServer();
1547 // Update all the committed keys
1548 bool didCommitOrSpeculate = updateCommittedTable();
1550 // Delete the transactions that are now dead
1551 updateLiveTransactionsAndStatus();
// Refresh the speculative tables; the flag accumulates whether either the
// committed or the speculative table changed.
1554 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1555 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
// Refreshes derived state after a local change: updates the committed
// table, prunes dead transactions, then refreshes both speculative tables.
// Unlike updateLiveStateFromServer, no transaction-part processing or
// server-side arbitration is performed here.
1558 void Table::updateLiveStateFromLocal() {
1559 // Update all the committed keys
1560 bool didCommitOrSpeculate = updateCommittedTable();
1562 // Delete the transactions that are now dead
1563 updateLiveTransactionsAndStatus();
// Refresh the speculative tables; the flag accumulates whether either the
// committed or the speculative table changed.
1566 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1567 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
// Initializes the expected slot count and the current maximum buffer size.
//
// Parameters:
//   firstSequenceNumber  used as the number of previous slots
//   numberOfSlots        maximum buffer size; also caps expectedsize
//
// NOTE(review): the branch structure around didFindTableStatus is not fully
// visible here, so it is unclear which statements run when a table status
// was already found -- confirm against the full file.
1570 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1571 int64_t prevslots = firstSequenceNumber;
1573 if (didFindTableStatus) {
// expectedsize = min(prevslots, numberOfSlots)
1575 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1578 didFindTableStatus = true;
1579 currMaxSize = numberOfSlots;
// Keeps expectedsize from exceeding currMaxSize.
// NOTE(review): a statement preceding the clamp is not visible here
// (presumably an increment of expectedsize) -- confirm.
1582 void Table::updateExpectedSize() {
1585 if (expectedsize > currMaxSize) {
1586 expectedsize = currMaxSize;
1592 * Check the size of the block chain to make sure there are enough
1593 * slots sent back by the server. This is only called when we have a
1594 * gap between the slots that we have locally and the slots sent by
1595 * the server; therefore, in the slots sent by the server there will be
1596 * at least 1 Table status message.
// Verifies that the number of slots received equals expectedsize.
// Throws Error when the two differ.
//
// Parameters:
//   numberOfSlots  count of slots received from the server
1598 void Table::checkNumSlots(int numberOfSlots) {
1599 if (numberOfSlots != expectedsize) {
1600 throw new Error("Server Error: Server did not send all slots-> Expected: ");
1605 * Update the size of the local buffer if it is needed.
// Applies currMaxSize as the new local buffer size: clears
// didFindTableStatus, resizes `buffer` when the size changed, updates
// numberOfSlots and recomputes the resize threshold.
1607 void Table::commitNewMaxSize() {
1608 didFindTableStatus = false;
1610 // Resize the local slot buffer
1611 if (numberOfSlots != currMaxSize) {
1612 buffer->resize((int32_t)currMaxSize);
1615 // Change the number of local slots to the new size
1616 numberOfSlots = (int32_t)currMaxSize;
1618 // Recalculate the resize threshold since the size of the local
1619 // buffer has changed
1620 setResizeThreshold();
1624 * Process the new transaction parts from this latest round of slots
1625 * received from the server
// Folds the transaction parts collected in newTransactionParts into the
// live transaction tables, creating Transaction objects for sequence
// numbers not seen before, and then empties newTransactionParts.
1627 void Table::processNewTransactionParts() {
// NOTE(review): the body of this branch is not visible here; presumably it
// returns early -- confirm.
1629 if (newTransactionParts->size() == 0) {
1630 // Nothing new to process
1634 // Iterate through all the machine Ids that we received new parts for
1636 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *tpit = getKeyIterator(newTransactionParts);
1637 while (tpit->hasNext()) {
1638 int64_t machineId = tpit->next();
1639 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
1641 SetIterator<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *ptit = getKeyIterator(parts);
1642 // Iterate through all the parts for that machine Id
1643 while (ptit->hasNext()) {
1644 Pair<int64_t, int32_t> *partId = ptit->next();
1645 TransactionPart *part = parts->get(partId);
// Parts whose sequence number is at or below the last transaction number
// recorded for their arbitrator are handled separately.
// NOTE(review): the statements inside the inner branch are not visible
// here; per the existing comment the part is marked dead -- confirm.
1647 if (lastArbitratedTransactionNumberByArbitratorTable->contains(part->getArbitratorId())) {
1648 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1649 if (lastTransactionNumber >= part->getSequenceNumber()) {
1650 // Set dead the transaction part
1656 // Get the transaction object for that sequence number
1657 Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1659 if (transaction == NULL) {
1660 // This is a new transaction that we dont have so make a new one
1661 transaction = new Transaction();
1663 // Insert this new transaction into the live tables
1664 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1665 liveTransactionByTransactionIdTable->put(new Pair<int64_t, int64_t>(part->getTransactionId()), transaction);
1668 // Add that part to the transaction
1669 transaction->addPartDecode(part);
1674 // Clear all the new transaction parts in preparation for the next
1675 // time the server sends slots
1676 newTransactionParts->clear();
// Arbitrates on the live transactions received from the server, visiting
// them in the order produced by sorting their sequence numbers.
//
// For each candidate transaction the guard is evaluated against the
// committed table plus the updates accepted earlier in this call. Accepted
// updates are accumulated for one batched Commit; rejected transactions
// each produce an Abort. The resulting commit and aborts are queued as an
// ArbitrationRound in pendingSendArbitrationRounds.
//
// NOTE(review): the bodies of the filtering branches inside the loop
// (continue / break) are not visible here -- confirm against the full file.
1679 void Table::arbitrateFromServer() {
1681 if (liveTransactionBySequenceNumberTable->size() == 0) {
1682 // Nothing to arbitrate on so move on
1686 // Get the transaction sequence numbers and sort from oldest to newest
1687 Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>();
1689 SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
1690 while (trit->hasNext())
1691 transactionSequenceNumbers->add(trit->next());
1694 qsort(transactionSequenceNumbers->expose(), transactionSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1696 // Key/value updates accepted so far in this arbitration pass
1697 Hashtable<IoTString *, KeyValue *> *speculativeTableTmp = new Hashtable<IoTString *, KeyValue *>();
1699 // The last transaction arbitrated on
1700 int64_t lastTransactionCommitted = -1;
1701 Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1702 uint tsnSize = transactionSequenceNumbers->size();
1703 for (uint i = 0; i < tsnSize; i++) {
1704 int64_t transactionSequenceNumber = transactionSequenceNumbers->get(i);
1705 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1707 // Check if this machine arbitrates for this transaction if not
1708 // then we cant arbitrate this transaction
1709 if (transaction->getArbitrator() != localMachineId) {
// Transactions older than the last one arbitrated on are filtered here.
1713 if (transactionSequenceNumber < lastSeqNumArbOn) {
1717 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1718 // We have seen this already locally so dont commit again
1723 if (!transaction->isComplete()) {
1724 // Will arbitrate in incorrect order if we continue so just break
1730 // update the largest transaction seen by arbitrator from server
1731 if (!lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1732 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1734 int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1735 if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1736 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1740 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1741 // Guard evaluated as true
1743 // Update the local changes so we can make the commit
1744 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1745 while (kvit->hasNext()) {
1746 KeyValue *kv = kvit->next();
1747 speculativeTableTmp->put(kv->getKey(), kv);
1751 // Update what the last transaction committed was for use in batch commit
1752 lastTransactionCommitted = transactionSequenceNumber;
1754 // Guard evaluated was false so create abort
1756 Abort *newAbort = new Abort(NULL,
1757 transaction->getClientLocalSequenceNumber(),
1758 transaction->getSequenceNumber(),
1759 transaction->getMachineId(),
1760 transaction->getArbitrator(),
1761 localArbitrationSequenceNumber);
1762 localArbitrationSequenceNumber++;
1763 generatedAborts->add(newAbort);
1765 // Insert the abort so we can process
1766 processEntry(newAbort);
// Remember the sequence number of the transaction just arbitrated on.
1769 lastSeqNumArbOn = transactionSequenceNumber;
1772 Commit *newCommit = NULL;
1774 // If there is something to commit
1775 if (speculativeTableTmp->size() != 0) {
1776 // Create the commit and increment the commit sequence number
1777 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1778 localArbitrationSequenceNumber++;
1780 // Add all the new keys to the commit
1781 SetIterator<IoTString *, KeyValue *> *spit = getKeyIterator(speculativeTableTmp);
1782 while (spit->hasNext()) {
1783 IoTString *string = spit->next();
1784 KeyValue *kv = speculativeTableTmp->get(string);
1785 newCommit->addKV(kv);
1789 // create the commit parts
1790 newCommit->createCommitParts();
1792 // Append all the commit parts to the end of the pending queue
1793 // waiting for sending to the server
1794 // Insert the commit so we can process it
1795 Vector<CommitPart *> *parts = newCommit->getParts();
1796 uint partsSize = parts->size();
1797 for (uint i = 0; i < partsSize; i++) {
1798 CommitPart *commitPart = parts->get(i);
1799 processEntry(commitPart);
// Queue the round for sending when it carries a commit or any abort.
1803 if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1804 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1805 pendingSendArbitrationRounds->add(arbitrationRound);
// When compaction reports true, process the parts of the commit held by
// the last pending round (if that round has a commit).
1807 if (compactArbitrationData()) {
1808 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1809 if (newArbitrationRound->getCommit() != NULL) {
1810 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1811 uint partsSize = parts->size();
1812 for (uint i = 0; i < partsSize; i++) {
1813 CommitPart *commitPart = parts->get(i);
1814 processEntry(commitPart);
// Arbitrates on a single transaction without going through the server.
//
// Returns a pair (couldArbitrate, didCommit):
//   (false, false)  this machine is not the arbitrator, the transaction is
//                   incomplete, or a newer transaction from the same
//                   remote machine was already seen from the server
//   (true, true)    the guard passed and a commit was created
//   (true, false)   the guard failed and an abort was created
//
// In both arbitrated cases an ArbitrationRound is queued in
// pendingSendArbitrationRounds and updateLiveStateFromLocal() is called.
1821 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1823 // Check if this machine arbitrates for this transaction if not then
1824 // we cant arbitrate this transaction
1825 if (transaction->getArbitrator() != localMachineId) {
1826 return Pair<bool, bool>(false, false);
1829 if (!transaction->isComplete()) {
1830 // Will arbitrate in incorrect order if we continue so just break
1832 return Pair<bool, bool>(false, false);
1835 if (transaction->getMachineId() != localMachineId) {
1836 // dont do this check for local transactions
1837 if (lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1838 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1839 // We have already seen this from the server
1840 return Pair<bool, bool>(false, false);
1845 if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1846 // Guard evaluated as true Create the commit and increment the
1847 // commit sequence number
1848 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1849 localArbitrationSequenceNumber++;
1851 // Update the local changes so we can make the commit
1852 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1853 while (kvit->hasNext()) {
1854 KeyValue *kv = kvit->next();
1855 newCommit->addKV(kv);
1859 // create the commit parts
1860 newCommit->createCommitParts();
1862 // Append all the commit parts to the end of the pending queue
1863 // waiting for sending to the server
1864 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1865 pendingSendArbitrationRounds->add(arbitrationRound);
// When compaction reports true, process the parts of the commit held by
// the last pending round.
1867 if (compactArbitrationData()) {
1868 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1869 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1870 uint partsSize = parts->size();
1871 for (uint i = 0; i < partsSize; i++) {
1872 CommitPart *commitPart = parts->get(i);
1873 processEntry(commitPart);
1876 // Insert the commit so we can process it
1877 Vector<CommitPart *> *parts = newCommit->getParts();
1878 uint partsSize = parts->size();
1879 for (uint i = 0; i < partsSize; i++) {
1880 CommitPart *commitPart = parts->get(i);
1881 processEntry(commitPart);
// Locally created transactions get their status marked as committed.
1885 if (transaction->getMachineId() == localMachineId) {
1886 TransactionStatus *status = transaction->getTransactionStatus();
1887 if (status != NULL) {
1888 status->setStatus(TransactionStatus_StatusCommitted);
1892 updateLiveStateFromLocal();
1893 return Pair<bool, bool>(true, true);
1895 if (transaction->getMachineId() == localMachineId) {
1896 // For locally created messages update the status
1897 // Guard evaluated was false so create abort
1898 TransactionStatus *status = transaction->getTransactionStatus();
1899 if (status != NULL) {
1900 status->setStatus(TransactionStatus_StatusAborted);
1903 Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
// NOTE(review): one constructor argument line is not visible here.
1906 Abort *newAbort = new Abort(NULL,
1907 transaction->getClientLocalSequenceNumber(),
1909 transaction->getMachineId(),
1910 transaction->getArbitrator(),
1911 localArbitrationSequenceNumber);
1912 localArbitrationSequenceNumber++;
1913 addAbortSet->add(newAbort);
1915 // Append the abort to the end of the pending queue
1916 // waiting for sending to the server
1917 ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1918 pendingSendArbitrationRounds->add(arbitrationRound);
// NOTE(review): getCommit() is dereferenced here without the NULL check
// that arbitrateFromServer performs in the same situation -- confirm that
// compactArbitrationData() returning true guarantees a commit.
1920 if (compactArbitrationData()) {
1921 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1923 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1924 uint partsSize = parts->size();
1925 for (uint i = 0; i < partsSize; i++) {
1926 CommitPart *commitPart = parts->get(i);
1927 processEntry(commitPart);
1932 updateLiveStateFromLocal();
1933 return Pair<bool, bool>(true, false);
1938 * Compacts the arbitration data by merging commits and aggregating
1939 * aborts so that a single large push of commits can be done instead
1940 * of many small updates
// Tries to merge the most recent pending arbitration round with the rounds
// queued before it, walking backwards from the end of
// pendingSendArbitrationRounds. Aborts are appended to the last round and
// commits are merged via Commit_merge, as long as the combined part count
// stays within ArbitrationRound_MAX_PARTS and the earlier round is neither
// full nor partially sent.
//
// Returns a bool; the visible condition `hadCommit && gotNewCommit` marks
// the case where the commit should be reinserted into the commit processor.
// NOTE(review): the return statements and the updates of `numberToDelete`
// are not visible here -- confirm against the full file.
1942 bool Table::compactArbitrationData() {
1943 if (pendingSendArbitrationRounds->size() < 2) {
1944 // Nothing to compact so do nothing
1948 ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
// A round that already had a part sent is checked for here.
1949 if (lastRound->getDidSendPart()) {
// NOTE(review): despite its name, hadCommit is true when the last round
// has NO commit -- confirm this is the intended meaning.
1953 bool hadCommit = (lastRound->getCommit() == NULL);
1954 bool gotNewCommit = false;
1956 uint numberToDelete = 1;
1957 while (numberToDelete < pendingSendArbitrationRounds->size()) {
// Walk backwards from the round just before the last one.
1958 ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1960 if (round->isFull() || round->getDidSendPart()) {
1961 // Stop since there is a part that cannot be compacted and we
1962 // need to compact in order
1966 if (round->getCommit() == NULL) {
1967 // Try compacting aborts only
1968 int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
1969 if (newSize > ArbitrationRound_MAX_PARTS) {
1970 // Cant compact since it would be too large
1973 lastRound->addAborts(round->getAborts());
1975 // Create a new larger commit
1976 Commit *newCommit = Commit_merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
1977 localArbitrationSequenceNumber++;
1979 // Create the commit parts so that we can count them
1980 newCommit->createCommitParts();
1982 // Calculate the new size of the parts
1983 int newSize = newCommit->getNumberOfParts();
1984 newSize += lastRound->getAbortsCount();
1985 newSize += round->getAbortsCount();
1987 if (newSize > ArbitrationRound_MAX_PARTS) {
1988 // Cant compact since it would be too large
1992 // Set the new compacted part
1993 lastRound->setCommit(newCommit);
1994 lastRound->addAborts(round->getAborts());
1995 gotNewCommit = true;
2001 if (numberToDelete != 1) {
2002 // If there is a compaction
2003 // Delete the previous pieces that are now in the new compacted piece
2004 if (numberToDelete == pendingSendArbitrationRounds->size()) {
2005 pendingSendArbitrationRounds->clear();
2007 for (uint i = 0; i < numberToDelete; i++) {
2008 pendingSendArbitrationRounds->removeIndex(pendingSendArbitrationRounds->size() - 1);
2012 // Add the new compacted into the pending to send list
2013 pendingSendArbitrationRounds->add(lastRound);
2015 // Should reinsert into the commit processor
2016 if (hadCommit && gotNewCommit) {
2025 * Update all the commits and the committed tables, sets dead the dead
// Merges newly received commit parts into the live commit tables and then
// applies commits to the committed key/value state.
//
// Phase 1: every CommitPart queued in newCommitParts is added, through
// addPartDecode(), to the Commit stored in liveCommitsTable under the part's
// machine id and sequence number; missing per-machine tables and Commit
// objects are created on demand. newCommitParts is then cleared.
// Phase 2: for each arbitrator in liveCommitsTable the commit sequence
// numbers are sorted with compareInt64 and visited in that order, updating
// lastArbitratedTransactionNumberByArbitratorTable,
// lastArbitrationDataLocalSequenceNumberSeenFromArbitrator,
// lastCommitSeenSequenceNumberByArbitratorTable, committedKeyValueTable and
// liveCommitsByKeyTable.
//
// Returns didProcessANewCommit, which is set to true when a commit is
// applied to the committed tables.
// NOTE(review): this listing does not show every line of the function
// (closing braces and some early exits are absent), so confirm the exact
// branch structure against the complete file.
2028 bool Table::updateCommittedTable() {
2030 if (newCommitParts->size() == 0) {
2031 // Nothing new to process
2035 // Iterate through all the machine Ids that we received new parts for
2036 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts);
2037 while (partsit->hasNext()) {
2038 int64_t machineId = partsit->next();
2039 Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId);
2041 // Iterate through all the parts for that machine Id
2042 SetIterator<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pairit = getKeyIterator(parts);
2043 while (pairit->hasNext()) {
2044 Pair<int64_t, int32_t> *partId = pairit->next();
2045 CommitPart *part = parts->get(partId);
2047 // Get the commit table for the machine that produced this part
2048 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
2050 if (commitForClientTable == NULL) {
2051 // This is the first commit from this device
2052 commitForClientTable = new Hashtable<int64_t, Commit *>();
2053 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
2056 Commit *commit = commitForClientTable->get(part->getSequenceNumber());
2058 if (commit == NULL) {
2059 // This is a new commit that we dont have so make a new one
2060 commit = new Commit();
2062 // Insert this new commit into the live tables
2063 commitForClientTable->put(part->getSequenceNumber(), commit);
2066 // Add that part to the commit
2067 commit->addPartDecode(part);
2073 // Clear all the new commits parts in preparation for the next time
2074 // the server sends slots
2075 newCommitParts->clear();
2077 // If we process a new commit keep track of it for future use
2078 bool didProcessANewCommit = false;
2080 // Process the commits one by one
2081 SetIterator<int64_t, Hashtable<int64_t, Commit *> *> *liveit = getKeyIterator(liveCommitsTable);
2082 while (liveit->hasNext()) {
2083 int64_t arbitratorId = liveit->next();
2085 // Get all the commits for a specific arbitrator
2086 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
2088 // Sort the commits in order
2089 Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>();
2091 SetIterator<int64_t, Commit *> *clientit = getKeyIterator(commitForClientTable);
2092 while (clientit->hasNext())
2093 commitSequenceNumbers->add(clientit->next());
// The vector's backing storage is sorted in place with the C library qsort
2097 qsort(commitSequenceNumbers->expose(), commitSequenceNumbers->size(), sizeof(int64_t), compareInt64);
2099 // Get the last commit seen from this arbitrator
// -1 stands for "no commit seen yet from this arbitrator"
2100 int64_t lastCommitSeenSequenceNumber = -1;
2101 if (lastCommitSeenSequenceNumberByArbitratorTable->contains(arbitratorId)) {
2102 lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
2105 // Go through each new commit one by one
2106 for (uint i = 0; i < commitSequenceNumbers->size(); i++) {
2107 int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
2108 Commit *commit = commitForClientTable->get(commitSequenceNumber);
2110 // Special processing if a commit is not complete
2111 if (!commit->isComplete()) {
2112 if (i == (commitSequenceNumbers->size() - 1)) {
2113 // If there is an incomplete commit and this commit is the
2114 // latest one seen then this commit cannot be processed and
2115 // there are no other commits
2118 // This is a commit that was already dead but parts of it
2119 // are still in the block chain (not flushed out yet).
2120 // Delete it and move on
2122 commitForClientTable->remove(commit->getSequenceNumber());
2127 // Update the last transaction that was updated if we can
2128 if (commit->getTransactionSequenceNumber() != -1) {
2129 // Update the last transaction sequence number that the arbitrator arbitrated on
2130 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2131 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2135 // Update the last arbitration data that we have seen so far
2136 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(commit->getMachineId())) {
2137 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
2138 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
2140 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2143 // Never seen any data from this arbitrator so record the first one
2144 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2147 // We have already seen this commit before so need to do the
2148 // full processing on this commit
// NOTE(review): the wording above presumably means "no need to do"; the
// statement that leaves this branch is not visible in this listing.
2149 if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2151 // Update the last transaction that was updated if we can
2152 if (commit->getTransactionSequenceNumber() != -1) {
// NOTE(review): lastTransactionNumber is not read by any line visible here,
// and it is fetched before the contains() check below.
2153 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
2154 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) ||
2155 lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2156 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2163 // If we got here then this is a brand new commit and needs full
2165 // Get what commits should be edited, these are the commits that
2166 // have live values for their keys
2167 Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
2169 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2170 while (kvit->hasNext()) {
2171 KeyValue *kv = kvit->next();
// NOTE(review): this inner 'commit' shadows the commit being processed by
// the enclosing loop; it is the commit currently owning the key.
2172 Commit *commit = liveCommitsByKeyTable->get(kv->getKey());
2174 commitsToEdit->add(commit);
2179 // Update each previous commit that needs to be updated
2180 SetIterator<Commit *, Commit *> *commitit = commitsToEdit->iterator();
2181 while (commitit->hasNext()) {
2182 Commit *previousCommit = commitit->next();
2184 // Only bother with live commits (TODO: Maybe remove this check)
2185 if (previousCommit->isLive()) {
2187 // Update which keys in the old commits are still live
2189 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2190 while (kvit->hasNext()) {
2191 KeyValue *kv = kvit->next();
2192 previousCommit->invalidateKey(kv->getKey());
2197 // if the commit is now dead then remove it
2198 if (!previousCommit->isLive()) {
2199 commitForClientTable->remove(previousCommit->getSequenceNumber());
2205 // Update the last seen sequence number from this arbitrator
2206 if (lastCommitSeenSequenceNumberByArbitratorTable->contains(commit->getMachineId())) {
2207 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2208 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2211 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2214 // We processed a new commit that we havent seen before
2215 didProcessANewCommit = true;
2217 // Update the committed table of keys and which commit is using which key
2219 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2220 while (kvit->hasNext()) {
2221 KeyValue *kv = kvit->next();
2222 committedKeyValueTable->put(kv->getKey(), kv);
2223 liveCommitsByKeyTable->put(kv->getKey(), commit);
2231 return didProcessANewCommit;
2235 * Create the speculative table from transactions that are still live
2236 * and have come from the cloud
// Brings speculatedKeyValueTable up to date with the transactions held in
// liveTransactionBySequenceNumberTable.
//
// The live transaction sequence numbers are sorted with compareInt64.
// Speculation restarts from an empty speculatedKeyValueTable when
// didProcessNewCommits is true or when the first sorted sequence number
// differs from oldestTransactionSequenceNumberSpeculatedOn. Each transaction
// visited has its guard evaluated against the committed and speculated
// tables; when the guard holds, its key/value updates are written into
// speculatedKeyValueTable.
//
// didProcessNewCommits: whether the caller processed new commits beforehand.
// Returns false when startIndex runs past the sorted list ("did not
// speculate").
// NOTE(review): other return statements, break/continue lines and closing
// braces are not visible in this listing; confirm against the complete file.
2238 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2239 if (liveTransactionBySequenceNumberTable->size() == 0) {
2240 // There is nothing to speculate on
2244 // Create a list of the transaction sequence numbers and sort them
2245 // from oldest to newest
2246 Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>();
2248 SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
2249 while (trit->hasNext())
2250 transactionSequenceNumbersSorted->add(trit->next());
2254 qsort(transactionSequenceNumbersSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64);
// A changed front element means the previously oldest transaction is gone
2256 bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2259 if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2260 // If there is a gap in the transaction sequence numbers then
2261 // there was a commit or an abort of a transaction OR there was a
2262 // new commit (could be from an offline commit), so redo the
2263 // speculation from scratch
2265 // Start from scratch
2266 speculatedKeyValueTable->clear();
2267 lastTransactionSequenceNumberSpeculatedOn = -1;
2268 oldestTransactionSequenceNumberSpeculatedOn = -1;
2271 // Remember the front of the transaction list
2272 oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2274 // Find where to start arbitration from
2275 uint startIndex = 0;
// Scan for the transaction that speculation last stopped on
2277 for (; startIndex < transactionSequenceNumbersSorted->size(); startIndex++)
2278 if (transactionSequenceNumbersSorted->get(startIndex) == lastTransactionSequenceNumberSpeculatedOn)
2282 if (startIndex >= transactionSequenceNumbersSorted->size()) {
2283 // Make sure we are not out of bounds
2284 return false; // did not speculate
2287 Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
// NOTE(review): didSkip starts out true and no line visible in this listing
// sets it to false - confirm that this is intended.
2288 bool didSkip = true;
2290 for (uint i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2291 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2292 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2294 if (!transaction->isComplete()) {
2295 // If there is an incomplete transaction then there is nothing
2296 // we can do; add this transaction's arbitrator to the list of
2297 // arbitrators we should ignore
2298 incompleteTransactionArbitrator->add(transaction->getArbitrator());
2303 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2307 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
// No pending-transaction table takes part in this speculation, hence NULL
2309 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2310 // Guard evaluated to true so update the speculative table
2312 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2313 while (kvit->hasNext()) {
2314 KeyValue *kv = kvit->next();
2315 speculatedKeyValueTable->put(kv->getKey(), kv);
2323 // Since there was a skip we need to redo the speculation next time around
2324 lastTransactionSequenceNumberSpeculatedOn = -1;
2325 oldestTransactionSequenceNumberSpeculatedOn = -1;
2328 // We did some speculation
2333 * Create the pending transaction speculative table from transactions
2334 * that are still in the pending transaction buffer
// Brings pendingTransactionSpeculatedKeyValueTable up to date with the
// transactions waiting in pendingTransactionQueue.
//
// The pending speculation is reset (table cleared, firstPendingTransaction
// re-read from the head of the queue) when didProcessNewCommitsOrSpeculate is
// true or when the head of the queue is no longer firstPendingTransaction.
// Each visited transaction has its guard evaluated against the committed,
// speculated and pending-speculated tables; when the guard holds, its
// key/value updates are written into pendingTransactionSpeculatedKeyValueTable.
//
// didProcessNewCommitsOrSpeculate: whether the caller processed new commits
// or re-ran the live speculation beforehand.
// NOTE(review): the early-return and break lines are not visible in this
// listing; confirm the exact control flow against the complete file.
2336 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2337 if (pendingTransactionQueue->size() == 0) {
2338 // There is nothing to speculate on
2342 if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2343 // need to reset on the pending speculation
2344 lastPendingTransactionSpeculatedOn = NULL;
2345 firstPendingTransaction = pendingTransactionQueue->get(0);
2346 pendingTransactionSpeculatedKeyValueTable->clear();
2349 // Find where to start arbitration from
2350 uint startIndex = 0;
// Scan the queue for the entry equal to firstPendingTransaction
2352 for (; startIndex < pendingTransactionQueue->size(); startIndex++)
2353 if (pendingTransactionQueue->get(startIndex) == firstPendingTransaction)
2356 if (startIndex >= pendingTransactionQueue->size()) {
2357 // Make sure we are not out of bounds
2361 for (uint i = startIndex; i < pendingTransactionQueue->size(); i++) {
2362 Transaction *transaction = pendingTransactionQueue->get(i);
2364 lastPendingTransactionSpeculatedOn = transaction;
2366 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2367 // Guard evaluated to true so update the speculative table
2368 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2369 while (kvit->hasNext()) {
2370 KeyValue *kv = kvit->next();
2371 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2379 * Set dead and remove from the live transaction tables the
2380 * transactions that are dead
2382 void Table::updateLiveTransactionsAndStatus() {
2383 // Go through each of the transactions
2385 SetIterator<int64_t, Transaction *> *iter = getKeyIterator(liveTransactionBySequenceNumberTable);
2386 while (iter->hasNext()) {
2387 int64_t key = iter->next();
2388 Transaction *transaction = liveTransactionBySequenceNumberTable->get(key);
2390 // Check if the transaction is dead
2391 if (lastArbitratedTransactionNumberByArbitratorTable->contains(transaction->getArbitrator()) && lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator() >= transaction->getSequenceNumber())) {
2392 // Set dead the transaction
2393 transaction->setDead();
2395 // Remove the transaction from the live table
2397 liveTransactionByTransactionIdTable->remove(transaction->getId());
2403 // Go through each of the transactions
2405 SetIterator<int64_t, TransactionStatus *> *iter = getKeyIterator(outstandingTransactionStatus);
2406 while (iter->hasNext()) {
2407 int64_t key = iter->next();
2408 TransactionStatus *status = outstandingTransactionStatus->get(key);
2410 // Check if the transaction is dead
2411 if (lastArbitratedTransactionNumberByArbitratorTable->contains(status->getTransactionArbitrator()) && (lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()) >= status->getTransactionSequenceNumber())) {
2414 status->setStatus(TransactionStatus_StatusCommitted);
2425 * Process this slot, entry by entry. Also update the latest message sent by slot
// Processes one slot: records it as the latest message seen from its
// machine, then hands every entry to the processEntry overload that matches
// the entry's type.
//
// indexer: forwarded to the RejectedMessage handler.
// slot: the slot whose entries are processed; its machine id and sequence
//   number feed updateLastMessage and its sequence number feeds the
//   TableStatus handler.
// acceptUpdatesToLocal: forwarded unchanged to updateLastMessage.
// machineSet: forwarded to updateLastMessage and to the LastMessage handler.
// Throws an Error* ("Unrecognized type: ") from the last branch of the switch.
// NOTE(review): some case labels and the break statements are not visible in
// this listing; confirm the switch layout against the complete file.
2427 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2429 // Update the last message seen
2430 updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2432 // Process each entry in the slot
2433 Vector<Entry *> *entries = slot->getEntries();
// The entry count is read once, before the loop starts
2434 uint eSize = entries->size();
2435 for (uint ei = 0; ei < eSize; ei++) {
2436 Entry *entry = entries->get(ei);
// Each branch downcasts the entry to select the matching overload
2437 switch (entry->getType()) {
2438 case TypeCommitPart:
2439 processEntry((CommitPart *)entry);
2442 processEntry((Abort *)entry);
2444 case TypeTransactionPart:
2445 processEntry((TransactionPart *)entry);
2448 processEntry((NewKey *)entry);
2450 case TypeLastMessage:
2451 processEntry((LastMessage *)entry, machineSet);
2453 case TypeRejectedMessage:
2454 processEntry((RejectedMessage *)entry, indexer);
2456 case TypeTableStatus:
2457 processEntry((TableStatus *)entry, slot->getSequenceNumber());
2460 throw new Error("Unrecognized type: ");
2466 * Update the last message that was sent for a machine Id
2468 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2469 // Update what the last message received by a machine was
2470 updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2474 * Add the new key to the arbitrators table and update the set of live
2475 * new keys (in case of a rescued new key message)
2477 void Table::processEntry(NewKey *entry) {
2478 // Update the arbitrator table with the new key information
2479 arbitratorTable->put(entry->getKey(), entry->getMachineID());
2481 // Update what the latest live new key is
2482 NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2483 if (oldNewKey != NULL) {
2484 // Delete the old new key messages
2485 oldNewKey->setDead();
2490 * Process new table status entries and set dead the old ones as new
2491 * ones come in. Keeps track of the largest and smallest table status
2492 * seen in this current round of updating the local copy of the block
2495 void Table::processEntry(TableStatus *entry, int64_t seq) {
2496 int newNumSlots = entry->getMaxSlots();
2497 updateCurrMaxSize(newNumSlots);
2498 initExpectedSize(seq, newNumSlots);
2500 if (liveTableStatus != NULL) {
2501 // We have a larger table status so the old table status is no
2503 liveTableStatus->setDead();
2506 // Make this new table status the latest alive table status
2507 liveTableStatus = entry;
2511 * Check old messages to see if there is a block chain violation.
// Handles a RejectedMessage entry.
//
// Step 1: for every sequence number in [getOldSeqNum(), getNewSeqNum()] the
// slot is looked up through the indexer and its machine id is compared with
// the entry's machine id; an Error* is thrown when the result of that
// comparison disagrees with the entry's getEqual() flag.
// Step 2: walks lastMessageTable and, for each machine whose last recorded
// sequence number is below this entry's sequence number, registers the entry
// with addWatchVector() and adds the machine to a fresh watch set. A
// non-empty watch set is attached to the entry with setWatchSet().
//
// entry: the rejected-message entry being processed.
// indexer: used to look up locally known slots by sequence number.
// NOTE(review): the slot null check, the skip of the local machine and the
// empty-watch-set action are not visible in this listing; confirm against
// the complete file.
2514 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
2515 int64_t oldSeqNum = entry->getOldSeqNum();
2516 int64_t newSeqNum = entry->getNewSeqNum();
2517 bool isequal = entry->getEqual();
2518 int64_t machineId = entry->getMachineID();
2519 int64_t seq = entry->getSequenceNumber();
2521 // Check if we have messages that were supposed to be rejected in
2522 // our local block chain
2523 for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2525 Slot *slot = indexer->getSlot(seqNum);
2528 // If we have this slot make sure that it was not supposed to be
2530 int64_t slotMachineId = slot->getMachineID();
// Mismatch between the flag and the actual sender is treated as a server error
2531 if (isequal != (slotMachineId == machineId)) {
2532 throw new Error("Server Error: Trying to insert rejected message for slot ");
2537 // Create a list of clients to watch until they see this rejected
2539 Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2540 SetIterator<int64_t, Pair<int64_t, Liveness *> *> *iter = getKeyIterator(lastMessageTable);
2541 while (iter->hasNext()) {
2542 // Machine ID for the last message entry
2543 int64_t lastMessageEntryMachineId = iter->next();
2545 // We've seen it, don't need to continue to watch. Our next
2546 // message will implicitly acknowledge it.
2547 if (lastMessageEntryMachineId == localMachineId) {
2551 Pair<int64_t, Liveness *> *lastMessageValue = lastMessageTable->get(lastMessageEntryMachineId);
// First element of the pair is the machine's last recorded sequence number
2552 int64_t entrySequenceNumber = lastMessageValue->getFirst();
2554 if (entrySequenceNumber < seq) {
2555 // Add this rejected message to the set of messages that this
2556 // machine ID did not see yet
2557 addWatchVector(lastMessageEntryMachineId, entry);
2558 // This client did not see this rejected message yet so add it
2559 // to the watch set to monitor
2560 deviceWatchSet->add(lastMessageEntryMachineId);
2565 if (deviceWatchSet->isEmpty()) {
2566 // This rejected message has been seen by all the clients so
2569 // We need to watch this rejected message
2570 entry->setWatchSet(deviceWatchSet);
2575 * Check if this abort is live, if not then save it so we can kill it
2576 * later. Update the last transaction number that was arbitrated on.
// Handles an Abort entry.
//
// - When the abort names a transaction sequence number (not -1), the matching
//   status is removed from outstandingTransactionStatus and, if one was
//   found, set to TransactionStatus_StatusAborted.
// - The abort is stored in liveAbortTable under a heap-allocated copy of its
//   abort id; an abort previously stored under that id is marked dead. Aborts
//   arbitrated by the local machine are also stored in
//   liveAbortsGeneratedByLocal.
// - When the entry has a sequence number and the target machine's last
//   message is at or past it, the abort is removed from those tables again.
// - Otherwise the last arbitration sequence number seen from the arbitrator
//   is recorded, the aborted transaction is removed from the live
//   transaction tables, and the arbitrator's last arbitrated transaction
//   number is raised when this abort carries a newer one.
// NOTE(review): setDead/return lines and closing braces are not visible in
// this listing; confirm the exact branch structure against the complete file.
2578 void Table::processEntry(Abort *entry) {
2579 if (entry->getTransactionSequenceNumber() != -1) {
2580 // update the transaction status if it was sent to the server
2581 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2582 if (status != NULL) {
2583 status->setStatus(TransactionStatus_StatusAborted);
2587 // Abort has not been seen by the client it is for yet so we need to
// The key handed to put() is a new Pair copied from the entry's abort id
2590 Abort *previouslySeenAbort = liveAbortTable->put(new Pair<int64_t, int64_t>(entry->getAbortId()), entry);
2591 if (previouslySeenAbort != NULL) {
2592 previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version
2595 if (entry->getTransactionArbitrator() == localMachineId) {
2596 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
// NOTE(review): the result of lastMessageTable->get() is dereferenced
// without a null check on this line - confirm that an entry for the
// transaction's machine always exists at this point.
2599 if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) {
2600 // The machine already saw this so it is dead
2602 Pair<int64_t, int64_t> abortid = entry->getAbortId();
2603 liveAbortTable->remove(&abortid);
2605 if (entry->getTransactionArbitrator() == localMachineId) {
2606 liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2611 // Update the last arbitration data that we have seen so far
2612 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(entry->getTransactionArbitrator())) {
2613 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2614 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2616 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2619 // Never seen any data from this arbitrator so record the first one
2620 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2623 // Set dead a transaction if we can
// Stack-allocated lookup key: (transaction machine id, client-local sequence number)
2624 Pair<int64_t, int64_t> deadPair = Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber());
2626 Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(&deadPair);
2627 if (transactionToSetDead != NULL) {
2628 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2631 // Update the last transaction sequence number that the arbitrator
2633 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getTransactionArbitrator()) ||
2634 (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator()) < entry->getTransactionSequenceNumber())) {
// -1 means the abort names no transaction sequence number, so nothing is stored
2636 if (entry->getTransactionSequenceNumber() != -1) {
2637 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2643 * Set dead the transaction part if that transaction is dead and keep
2644 * track of all new parts
// Handles a TransactionPart entry.
//
// A part belongs to an already decided transaction when its arbitrator has a
// recorded last-arbitrated transaction number that is >= the part's sequence
// number. Parts that are still alive are stored in newTransactionParts under
// the part's machine id and a heap-allocated copy of its part id; a part
// previously stored under the same id is marked dead.
// NOTE(review): the statements executed for a dead part are not visible in
// this listing; confirm against the complete file.
2646 void Table::processEntry(TransactionPart *entry) {
2647 // Check if we have already seen this transaction and set it dead OR
2648 // if it is not alive
2649 if (lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getArbitratorId()) && (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId()) >= entry->getSequenceNumber())) {
2650 // This transaction is dead, it was already committed or aborted
2655 // This part is still alive
2656 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId());
2658 if (transactionPart == NULL) {
2659 // Dont have a table for this machine Id yet so make one
2660 transactionPart = new Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2661 newTransactionParts->put(entry->getMachineId(), transactionPart);
2664 // Update the part and set dead ones we have already seen (got a
// put() returns the part previously stored under this part id, if any
2666 TransactionPart *previouslySeenPart = transactionPart->put(new Pair<int64_t, int32_t>(entry->getPartId()), entry);
2667 if (previouslySeenPart != NULL) {
2668 previouslySeenPart->setDead();
2673 * Process new commit entries and save them for future use. Delete duplicates
2675 void Table::processEntry(CommitPart *entry) {
2676 // Update the last transaction that was updated if we can
2677 if (entry->getTransactionSequenceNumber() != -1) {
2678 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId() || lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber())) {
2679 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2683 Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *commitPart = newCommitParts->get(entry->getMachineId());
2684 if (commitPart == NULL) {
2685 // Don't have a table for this machine Id yet so make one
2686 commitPart = new Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2687 newCommitParts->put(entry->getMachineId(), commitPart);
2689 // Update the part and set dead ones we have already seen (got a
2691 CommitPart *previouslySeenPart = commitPart->put(new Pair<int64_t, int32_t>(entry->getPartId()), entry);
2692 if (previouslySeenPart != NULL) {
2693 previouslySeenPart->setDead();
2698 * Update the last message seen table. Update and set dead the
2699 * appropriate RejectedMessages as clients see them. Updates the live
2700 * aborts, removes those that are dead and sets them dead. Check that
2701 * the last message seen is correct and that there is no mismatch of
2702 * our own last message or that other clients have not had a rollback
2703 * on the last message.
// Records that machineId has produced a message with sequence number seqNum
// and performs the bookkeeping that follows from it.
//
// - machineId is removed from machineSet.
// - Every RejectedMessage watched for machineId whose sequence number is
//   <= seqNum has machineId removed from its watchers.
// - Aborts in liveAbortTable addressed to machineId with a sequence number
//   <= seqNum are retired; for locally arbitrated ones the matching
//   liveAbortsGeneratedByLocal entry is removed.
// - lastMessageTable is updated to (seqNum, liveness). Liveness objects of
//   the local machine are marked dead immediately; for other machines the
//   previously stored liveness object is marked dead instead.
// - The previous sequence number is compared with seqNum to detect server
//   misbehaviour.
//
// machineId: machine the message came from.
// seqNum: sequence number reported for that machine.
// liveness: the Slot or LastMessage carrying the report; any other type
//   makes the function throw.
// acceptUpdatesToLocal: when true, a sequence mismatch on the local machine
//   is not reported as an error.
// machineSet: set from which machineId is removed.
// Throws Error* on an unrecognized liveness type, on a local sequence-number
// mismatch and on a rollback of a remote machine's sequence number.
// NOTE(review): iterator removals, returns and else lines are not visible in
// this listing; confirm the exact branch structure against the complete file.
2705 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2706 // We have seen this machine ID
2707 machineSet->remove(machineId);
2709 // Get the set of rejected messages that this machine Id has not seen yet
2710 Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2711 // If there is a rejected message that this machine Id has not seen yet
2712 if (watchset != NULL) {
2713 // Go through each rejected message that this machine Id has not
2716 SetIterator<RejectedMessage *, RejectedMessage *> *rmit = watchset->iterator();
2717 while (rmit->hasNext()) {
2718 RejectedMessage *rm = rmit->next();
2719 // If this machine Id has seen this rejected message...
2720 if (rm->getSequenceNumber() <= seqNum) {
2721 // Remove it from our watchlist
2723 // Decrement machines that need to see this notification
2724 rm->removeWatcher(machineId);
2730 // Set dead the abort
2731 SetIterator<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals> *abortit = getKeyIterator(liveAbortTable);
2733 while (abortit->hasNext()) {
2734 Pair<int64_t, int64_t> *key = abortit->next();
2735 Abort *abort = liveAbortTable->get(key);
// An abort is seen once its target machine reports a sequence number at or past it
2736 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2739 if (abort->getTransactionArbitrator() == localMachineId) {
2740 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2745 if (machineId == localMachineId) {
2746 // Our own messages are immediately dead.
2747 char livenessType = liveness->getType();
2748 if (livenessType == TypeLastMessage) {
2749 ((LastMessage *)liveness)->setDead();
2750 } else if (livenessType == TypeSlot) {
2751 ((Slot *)liveness)->setDead();
2753 throw new Error("Unrecognized type");
2756 // Get the old last message for this device
// put() hands back the pair it replaced; that pair is deleted further down
2757 Pair<int64_t, Liveness *> *lastMessageEntry = lastMessageTable->put(machineId, new Pair<int64_t, Liveness *>(seqNum, liveness));
2758 if (lastMessageEntry == NULL) {
2759 // If no last message then there is nothing else to process
2763 int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
2764 Liveness *lastEntry = lastMessageEntry->getSecond();
2765 delete lastMessageEntry;
2767 // If it is not our machine Id since we already set ours to dead
2768 if (machineId != localMachineId) {
2769 char lastEntryType = lastEntry->getType();
2771 if (lastEntryType == TypeLastMessage) {
2772 ((LastMessage *)lastEntry)->setDead();
2773 } else if (lastEntryType == TypeSlot) {
2774 ((Slot *)lastEntry)->setDead();
2776 throw new Error("Unrecognized type");
2779 // Make sure the server is not playing any games
2780 if (machineId == localMachineId) {
2781 if (hadPartialSendToServer) {
2782 // We were not making any updates and we had a machine mismatch
2783 if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2784 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: ");
2787 // We were not making any updates and we had a machine mismatch
2788 if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2789 throw new Error("Server Error: Mismatch on local machine sequence number, needed: ");
// Remote machines may only move forward
2793 if (lastMessageSeqNum > seqNum) {
2794 throw new Error("Server Error: Rollback on remote machine sequence number");
2800 * Add a rejected message entry to the watch set to keep track of
2801 * which clients have seen that rejected message entry and which have
2804 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
2805 Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2806 if (entries == NULL) {
2807 // There is no set for this machine ID yet so create one
2808 entries = new Hashset<RejectedMessage *>();
2809 rejectedMessageWatchVectorTable->put(machineId, entries);
2811 entries->add(entry);
2815 * Check if the HMAC chain is not violated
2817 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
2818 for (uint i = 0; i < newSlots->length(); i++) {
2819 Slot *currSlot = newSlots->get(i);
2820 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
2821 if (prevSlot != NULL &&
2822 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2823 throw new Error("Server Error: Invalid HMAC Chain");