3 #include "SlotBuffer.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
14 #include "ByteBuffer.h"
16 #include "CommitPart.h"
17 #include "ArbitrationRound.h"
18 #include "TransactionPart.h"
20 #include "RejectedMessage.h"
21 #include "SlotIndexer.h"
// qsort()-style comparator: both arguments are reinterpreted as pointers
// to int64_t. It is handed to qsort() in Table::acceptDataFromLocal to
// order vectors of int64_t sequence numbers.
// NOTE(review): the comparison and return statements are not visible in
// this excerpt.
24 int compareInt64(const void * a, const void *b) {
25 const int64_t * pa = (const int64_t *) a;
26 const int64_t * pb = (const int64_t *) b;
// Constructs a Table that creates its own CloudComm from the given base
// URL, password and listening port.
//   baseurl, password, listeningPort: forwarded to the CloudComm constructor.
//   _localMachineId: stored in localMachineId.
// Every pointer member listed below starts as NULL and every counter/flag
// at the literal shown; the containers themselves are allocated later by
// the `new Hashtable` / `new Vector` assignments further down in this file.
// NOTE(review): the CloudComm-based constructor also initializes
// liveAbortTable and newCommitParts; the matching initializers are not
// visible in this excerpt of this constructor -- confirm they exist.
35 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
37 cloud(new CloudComm(this, baseurl, password, listeningPort)),
39 liveTableStatus(NULL),
40 pendingTransactionBuilder(NULL),
41 lastPendingTransactionSpeculatedOn(NULL),
42 firstPendingTransaction(NULL),
44 bufferResizeThreshold(0),
46 oldestLiveSlotSequenceNumver(1),
47 localMachineId(_localMachineId),
// Sequence counters and send-state flags.
49 localTransactionSequenceNumber(0),
50 lastTransactionSequenceNumberSpeculatedOn(0),
51 oldestTransactionSequenceNumberSpeculatedOn(0),
52 localArbitrationSequenceNumber(0),
53 hadPartialSendToServer(false),
54 attemptedToSendToServer(false),
56 didFindTableStatus(false),
// State remembered about the most recent send attempt.
58 lastSlotAttemptedToSend(NULL),
61 lastTransactionPartsSent(NULL),
62 lastPendingSendArbitrationEntriesToDelete(NULL),
// Lookup tables and queues; all NULL until allocated.
64 committedKeyValueTable(NULL),
65 speculatedKeyValueTable(NULL),
66 pendingTransactionSpeculatedKeyValueTable(NULL),
67 liveNewKeyTable(NULL),
68 lastMessageTable(NULL),
69 rejectedMessageWatchVectorTable(NULL),
70 arbitratorTable(NULL),
72 newTransactionParts(NULL),
74 lastArbitratedTransactionNumberByArbitratorTable(NULL),
75 liveTransactionBySequenceNumberTable(NULL),
76 liveTransactionByTransactionIdTable(NULL),
77 liveCommitsTable(NULL),
78 liveCommitsByKeyTable(NULL),
79 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
80 rejectedSlotVector(NULL),
81 pendingTransactionQueue(NULL),
82 pendingSendArbitrationRounds(NULL),
83 pendingSendArbitrationEntriesToDelete(NULL),
84 transactionPartsSent(NULL),
85 outstandingTransactionStatus(NULL),
86 liveAbortsGeneratedByLocal(NULL),
87 offlineTransactionsCommittedAndAtServer(NULL),
88 localCommunicationTable(NULL),
89 lastTransactionSeenFromMachineFromServer(NULL),
90 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
91 lastInsertedNewKey(false),
// Constructs a Table around a caller-supplied CloudComm instead of
// creating one.
//   _cloud: existing cloud connection. NOTE(review): the initializer that
//           stores it is not visible in this excerpt.
//   _localMachineId: stored in localMachineId.
// As in the other constructor, all pointer members start as NULL and all
// counters/flags at the literal shown.
97 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
101 liveTableStatus(NULL),
102 pendingTransactionBuilder(NULL),
103 lastPendingTransactionSpeculatedOn(NULL),
104 firstPendingTransaction(NULL),
106 bufferResizeThreshold(0),
108 oldestLiveSlotSequenceNumver(1),
109 localMachineId(_localMachineId),
// Sequence counters and send-state flags.
111 localTransactionSequenceNumber(0),
112 lastTransactionSequenceNumberSpeculatedOn(0),
113 oldestTransactionSequenceNumberSpeculatedOn(0),
114 localArbitrationSequenceNumber(0),
115 hadPartialSendToServer(false),
116 attemptedToSendToServer(false),
118 didFindTableStatus(false),
// State remembered about the most recent send attempt.
120 lastSlotAttemptedToSend(NULL),
123 lastTransactionPartsSent(NULL),
124 lastPendingSendArbitrationEntriesToDelete(NULL),
// Lookup tables and queues; all NULL until allocated.
126 committedKeyValueTable(NULL),
127 speculatedKeyValueTable(NULL),
128 pendingTransactionSpeculatedKeyValueTable(NULL),
129 liveNewKeyTable(NULL),
130 lastMessageTable(NULL),
131 rejectedMessageWatchVectorTable(NULL),
132 arbitratorTable(NULL),
133 liveAbortTable(NULL),
134 newTransactionParts(NULL),
135 newCommitParts(NULL),
136 lastArbitratedTransactionNumberByArbitratorTable(NULL),
137 liveTransactionBySequenceNumberTable(NULL),
138 liveTransactionByTransactionIdTable(NULL),
139 liveCommitsTable(NULL),
140 liveCommitsByKeyTable(NULL),
141 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
142 rejectedSlotVector(NULL),
143 pendingTransactionQueue(NULL),
144 pendingSendArbitrationRounds(NULL),
145 pendingSendArbitrationEntriesToDelete(NULL),
146 transactionPartsSent(NULL),
147 outstandingTransactionStatus(NULL),
148 liveAbortsGeneratedByLocal(NULL),
149 offlineTransactionsCommittedAndAtServer(NULL),
150 localCommunicationTable(NULL),
151 lastTransactionSeenFromMachineFromServer(NULL),
152 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
153 lastInsertedNewKey(false),
160 * Init all the stuff needed for table usage
// Body of the table initialization routine (its signature line is not
// visible in this excerpt). Allocates the helper objects and every lookup
// table / queue that the constructors left as NULL, then derives the slot
// count and the resize threshold from the new SlotBuffer.
163 // Init helper objects
164 random = new Random();
165 buffer = new SlotBuffer();
// Key/value views and per-key bookkeeping, keyed by IoTString pointer.
168 committedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
169 speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
170 pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
171 liveNewKeyTable = new Hashtable<IoTString *, NewKey *>();
172 lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> * >();
173 rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
174 arbitratorTable = new Hashtable<IoTString *, int64_t>();
// Tables keyed by a Pair pointer pass pairHashFunction / pairEquals as
// extra template arguments instead of relying on the defaults.
175 liveAbortTable = new Hashtable<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>();
176 newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
177 newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
178 lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
179 liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
180 liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t> *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>();
181 liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> * >();
182 liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *>();
183 lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
// Queues and bookkeeping used while sending to the server.
184 rejectedSlotVector = new Vector<int64_t>();
185 pendingTransactionQueue = new Vector<Transaction *>();
186 pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
187 transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
188 outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
189 liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
190 offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> *, uintptr_t, 0, pairHashFunction, pairEquals>();
191 localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> *>();
192 lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
193 pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
194 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
// setResizeThreshold() reads both numberOfSlots and random, so it has to
// run after they have been assigned above.
197 numberOfSlots = buffer->capacity();
198 setResizeThreshold();
202 * Initialize the table by inserting a table status as the first entry
203 * into the block chain; also initializes the security (crypto) state.
// Initializes security on the cloud connection, builds slot number 1
// carrying a TableStatus, and tries to put it on the server. The length of
// the array returned by putSlot() decides how the local chain is updated;
// an Error is thrown on the remaining path.
// NOTE(review): the first branch condition and the else/closing lines are
// not visible in this excerpt.
205 void Table::initTable() {
206 cloud->initSecurity();
208 // Create the first insertion into the block chain which is the table status
209 Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
210 localSequenceNumber++;
211 TableStatus *status = new TableStatus(s, numberOfSlots);
213 Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
// A fresh one-element array replaces the returned one on this path
// (presumably to hold our own slot; the assignment into it is not visible).
216 array = new Array<Slot *>(1);
218 // update local block chain
219 validateAndUpdate(array, true);
220 } else if (array->length() == 1) {
221 // in case we did push the slot BUT we failed to init it
222 validateAndUpdate(array, true);
// Note: the Error is thrown by pointer (`throw new`), so handlers must
// catch `Error *`.
224 throw new Error("Error on initialization");
229 * Rebuild the table from scratch by pulling the latest block chain
// Fetches every slot after the current sequenceNumber from the server,
// applies them through validateAndUpdate(), then refreshes the live
// transactions and status.
232 void Table::rebuild() {
233 // Just pull the latest slots from the server
234 Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
// The second argument is `true` here, whereas Table::update() passes `false`.
235 validateAndUpdate(newslots, true);
237 updateLiveTransactionsAndStatus();
// Records how to reach `arbitrator` directly: stores a newly allocated
// (hostName, portNumber) Pair in localCommunicationTable under that id.
// portNumber is an int and is stored as int32_t.
240 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
241 localCommunicationTable->put(arbitrator, new Pair<IoTString *, int32_t>(hostName, portNumber));
// Returns the value arbitratorTable holds for `key`.
// NOTE(review): no contains() check is done here, so the result for an
// unknown key is whatever Hashtable::get returns -- confirm.
244 int64_t Table::getArbitrator(IoTString *key) {
245 return arbitratorTable->get(key);
// NOTE(review): the body of close() is not visible in this excerpt.
248 void Table::close() {
// Looks `key` up in committedKeyValueTable and returns the stored value.
// NOTE(review): the guard between the lookup and the dereference, and the
// result for a missing key, are not visible in this excerpt.
252 IoTString *Table::getCommitted(IoTString *key) {
253 KeyValue *kv = committedKeyValueTable->get(key);
256 return kv->getValue();
// Returns the value for `key`, consulting the tables in this order:
// pendingTransactionSpeculatedKeyValueTable, speculatedKeyValueTable,
// committedKeyValueTable.
// NOTE(review): the conditions between the lookups are not visible;
// presumably each later table is consulted only when the earlier lookup
// found nothing.
262 IoTString *Table::getSpeculative(IoTString *key) {
263 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
266 kv = speculatedKeyValueTable->get(key);
270 kv = committedKeyValueTable->get(key);
274 return kv->getValue();
// Reads the committed value of `key` and registers a guard for it on the
// pending transaction.
// Throws (by pointer) Error("Key not Found.") when `key` is not in
// arbitratorTable, and Error("Not all Key Values Match Arbitrator.") when
// the pending transaction's checkArbitrator() rejects the key's arbitrator.
// NOTE(review): pendingTransactionBuilder is dereferenced without a visible
// NULL check, so startTransaction() presumably must have been called first.
280 IoTString *Table::getCommittedAtomic(IoTString *key) {
281 KeyValue *kv = committedKeyValueTable->get(key);
283 if (!arbitratorTable->contains(key)) {
284 throw new Error("Key not Found.");
287 // Make sure new key value pair matches the current arbitrator
288 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
289 // TODO: Maybe not throw an error
290 throw new Error("Not all Key Values Match Arbitrator.");
// Guard on the value that was read. The branch condition is not visible;
// presumably this path is taken when kv is non-NULL.
294 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
295 return kv->getValue();
// Otherwise the guard records a NULL value for the key.
297 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
// Speculative counterpart of getCommittedAtomic(): validates the key and
// its arbitrator, reads the value using the same table order as
// getSpeculative(), and registers a guard on the pending transaction.
// Throws (by pointer) Error("Key not Found.") or
// Error("Not all Key Values Match Arbitrator.") under the same conditions
// as getCommittedAtomic().
302 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
303 if (!arbitratorTable->contains(key)) {
304 throw new Error("Key not Found.");
307 // Make sure new key value pair matches the current arbitrator
308 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
309 // TODO: Maybe not throw an error
310 throw new Error("Not all Key Values Match Arbitrator.");
// Lookup order: pending-speculated, then speculated, then committed.
// The fallback conditions between the lookups are not visible here.
313 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
316 kv = speculatedKeyValueTable->get(key);
320 kv = committedKeyValueTable->get(key);
// Guard on the value that was read (presumably when kv is non-NULL) ...
324 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
325 return kv->getValue();
// ... otherwise the guard records a NULL value for the key.
327 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
// Pulls the slots after sequenceNumber from the server, applies them with
// validateAndUpdate(..., false) and refreshes live transactions/status.
// If an Exception is caught, the machine ids registered in
// localCommunicationTable are iterated instead.
// NOTE(review): the try line, the loop body and the return statements are
// not visible in this excerpt; presumably the loop falls back to
// updateFromLocal() for each machine.
332 bool Table::update() {
334 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
335 validateAndUpdate(newSlots, false);
337 updateLiveTransactionsAndStatus();
339 } catch (Exception *e) {
340 SetIterator<int64_t, Pair<IoTString *, int32_t> *> *kit = getKeyIterator(localCommunicationTable);
341 while (kit->hasNext()) {
342 int64_t m = kit->next();
// Creates `keyName` with `machineId` as its arbitrator by sending a NewKey
// entry to the server through sendToServer().
//   keyName:   key to create.
//   machineId: machine that will arbitrate the key.
// NOTE(review): the return statements (and any enclosing retry loop) are
// not visible in this excerpt.
351 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
// A key already present in arbitratorTable has an arbitrator and must not
// be created again. The condition used to be negated, which contradicted
// the comment below and the matching contains() check in sendToServer().
353 if (arbitratorTable->contains(keyName)) {
354 // There is already an arbitrator
357 NewKey *newKey = new NewKey(NULL, keyName, machineId);
359 if (sendToServer(newKey)) {
360 // If successfully inserted
// Starts a new transaction by replacing pendingTransactionBuilder with a
// fresh PendingTransaction for this machine.
// NOTE(review): the previous builder is overwritten without a visible
// delete -- confirm who releases it.
366 void Table::startTransaction() {
367 // Create a new transaction, invalidates any old pending transactions.
368 pendingTransactionBuilder = new PendingTransaction(localMachineId);
// Adds a key/value update to the pending transaction.
// Throws (by pointer) Error("Key not Found.") when `key` has no entry in
// arbitratorTable, and Error("Not all Key Values Match Arbitrator.") when
// the pending transaction's checkArbitrator() rejects the key's arbitrator.
371 void Table::addKV(IoTString *key, IoTString *value) {
373 // Make sure it is a valid key
374 if (!arbitratorTable->contains(key)) {
375 throw new Error("Key not Found.");
378 // Make sure new key value pair matches the current arbitrator
379 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
380 // TODO: Maybe not throw an error
381 throw new Error("Not all Key Values Match Arbitrator.");
384 // Add the key value to this transaction
385 KeyValue *kv = new KeyValue(key, value);
386 pendingTransactionBuilder->addKV(kv);
// Turns the pending transaction into a Transaction and submits it.
// Returns a new TransactionStatus: StatusNoEffect (arbitrator -1) when the
// builder holds no key/value updates, otherwise the status object attached
// to the new transaction (created as StatusPending).
// Transactions whose arbitrator is another machine are queued in
// pendingTransactionQueue; the other visible path arbitrates locally.
// On ServerException the queued transactions are offered to their
// arbitrators through sendTransactionToLocal().
// NOTE(review): the try line, several else/continue lines and the
// declaration of `oldindex` are not visible in this excerpt.
389 TransactionStatus *Table::commitTransaction() {
390 if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
391 // transaction with no updates will have no effect on the system
392 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
395 // Set the local transaction sequence number and increment
396 pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
397 localTransactionSequenceNumber++;
399 // Create the transaction status
400 TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
402 // Create the new transaction
403 Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
404 newTransaction->setTransactionStatus(transactionStatus);
406 if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
407 // Add it to the queue and invalidate the builder for safety
408 pendingTransactionQueue->add(newTransaction);
// Remaining case (presumably the else branch: this machine is the
// arbitrator): arbitrate right away and fold the result into live state.
410 arbitrateOnLocalTransaction(newTransaction);
411 updateLiveStateFromLocal();
// The builder is replaced so the finished transaction cannot be reused.
414 pendingTransactionBuilder = new PendingTransaction(localMachineId);
418 } catch (ServerException *e) {
// Arbitrators that could not be reached during this pass.
420 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
421 uint size = pendingTransactionQueue->size();
// In-place compaction: each element is first copied to slot `oldindex`,
// and the queue is cut to `oldindex` entries after the loop.
// NOTE(review): `iter` is int while `size` is uint (signed/unsigned
// comparison); the sibling loops in this file use uint indices.
423 for (int iter = 0; iter < size; iter++) {
424 Transaction *transaction = pendingTransactionQueue->get(iter);
425 pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter));
427 if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
428 // Already contacted this client so ignore all attempts to contact this client
429 // to preserve ordering for arbitrator
433 Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
// First element true means the arbitrator could not be reached
// (sendTransactionToLocal returns (true, false) on those paths).
435 if (sendReturn.getFirst()) {
436 // Failed to contact over local
437 arbitratorTriedAndFailed->add(transaction->getArbitrator());
439 // Successful contact or should not contact
441 if (sendReturn.getSecond()) {
447 pendingTransactionQueue->setSize(oldindex);
450 updateLiveStateFromLocal();
452 return transactionStatus;
456 * Recalculate the new resize threshold
// Recomputes bufferResizeThreshold from numberOfSlots:
//   resizeLower = Table_RESIZE_THRESHOLD * numberOfSlots, truncated to int
//   threshold   = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower)
// Requires `random` and `numberOfSlots` to be set beforehand.
// NOTE(review): the range returned by Random::nextInt is not visible here
// (presumably [0, bound)) -- confirm.
458 void Table::setResizeThreshold() {
459 int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
460 bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
// Returns the current value of localSequenceNumber (no side effects).
463 int64_t Table::getLocalSequenceNumber() {
464 return localSequenceNumber;
// Pushes pending work to the server: queued transactions, pending
// arbitration rounds and, optionally, a new key. One slot is built and
// sent per iteration of the main loop.
//   newKey:  key-creation entry to insert, or NULL when only pending data
//            should be flushed.
//   returns: `newKey == NULL` on the final visible return path.
// The first section (guarded by hadPartialSendToServer) reconciles the
// bookkeeping of an earlier send attempt (lastSlotAttemptedToSend,
// lastTransactionPartsSent, ...) before anything new is sent. That flag is
// set further down when a ServerException of type
// ServerException_TypeInputTimeout is caught.
// NOTE(review): many lines of this function (try/else/closing braces,
// several early returns and assignments) are not visible in this excerpt;
// the comments below describe only the visible statements.
467 bool Table::sendToServer(NewKey *newKey) {
468 bool fromRetry = false;
470 if (hadPartialSendToServer) {
// Ask the server for everything after our current sequence number.
471 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
// Nothing newer came back: resend the last attempted slot with the saved
// size and new-key flag.
472 if (newSlots->length() == 0) {
474 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
476 if (sendSlotsReturn.getFirst()) {
477 if (newKey != NULL) {
// NOTE(review): getKey() values are compared with ==; if getKey() returns
// IoTString pointers this is an identity check, not a content comparison
// -- confirm that is intended.
478 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
// Bring the status of every transaction that had parts in the resent slot
// up to date.
483 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
484 while (trit->hasNext()) {
485 Transaction *transaction = trit->next();
486 transaction->resetServerFailure();
487 // Update which transactions parts still need to be sent
488 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
489 // Add the transaction status to the outstanding list
490 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
492 // Update the transaction status
493 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
495 // Check if all the transaction parts were successfully
496 // sent and if so then remove it from pending
497 if (transaction->didSendAllParts()) {
498 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
499 pendingTransactionQueue->remove(transaction);
// Remaining case (presumably the resend was not accepted): inspect the
// slots that came back with the send result.
504 newSlots = sendSlotsReturn.getThird();
505 bool isInserted = false;
// Look for our own slot: same sequence number as the last attempted slot
// and our machine id.
506 for (uint si = 0; si < newSlots->length(); si++) {
507 Slot *s = newSlots->get(si);
508 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
// Second pass: look for a LastMessage entry that names this machine with
// the sequence number of the last attempted slot.
514 for (uint si = 0; si < newSlots->length(); si++) {
515 Slot *s = newSlots->get(si);
520 // Process each entry in the slot
521 Vector<Entry *> *ventries = s->getEntries();
522 uint vesize = ventries->size();
523 for (uint vei = 0; vei < vesize; vei++) {
524 Entry *entry = ventries->get(vei);
525 if (entry->getType() == TypeLastMessage) {
526 LastMessage *lastMessage = (LastMessage *)entry;
527 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
536 if (newKey != NULL) {
537 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
542 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
543 while (trit->hasNext()) {
544 Transaction *transaction = trit->next();
545 transaction->resetServerFailure();
547 // Update which transactions parts still need to be sent
548 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
550 // Add the transaction status to the outstanding list
551 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
553 // Update the transaction status
554 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
556 // Check if all the transaction parts were successfully sent and if so then remove it from pending
557 if (transaction->didSendAllParts()) {
558 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
559 pendingTransactionQueue->remove(transaction);
561 transaction->resetServerFailure();
562 // Set the transaction sequence number back to nothing
563 if (!transaction->didSendAPartToServer()) {
564 transaction->setSequenceNumber(-1);
// Another visible path: clear the failure flag and drop the sequence
// number of every transaction that never got a part to the server.
572 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
573 while (trit->hasNext()) {
574 Transaction *transaction = trit->next();
575 transaction->resetServerFailure();
576 // Set the transaction sequence number back to nothing
577 if (!transaction->didSendAPartToServer()) {
578 transaction->setSequenceNumber(-1);
583 if (sendSlotsReturn.getThird()->length() != 0) {
584 // insert into the local block chain
585 validateAndUpdate(sendSlotsReturn.getThird(), true);
// Counterpart of the check above for the case where getSlots() did return
// slots: the same two searches run directly on `newSlots`.
589 bool isInserted = false;
590 for (uint si = 0; si < newSlots->length(); si++) {
591 Slot *s = newSlots->get(si);
592 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
598 for (uint si = 0; si < newSlots->length(); si++) {
599 Slot *s = newSlots->get(si);
604 // Process each entry in the slot
605 Vector<Entry *> *entries = s->getEntries();
606 uint eSize = entries->size();
607 for(uint ei=0; ei < eSize; ei++) {
608 Entry * entry = entries->get(ei);
610 if (entry->getType() == TypeLastMessage) {
611 LastMessage *lastMessage = (LastMessage *)entry;
612 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
621 if (newKey != NULL) {
622 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
627 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
628 while (trit->hasNext()) {
629 Transaction *transaction = trit->next();
630 transaction->resetServerFailure();
632 // Update which transactions parts still need to be sent
633 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
635 // Add the transaction status to the outstanding list
636 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
638 // Update the transaction status
639 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
641 // Check if all the transaction parts were successfully sent and if so then remove it from pending
642 if (transaction->didSendAllParts()) {
643 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
644 pendingTransactionQueue->remove(transaction);
646 transaction->resetServerFailure();
647 // Set the transaction sequence number back to nothing
648 if (!transaction->didSendAPartToServer()) {
649 transaction->setSequenceNumber(-1);
655 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
656 while (trit->hasNext()) {
657 Transaction *transaction = trit->next();
658 transaction->resetServerFailure();
659 // Set the transaction sequence number back to nothing
660 if (!transaction->didSendAPartToServer()) {
661 transaction->setSequenceNumber(-1);
667 // insert into the local block chain
668 validateAndUpdate(newSlots, true);
671 } catch (ServerException *e) {
678 // While we have stuff that needs inserting into the block chain
679 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
// The reconciliation above is expected to have cleared the flag; reaching
// the loop with it still set is treated as a fatal error.
683 if (hadPartialSendToServer) {
684 throw new Error("Should Be error free");
689 // If there is a new key with same name then end
690 if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
// The new slot is built for sequenceNumber + 1 and is given the HMAC of
// the slot currently stored at sequenceNumber.
695 Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber);
696 localSequenceNumber++;
698 // Try to fill the slot with data
699 ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
700 bool needsResize = fillSlotsReturn.getFirst();
701 int newSize = fillSlotsReturn.getSecond();
702 bool insertedNewKey = fillSlotsReturn.getThird();
705 // Reset which transaction to send
706 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
707 while (trit->hasNext()) {
708 Transaction *transaction = trit->next();
709 transaction->resetNextPartToSend();
711 // Set the transaction sequence number back to nothing
712 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
713 transaction->setSequenceNumber(-1);
718 // Clear the sent data since we are trying again
719 pendingSendArbitrationEntriesToDelete->clear();
720 transactionPartsSent->clear();
722 // We needed a resize so try again
723 fillSlot(slot, true, newKey);
// Snapshot this attempt so the reconciliation at the top of the function
// can run on a later call.
726 lastSlotAttemptedToSend = slot;
727 lastIsNewKey = (newKey != NULL);
728 lastInsertedNewKey = insertedNewKey;
729 lastNewSize = newSize;
731 lastTransactionPartsSent = transactionPartsSent->clone();
732 lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
734 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
736 if (sendSlotsReturn.getFirst()) {
738 // Did insert into the block chain
740 if (insertedNewKey) {
741 // This slot was what was inserted not a previous slot
743 // New Key was successfully inserted into the block chain so dont want to insert it again
747 // Remove the aborts and commit parts that were sent from the pending to send queue
748 uint size = pendingSendArbitrationRounds->size();
// In-place compaction: rounds that still have parts to send are moved to
// index `oldcount`, and the vector is cut to `oldcount` afterwards.
// NOTE(review): the declaration of `oldcount` is not visible here.
750 for (uint i = 0; i < size; i++) {
751 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
752 round->removeParts(pendingSendArbitrationEntriesToDelete);
754 if (!round->isDoneSending()) {
755 // Not done sending yet, so keep this round in the queue
756 pendingSendArbitrationRounds->set(oldcount++,
757 pendingSendArbitrationRounds->get(i));
760 pendingSendArbitrationRounds->setSize(oldcount);
762 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
763 while (trit->hasNext()) {
764 Transaction *transaction = trit->next();
765 transaction->resetServerFailure();
767 // Update which transactions parts still need to be sent
768 transaction->removeSentParts(transactionPartsSent->get(transaction));
770 // Add the transaction status to the outstanding list
771 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
773 // Update the transaction status
774 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
776 // Check if all the transaction parts were successfully sent and if so then remove it from pending
777 if (transaction->didSendAllParts()) {
778 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
779 pendingTransactionQueue->remove(transaction);
784 // Reset which transaction to send
785 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
786 while (trit->hasNext()) {
787 Transaction *transaction = trit->next();
788 transaction->resetNextPartToSend();
790 // Set the transaction sequence number back to nothing
791 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
792 transaction->setSequenceNumber(-1);
798 // Clear the sent data in preparation for next send
799 pendingSendArbitrationEntriesToDelete->clear();
800 transactionPartsSent->clear();
802 if (sendSlotsReturn.getThird()->length() != 0) {
803 // insert into the local block chain
804 validateAndUpdate(sendSlotsReturn.getThird(), true);
808 } catch (ServerException *e) {
// Any failure other than an input timeout is handled as "nothing reached
// the server".
809 if (e->getType() != ServerException_TypeInputTimeout) {
810 // Nothing was able to be sent to the server so just clear these data structures
811 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
812 while (trit->hasNext()) {
813 Transaction *transaction = trit->next();
814 transaction->resetNextPartToSend();
816 // Set the transaction sequence number back to nothing
817 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
818 transaction->setSequenceNumber(-1);
823 // There was a partial send to the server
824 hadPartialSendToServer = true;
826 // Rewind each transaction's send position and flag it with a server failure
827 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
828 while (trit->hasNext()) {
829 Transaction *transaction = trit->next();
830 transaction->resetNextPartToSend();
831 transaction->setServerFailure();
836 pendingSendArbitrationEntriesToDelete->clear();
837 transactionPartsSent->clear();
// True only if newKey is NULL at this point, i.e. the caller passed NULL
// or (presumably) a non-visible line cleared it after a successful insert.
842 return newKey == NULL;
// Asks machine `machineId` directly (over the host/port registered in
// localCommunicationTable) for arbitration data newer than the last local
// sequence number seen from it, then processes the returned entries.
// Request layout visible here: the last-seen sequence number as a long;
// the buffer is sized for one int32 plus one int64.
// NOTE(review): the return statements and the second encoded field are not
// visible in this excerpt.
845 bool Table::updateFromLocal(int64_t machineId) {
846 if (!localCommunicationTable->contains(machineId))
849 Pair<IoTString *, int32_t> * localCommunicationInformation = localCommunicationTable->get(machineId);
851 // Get the size of the send data
852 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
// -1 is sent when nothing has been seen from this machine yet.
854 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
855 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(machineId)) {
856 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
859 Array<char> *sendData = new Array<char>(sendDataSize);
860 ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
863 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
// localSequenceNumber is consumed by every local send, whether or not the
// peer answers.
867 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
868 localSequenceNumber++;
870 if (returnData == NULL) {
871 // Could not contact server
// Reply layout as decoded below: an int entry count, then per entry one
// type byte followed by the encoded Abort or CommitPart.
876 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
877 int numberOfEntries = bbDecode->getInt();
879 for (int i = 0; i < numberOfEntries; i++) {
880 char type = bbDecode->get();
881 if (type == TypeAbort) {
882 Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
884 } else if (type == TypeCommitPart) {
885 CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
886 processEntry(commitPart);
890 updateLiveStateFromLocal();
// Sends `transaction` directly to its arbitrator over the host/port
// registered in localCommunicationTable and applies the reply.
// Returns a Pair<bool, bool>:
//   (true, false)  no local communication info for the arbitrator, or the
//                  arbitrator could not be contacted (returnData == NULL);
//   (false, true)  on the final visible return path.
// The caller (commitTransaction) treats the first element as "failed to
// contact over local".
// NOTE(review): several branch conditions and block braces are not visible
// in this excerpt (tParts/tPartsSize are declared twice, so they
// presumably live in separate nested scopes).
895 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
897 // Get the devices local communications
898 if (!localCommunicationTable->contains(transaction->getArbitrator()))
899 return Pair<bool, bool>(true, false);
901 Pair<IoTString *, int32_t> * localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
903 // Get the size of the send data
904 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
// Plus the encoded size of every transaction part.
906 Vector<TransactionPart *> * tParts = transaction->getParts();
907 uint tPartsSize = tParts->size();
908 for (uint i = 0; i < tPartsSize; i++) {
909 TransactionPart * part = tParts->get(i);
910 sendDataSize += part->getSize();
// -1 is sent when nothing has been seen from this arbitrator yet.
914 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
915 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(transaction->getArbitrator())) {
916 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
919 // Make the send data size
920 Array<char> *sendData = new Array<char>(sendDataSize);
921 ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
// Request layout: last-seen sequence number (long), part count (int),
// then each encoded part. acceptDataFromLocal() decodes in this order.
924 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
925 bbEncode->putInt(transaction->getParts()->size());
927 Vector<TransactionPart *> * tParts = transaction->getParts();
928 uint tPartsSize = tParts->size();
929 for (uint i = 0; i < tPartsSize; i++) {
930 TransactionPart * part = tParts->get(i);
931 part->encode(bbEncode);
// localSequenceNumber is consumed by every local send, whether or not the
// peer answers.
936 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
937 localSequenceNumber++;
939 if (returnData == NULL) {
940 // Could not contact server
941 return Pair<bool, bool>(true, false);
// Reply layout: didCommit byte, couldArbitrate byte, entry count (int),
// then per entry a type byte followed by the encoded Abort or CommitPart.
945 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
946 bool didCommit = bbDecode->get() == 1;
947 bool couldArbitrate = bbDecode->get() == 1;
948 int numberOfEntries = bbDecode->getInt();
949 bool foundAbort = false;
951 for (int i = 0; i < numberOfEntries; i++) {
952 char type = bbDecode->get();
953 if (type == TypeAbort) {
954 Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
// Does this abort name the transaction we just sent (our machine id and
// the same client-local sequence number)?
956 if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
961 } else if (type == TypeCommitPart) {
962 CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
963 processEntry(commitPart);
967 updateLiveStateFromLocal();
// Final status. The inner conditions are not visible; presumably
// didCommit decides in the first branch and foundAbort in the second.
969 if (couldArbitrate) {
970 TransactionStatus * status = transaction->getTransactionStatus();
972 status->setStatus(TransactionStatus_StatusCommitted);
974 status->setStatus(TransactionStatus_StatusAborted);
977 TransactionStatus * status = transaction->getTransactionStatus();
979 status->setStatus(TransactionStatus_StatusAborted);
981 status->setStatus(TransactionStatus_StatusCommitted);
985 return Pair<bool, bool>(false, true);
// Handles a request received over local communication.
// Request layout (as decoded below): last arbitrated sequence number seen
// by the sender (long), number of transaction parts (int), then that many
// encoded TransactionParts.
// When parts are present, the transaction is rebuilt and arbitrated
// locally. The reply then collects the locally generated aborts and this
// machine's commit parts, each group visited in ascending local sequence
// number order, with a `<= lastArbitratedSequenceNumberSeen` test applied
// to every entry.
// Reply layout (as encoded below): two flag bytes when numberOfParts != 0,
// the entry count (int), then each encoded entry.
// NOTE(review): the return statement and several branch bodies are not
// visible in this excerpt.
988 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
990 ByteBuffer *bbDecode = ByteBuffer_wrap(data);
991 int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
992 int numberOfParts = bbDecode->getInt();
994 // If we did commit a transaction or not
995 bool didCommit = false;
996 bool couldArbitrate = false;
998 if (numberOfParts != 0) {
1000 // decode the transaction
1001 Transaction *transaction = new Transaction();
1002 for (int i = 0; i < numberOfParts; i++) {
1004 TransactionPart *newPart = (TransactionPart *)TransactionPart_decode(NULL, bbDecode);
1005 transaction->addPartDecode(newPart);
1008 // Arbitrate on transaction and pull relevant return data
1009 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
1010 couldArbitrate = localArbitrateReturn.getFirst();
1011 didCommit = localArbitrateReturn.getSecond();
1013 updateLiveStateFromLocal();
1015 // Transaction was sent to the server so keep track of it to prevent double commit
1016 if (transaction->getSequenceNumber() != -1) {
1017 offlineTransactionsCommittedAndAtServer->add(new Pair<int64_t, int64_t>(transaction->getId()));
1021 // The data to send back
1022 int returnDataSize = 0;
1023 Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
1025 // Get the aborts to send back
1026 Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>();
1028 SetIterator<int64_t, Abort *> *abortit = getKeyIterator(liveAbortsGeneratedByLocal);
1029 while(abortit->hasNext())
1030 abortLocalSequenceNumbers->add(abortit->next());
// Sort the collected keys with compareInt64 before visiting them.
1034 qsort(abortLocalSequenceNumbers->expose(), abortLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1036 uint asize = abortLocalSequenceNumbers->size();
1037 for(uint i=0; i<asize; i++) {
// Note: this local deliberately shadows the member localSequenceNumber
// inside the loop body.
1038 int64_t localSequenceNumber = abortLocalSequenceNumbers->get(i);
// Entries the sender has already seen (presumably skipped by a
// non-visible `continue`).
1039 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1043 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
1044 unseenArbitrations->add(abort);
1045 returnDataSize += abort->getSize();
1048 // Get the commits to send back
1049 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
1050 if (commitForClientTable != NULL) {
1051 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>();
1053 SetIterator<int64_t, Commit *> *commitit = getKeyIterator(commitForClientTable);
1054 while(commitit->hasNext())
1055 commitLocalSequenceNumbers->add(commitit->next());
1058 qsort(commitLocalSequenceNumbers->expose(), commitLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1060 uint clsSize = commitLocalSequenceNumbers->size();
1061 for(uint clsi = 0; clsi < clsSize; clsi++) {
1062 int64_t localSequenceNumber = commitLocalSequenceNumbers->get(clsi);
1063 Commit *commit = commitForClientTable->get(localSequenceNumber);
1065 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
// Every part of an unseen commit is returned individually.
1070 Vector<CommitPart *> * parts = commit->getParts();
1071 uint nParts = parts->size();
1072 for(uint i=0; i<nParts; i++) {
1073 CommitPart * commitPart = parts->get(i);
1074 unseenArbitrations->add(commitPart);
1075 returnDataSize += commitPart->getSize();
// NOTE(review): room for two int32 values is reserved although only one
// int (the entry count) is written below, and one char is reserved
// although two flag bytes appear to be written when numberOfParts != 0.
// The buffer is still large enough (8 + 1 >= 4 + 2), but the bookkeeping
// looks inconsistent -- confirm.
1081 // Number of arbitration entries to decode
1082 returnDataSize += 2 * sizeof(int32_t);
1084 // bool of did commit or not
1085 if (numberOfParts != 0) {
1086 returnDataSize += sizeof(char);
1089 // Data to send Back
1090 Array<char> *returnData = new Array<char>(returnDataSize);
1091 ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
1093 if (numberOfParts != 0) {
// First flag byte; sendTransactionToLocal() reads it as didCommit.
1095 bbEncode->put((char)1);
1097 bbEncode->put((char)0);
// Second flag byte: couldArbitrate.
1099 if (couldArbitrate) {
1100 bbEncode->put((char)1);
1102 bbEncode->put((char)0);
1106 bbEncode->putInt(unseenArbitrations->size());
1107 uint size = unseenArbitrations->size();
1108 for (uint i = 0; i < size; i++) {
1109 Entry *entry = unseenArbitrations->get(i);
1110 entry->encode(bbEncode);
// Here the name refers to the member again (the shadowing locals above
// are out of scope).
1113 localSequenceNumber++;
// Pushes `slot` to the server through cloud->putSlot(slot, newSize) and
// works out whether the slot ended up in the chain.
//
// Returns a ThreeTuple of (inserted, lastTryInserted, array), where `array`
// is the Slot array handed back by putSlot or, when putSlot returned NULL,
// a newly allocated array into which `slot` itself is stored.
// Throws a heap-allocated Error when the resulting array is empty.
// Visible side effects: sets attemptedToSendToServer and clears or appends
// to rejectedSlotVector.
1117 ThreeTuple<bool, bool, Array<Slot *> *> Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey) {
// NOTE(review): the saved previous value is only referenced by the
// commented-out condition further down in the visible code.
1118 bool attemptedToSendToServerTmp = attemptedToSendToServer;
1119 attemptedToSendToServer = true;
1121 bool inserted = false;
1122 bool lastTryInserted = false;
1124 Array<Slot *> *array = cloud->putSlot(slot, newSize);
1125 if (array == NULL) {
// NOTE(review): the array is default-constructed before set(0, ...); confirm
// that a default Array has room for index 0.
1126 array = new Array<Slot *>();
1127 array->set(0, slot);
1128 rejectedSlotVector->clear();
1131 if (array->length() == 0) {
1132 throw new Error("Server Error: Did not send any slots");
1135 // if (attemptedToSendToServerTmp) {
1136 if (hadPartialSendToServer) {
1138 bool isInserted = false;
1139 uint size = array->length();
// First pass: look for a returned slot with our sequence number and machine id.
1140 for (uint i = 0; i < size; i++) {
1141 Slot *s = array->get(i);
1142 if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
// Second pass: inspect the entries of the returned slots.
1148 for (uint i = 0; i < size; i++) {
1149 Slot *s = array->get(i);
1154 // Process each entry in the slot
1155 Vector<Entry *> *entries = s->getEntries();
1156 uint eSize = entries->size();
1157 for(uint ei=0; ei < eSize; ei++) {
1158 Entry * entry = entries->get(ei);
// A LastMessage entry naming this machine and this slot's sequence number
// is what is being searched for here.
1160 if (entry->getType() == TypeLastMessage) {
1161 LastMessage *lastMessage = (LastMessage *)entry;
1163 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) {
// NOTE(review): the conditions selecting between the outcomes below are not
// visible here; a slot recorded in rejectedSlotVector is paired with
// lastTryInserted = false.
1172 rejectedSlotVector->add(slot->getSequenceNumber());
1173 lastTryInserted = false;
1175 lastTryInserted = true;
1178 rejectedSlotVector->add(slot->getSequenceNumber());
1179 lastTryInserted = false;
1183 return ThreeTuple<bool, bool, Array<Slot *> *>(inserted, lastTryInserted, array);
1187 * Returns a tuple whose first field is true if a resize was needed but not requested
// Populates `slot` with what should go out in the next push. In the order
// visible below: a TableStatus entry carrying newSize, rejected-message
// entries, mandatory rescue data, the optional new-key entry, pending
// arbitration data, parts of the transaction at the head of
// pendingTransactionQueue, and finally optional rescue data.
//
// Returns (true, NULL, NULL) when the mandatory rescue reports that a
// resize is needed while `resize` is false; otherwise
// (false, newSize, inserted).
1189 ThreeTuple<bool, int32_t, bool> Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry) {
1191 if (liveSlotCount > bufferResizeThreshold) {
1192 resize = true;//Resize is forced
// NOTE(review): the declaration of newSize and the condition guarding the
// TableStatus entry are not visible here (presumably `if (resize)`).
1196 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1197 TableStatus *status = new TableStatus(slot, newSize);
1198 slot->addEntry(status);
1201 // Fill with rejected slots first before doing anything else
1202 doRejectedMessages(slot);
1204 // Do mandatory rescue of entries
1205 ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1207 // Extract working variables
1208 bool needsResize = mandatoryRescueReturn.getFirst();
1209 bool seenLiveSlot = mandatoryRescueReturn.getSecond();
1210 int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1212 if (needsResize && !resize) {
1213 // We need to resize but we are not resizing so return false
// NOTE(review): NULL is passed for the int32_t and bool fields; 0 and false
// would state the intent more clearly.
1214 return ThreeTuple<bool, int32_t, bool>(true, NULL, NULL);
1217 bool inserted = false;
1218 if (newKeyEntry != NULL) {
1219 newKeyEntry->setSlot(slot);
1220 if (slot->hasSpace(newKeyEntry)) {
1221 slot->addEntry(newKeyEntry);
1226 // Clear the transactions, aborts and commits that were sent previously
1227 transactionPartsSent->clear();
1228 pendingSendArbitrationEntriesToDelete->clear();
1229 uint size = pendingSendArbitrationRounds->size();
1230 for (uint i = 0; i < size; i++) {
1231 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
1232 bool isFull = false;
// The round's parts are generated just before they are read.
1233 round->generateParts();
1234 Vector<Entry *> *parts = round->getParts();
1236 // Insert pending arbitration data
1237 uint vsize = parts->size();
1238 for (uint vi = 0; vi < vsize; vi++) {
1239 Entry *arbitrationData = parts->get(vi);
1241 // If it is an abort then we need to set some information
// The abort is stamped with the sequence number of the slot carrying it.
1242 if (arbitrationData->getType() == TypeAbort) {
1243 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1246 if (!slot->hasSpace(arbitrationData)) {
1247 // No space so cant do anything else with these data entries
1252 // Add to this current slot and add it to entries to delete
1253 slot->addEntry(arbitrationData);
1254 pendingSendArbitrationEntriesToDelete->add(arbitrationData);
1262 if (pendingTransactionQueue->size() > 0) {
1263 Transaction *transaction = pendingTransactionQueue->get(0);
1264 // Set the transaction sequence number if it has yet to be inserted into the block chain
1265 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1266 transaction->setSequenceNumber(slot->getSequenceNumber());
1270 TransactionPart *part = transaction->getNextPartToSend();
1272 // Ran out of parts to send for this transaction so move on
1276 if (slot->hasSpace(part)) {
1277 slot->addEntry(part);
// Remember which part numbers of this transaction went into the slot.
1278 Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1279 if (partsSent == NULL) {
1280 partsSent = new Vector<int32_t>();
1281 transactionPartsSent->put(transaction, partsSent);
1283 partsSent->add(part->getPartNumber());
// NOTE(review): partsSent is already stored under this key (fetched or put
// above), so this second put looks redundant -- confirm.
1284 transactionPartsSent->put(transaction, partsSent);
1291 // Fill the remainder of the slot with rescue data
1292 doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1294 return ThreeTuple<bool, int32_t, bool>(false, newSize, inserted);
// Builds RejectedMessage entries for slot `s` from the sequence numbers
// recorded in rejectedSlotVector; all work is guarded by the vector being
// non-empty.
// When the vector holds more than Table_REJECTED_THRESHOLD entries a single
// RejectedMessage spanning the first and last recorded sequence numbers is
// created. The remaining code distinguishes sequence numbers whose slots
// are missing from those that are present.
// NOTE(review): the statements that attach each `rm` to the slot, the
// declaration of `i`, and the loop exit conditions are not visible here.
1297 void Table::doRejectedMessages(Slot *s) {
1298 if (!rejectedSlotVector->isEmpty()) {
1299 /* TODO: We should avoid generating a rejected message entry if
1300 * there is already a sufficient entry in the queue (e.g.,
1301 * equalsto value of true and same sequence number). */
1303 int64_t old_seqn = rejectedSlotVector->get(0);
1304 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1305 int64_t new_seqn = rejectedSlotVector->lastElement();
1306 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
// -1 marks "no missing message seen yet".
1309 int64_t prev_seqn = -1;
1311 /* Go through list of missing messages */
1312 for (; i < rejectedSlotVector->size(); i++) {
1313 int64_t curr_seqn = rejectedSlotVector->get(i);
1314 Slot *s_msg = buffer->getSlot(curr_seqn);
1317 prev_seqn = curr_seqn;
1319 /* Generate rejected message entry for missing messages */
// One entry covers the range old_seqn..prev_seqn, attributed to this machine.
1320 if (prev_seqn != -1) {
1321 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1324 /* Generate rejected message entries for present messages */
// Continues from wherever the previous loop stopped; each present slot gets
// its own single-sequence-number entry attributed to that slot's machine id.
1325 for (; i < rejectedSlotVector->size(); i++) {
1326 int64_t curr_seqn = rejectedSlotVector->get(i);
1327 Slot *s_msg = buffer->getSlot(curr_seqn);
1328 int64_t machineid = s_msg->getMachineID();
1329 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
// Copies live entries from the oldest slots of the buffer into `slot`.
// The walk starts at oldestLiveSlotSequenceNumver (first raised to the
// buffer's oldest sequence number if it lags behind) and stops before
// `threshold`.
//
// Returns (needsResize, seenLiveSlot, currentSequenceNumber):
//   - (true, ...) as soon as an entry does not fit while
//     currentSequenceNumber equals firstIfFull;
//   - (false, ...) once the loop finishes.
// `resize` is forwarded to Slot::getLiveEntries().
1336 ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize) {
1337 int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1338 int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
1339 if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1340 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1343 int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1344 bool seenLiveSlot = false;
1345 int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots; // smallest seq number in the buffer if it is full
1346 int64_t threshold = firstIfFull + Table_FREE_SLOTS; // we want the buffer to be clear of live entries up to this point
1350 for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1351 Slot * previousSlot = buffer->getSlot(currentSequenceNumber);
1352 // Push slot number forward
// The oldest-live marker only advances until the first live slot is met.
1353 if (!seenLiveSlot) {
1354 oldestLiveSlotSequenceNumver = currentSequenceNumber;
// NOTE(review): the body of this check is not visible; presumably slots
// that are not live are skipped -- confirm.
1357 if (!previousSlot->isLive()) {
1361 // We have seen a live slot
1362 seenLiveSlot = true;
1364 // Get all the live entries for a slot
1365 Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1367 // Iterate over all the live entries and try to rescue them
1368 uint lESize = liveEntries->size();
1369 for (uint i=0; i< lESize; i++) {
1370 Entry * liveEntry = liveEntries->get(i);
1371 if (slot->hasSpace(liveEntry)) {
1372 // Enough space to rescue the entry
1373 slot->addEntry(liveEntry);
1374 } else if (currentSequenceNumber == firstIfFull) {
1375 //if there's no space but the entry is about to fall off the queue
1376 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
1382 return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
// Best-effort continuation of the rescue: starting at `seqn`, walks the
// buffer up to and including its newest sequence number and adds live
// entries to `s` while they fit.
// `seenliveslot` carries over whether a live slot was already encountered
// by the caller; `resize` is forwarded to Slot::getLiveEntries().
// NOTE(review): the declaration and updates of `skipcount`, and what happens
// once it exceeds Table_SKIP_THRESHOLD, are not visible in this listing.
1385 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1386 /* now go through live entries from least to greatest sequence number until
1387 * either all live slots added, or the slot doesn't have enough room
1388 * for SKIP_THRESHOLD consecutive entries*/
1390 int64_t newestseqnum = buffer->getNewestSeqNum();
1392 for (; seqn <= newestseqnum; seqn++) {
1393 Slot *prevslot = buffer->getSlot(seqn);
1394 //Push slot number forward
// NOTE(review): the guard on this assignment is not visible (presumably
// `if (!seenliveslot)`, mirroring doMandatoryResuce) -- confirm.
1396 oldestLiveSlotSequenceNumver = seqn;
1398 if (!prevslot->isLive())
1400 seenliveslot = true;
1401 Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
1402 uint lESize = liveentries->size();
1403 for (uint i=0; i< lESize; i++) {
1404 Entry * liveentry = liveentries->get(i);
1405 if (s->hasSpace(liveentry))
1406 s->addEntry(liveentry);
1409 if (skipcount > Table_SKIP_THRESHOLD)
1419 * Checks for malicious activity and updates the local copy of the block chain.
// Validates a batch of slots received from the server and then commits
// them to the local buffer.
//
// Checks visible below:
//   - the first new slot must be newer than `sequenceNumber`;
//   - the HMAC chain is verified through checkHMACChain();
//   - when the first new slot is not exactly sequenceNumber + 1 (a gap), the
//     number of slots is checked and machineSet must be empty after processing.
// After the checks each slot is stored with buffer->putSlot(), the latest
// sequence number is recorded, updateLiveStateFromServer() runs, and the
// offline-transaction bookkeeping and hadPartialSendToServer are reset.
// Throws heap-allocated Error objects on the failures shown.
// `acceptUpdatesToLocal` is forwarded unchanged to processSlot().
1421 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1422 // The cloud communication layer has checked slot HMACs already
// NOTE(review): the body of the empty-input check is not visible here
// (presumably an early return) -- confirm.
1424 if (newSlots->length() == 0) {
1428 // Make sure all slots are newer than the last largest slot this
1430 int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
1431 if (firstSeqNum <= sequenceNumber) {
1432 throw new Error("Server Error: Sent older slots!");
1435 // Create an object that can access both new slots and slots in our
1436 // local chain without committing slots to our local chain
1437 SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1439 // Check that the HMAC chain is not broken
1440 checkHMACChain(indexer, newSlots);
1442 // Set to keep track of messages from clients
// Seeded with every key currently in lastMessageTable; it is handed to
// processSlot() for each new slot and must be empty after a gap (see below).
1443 Hashset<int64_t> *machineSet = new Hashset<int64_t>();
1445 SetIterator<int64_t, Pair<int64_t, Liveness *> *> * lmit=getKeyIterator(lastMessageTable);
1446 while(lmit->hasNext())
1447 machineSet->add(lmit->next());
1451 // Process each slots data
1453 uint numSlots = newSlots->length();
1454 for(uint i=0; i<numSlots; i++) {
1455 Slot *slot = newSlots->get(i);
1456 processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1457 updateExpectedSize();
1461 // If there is a gap, check to see if the server sent us
1463 if (firstSeqNum != (sequenceNumber + 1)) {
1465 // Check the size of the slots that were sent down by the server.
1466 // Can only check the size if there was a gap
1467 checkNumSlots(newSlots->length());
1469 // Since there was a gap every machine must have pushed a slot or
1470 // must have a "last message" message. If not then the server is
1472 if (!machineSet->isEmpty()) {
1473 throw new Error("Missing record for machines: ");
1477 // Update the size of our local block chain.
1480 // Commit new slots to the local block chain.
1482 uint numSlots = newSlots->length();
1483 for(uint i=0; i<numSlots; i++) {
1484 Slot *slot = newSlots->get(i);
1486 // Insert this slot into our local block chain copy.
1487 buffer->putSlot(slot);
1489 // Keep track of how many slots are currently live (have live data
1494 // Get the sequence number of the latest slot in the system
1495 sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
1496 updateLiveStateFromServer();
1498 // No Need to remember after we pulled from the server
1499 offlineTransactionsCommittedAndAtServer->clear();
1501 // This is invalidated now
1502 hadPartialSendToServer = false;
// Refreshes the derived live state after new slots were taken from the
// server. The steps run in the order shown: process newly received
// transaction parts, arbitrate on them, update the committed table, prune
// dead transactions, then update the two speculative tables.
1505 void Table::updateLiveStateFromServer() {
1506 // Process the new transaction parts
1507 processNewTransactionParts();
1509 // Do arbitration on new transactions that were received
1510 arbitrateFromServer();
1512 // Update all the committed keys
1513 bool didCommitOrSpeculate = updateCommittedTable();
1515 // Delete the transactions that are now dead
1516 updateLiveTransactionsAndStatus();
// The flag accumulates: the speculative-table update receives the commit
// result and its own result is OR-ed in before the pending-transaction
// speculative table is updated.
1519 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1520 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
// Refreshes the derived live state after a local change. Unlike
// updateLiveStateFromServer(), the visible code does not process new
// transaction parts or arbitrate; it updates the committed table, prunes
// dead transactions and then updates the two speculative tables.
1523 void Table::updateLiveStateFromLocal() {
1524 // Update all the committed keys
1525 bool didCommitOrSpeculate = updateCommittedTable();
1527 // Delete the transactions that are now dead
1528 updateLiveTransactionsAndStatus();
// The commit result is passed in and the speculative result OR-ed on top.
1531 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1532 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
// Records the table size announced by the server and derives how many slots
// are expected: the smaller of firstSequenceNumber and numberOfSlots.
// Also sets didFindTableStatus and currMaxSize.
// NOTE(review): the branch structure around the didFindTableStatus check is
// not visible here; presumably expectedsize is only computed the first time
// (when the flag is still false) -- confirm.
// Note that the parameter `numberOfSlots` hides any member of the same name
// inside this function.
1535 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1536 int64_t prevslots = firstSequenceNumber;
1538 if (didFindTableStatus) {
1540 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1543 didFindTableStatus = true;
1544 currMaxSize = numberOfSlots;
// Keeps expectedsize from exceeding currMaxSize.
// NOTE(review): any adjustment of expectedsize that happens before the clamp
// is not visible in this listing (validateAndUpdate calls this once per
// processed slot) -- confirm.
1547 void Table::updateExpectedSize() {
1550 if (expectedsize > currMaxSize) {
1551 expectedsize = currMaxSize;
1557 * Check the size of the block chain to make sure there are enough
1558 * slots sent back by the server. This is only called when we have a
1559 * gap between the slots that we have locally and the slots sent by
1560 * the server; therefore, in the slots sent by the server there will be
1561 * at least 1 table status message.
// Verifies that the number of slots received equals expectedsize and throws
// a heap-allocated Error otherwise.
// NOTE(review): the message ends with "Expected: " but no value is appended,
// and it contains a "->" that looks like a mechanical-translation artifact
// for "."; it is a runtime string, so it is left untouched here.
1563 void Table::checkNumSlots(int numberOfSlots) {
1564 if (numberOfSlots != expectedsize) {
1565 throw new Error("Server Error: Server did not send all slots-> Expected: ");
1570 * Update the size of the local buffer if it is needed.
// Applies the size recorded in currMaxSize to the local state: clears
// didFindTableStatus, resizes the slot buffer when the size differs from
// numberOfSlots, stores the new size and recomputes the resize threshold.
1572 void Table::commitNewMaxSize() {
1573 didFindTableStatus = false;
1575 // Resize the local slot buffer
1576 if (numberOfSlots != currMaxSize) {
1577 buffer->resize((int32_t)currMaxSize);
1580 // Change the number of local slots to the new size
1581 numberOfSlots = (int32_t)currMaxSize;
1583 // Recalculate the resize threshold since the size of the local
1584 // buffer has changed
1585 setResizeThreshold();
1589 * Process the new transaction parts from this latest round of slots
1590 * received from the server
// Folds the transaction parts gathered in newTransactionParts into the live
// transaction tables, creating a Transaction the first time a sequence
// number is seen, and then clears newTransactionParts.
// newTransactionParts is keyed by machine id; each value is a table of
// parts keyed by a (int64, int32) pair.
1592 void Table::processNewTransactionParts() {
1594 if (newTransactionParts->size() == 0) {
1595 // Nothing new to process
1599 // Iterate through all the machine Ids that we received new parts
1601 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> * tpit= getKeyIterator(newTransactionParts);
1602 while(tpit->hasNext()) {
1603 int64_t machineId = tpit->next();
1604 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
1606 SetIterator<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *ptit = getKeyIterator(parts);
1607 // Iterate through all the parts for that machine Id
1608 while(ptit->hasNext()) {
1609 Pair<int64_t, int32_t> * partId = ptit->next();
1610 TransactionPart *part = parts->get(partId);
// Parts whose sequence number is at or below the last transaction number
// recorded for their arbitrator are treated as dead.
// NOTE(review): the statements that act on that are not visible here.
1612 if (lastArbitratedTransactionNumberByArbitratorTable->contains(part->getArbitratorId())) {
1613 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1614 if (lastTransactionNumber >= part->getSequenceNumber()) {
1615 // Set dead the transaction part
1621 // Get the transaction object for that sequence number
1622 Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1624 if (transaction == NULL) {
1625 // This is a new transaction that we dont have so make a new one
1626 transaction = new Transaction();
1628 // Insert this new transaction into the live tables
// Registered under both the sequence number and a newly allocated Pair built
// from the part's transaction id.
1629 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1630 liveTransactionByTransactionIdTable->put(new Pair<int64_t, int64_t>(part->getTransactionId()), transaction);
1633 // Add that part to the transaction
1634 transaction->addPartDecode(part);
1639 // Clear all the new transaction parts in preparation for the next
1640 // time the server sends slots
1641 newTransactionParts->clear();
// Arbitrates, in sequence-number order, on the live transactions for which
// this machine is the arbitrator.
// Transactions whose guard evaluates true have their key/value updates
// accumulated in speculativeTableTmp and are folded into a single Commit;
// those whose guard fails each produce an Abort. The resulting commit
// and/or aborts are queued as one ArbitrationRound in
// pendingSendArbitrationRounds, and the entries are fed to processEntry().
// NOTE(review): the bodies of the early filter checks inside the loop
// (continue/break) are not visible in this listing.
1644 void Table::arbitrateFromServer() {
1646 if (liveTransactionBySequenceNumberTable->size() == 0) {
1647 // Nothing to arbitrate on so move on
1651 // Get the transaction sequence numbers and sort from oldest to newest
1652 Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>();
1654 SetIterator<int64_t, Transaction *> * trit = getKeyIterator(liveTransactionBySequenceNumberTable);
1655 while(trit->hasNext())
1656 transactionSequenceNumbers->add(trit->next());
1659 qsort(transactionSequenceNumbers->expose(), transactionSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1661 // Collection of key value pairs that are
1662 Hashtable<IoTString *, KeyValue *> * speculativeTableTmp = new Hashtable<IoTString *, KeyValue *>();
1664 // The last transaction arbitrated on
1665 int64_t lastTransactionCommitted = -1;
1666 Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1667 uint tsnSize = transactionSequenceNumbers->size();
1668 for(uint i=0; i<tsnSize; i++) {
1669 int64_t transactionSequenceNumber = transactionSequenceNumbers->get(i);
1670 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1672 // Check if this machine arbitrates for this transaction if not
1673 // then we cant arbitrate this transaction
1674 if (transaction->getArbitrator() != localMachineId) {
// Sequence numbers below the last one arbitrated on are filtered here.
1678 if (transactionSequenceNumber < lastSeqNumArbOn) {
1682 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1683 // We have seen this already locally so dont commit again
1688 if (!transaction->isComplete()) {
1689 // Will arbitrate in incorrect order if we continue so just break
1695 // update the largest transaction seen by arbitrator from server
// Keeps, per originating machine, the largest client-local sequence number
// seen so far: inserted when absent, raised when a larger one appears.
1696 if (!lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1697 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1699 int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1700 if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1701 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
// The guard is evaluated against the committed table plus the updates
// accepted earlier in this same pass.
1705 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1706 // Guard evaluated as true
1708 // Update the local changes so we can make the commit
1709 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1710 while (kvit->hasNext()) {
1711 KeyValue *kv = kvit->next();
1712 speculativeTableTmp->put(kv->getKey(), kv);
1716 // Update what the last transaction committed was for use in batch commit
1717 lastTransactionCommitted = transactionSequenceNumber;
1719 // Guard evaluated was false so create abort
// Each abort consumes one local arbitration sequence number.
1721 Abort *newAbort = new Abort(NULL,
1722 transaction->getClientLocalSequenceNumber(),
1723 transaction->getSequenceNumber(),
1724 transaction->getMachineId(),
1725 transaction->getArbitrator(),
1726 localArbitrationSequenceNumber);
1727 localArbitrationSequenceNumber++;
1728 generatedAborts->add(newAbort);
1730 // Insert the abort so we can process
1731 processEntry(newAbort);
1734 lastSeqNumArbOn = transactionSequenceNumber;
1737 Commit *newCommit = NULL;
1739 // If there is something to commit
1740 if (speculativeTableTmp->size() != 0) {
1741 // Create the commit and increment the commit sequence number
1742 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1743 localArbitrationSequenceNumber++;
1745 // Add all the new keys to the commit
1746 SetIterator<IoTString *, KeyValue *> * spit = getKeyIterator(speculativeTableTmp);
1747 while(spit->hasNext()) {
1748 IoTString * string = spit->next();
1749 KeyValue * kv = speculativeTableTmp->get(string);
1750 newCommit->addKV(kv);
1754 // create the commit parts
1755 newCommit->createCommitParts();
1757 // Append all the commit parts to the end of the pending queue
1758 // waiting for sending to the server
1759 // Insert the commit so we can process it
1760 Vector<CommitPart *> * parts = newCommit->getParts();
1761 uint partsSize = parts->size();
1762 for(uint i=0; i<partsSize; i++) {
1763 CommitPart * commitPart = parts->get(i);
1764 processEntry(commitPart);
// Queue a round only when this pass produced a commit or at least one abort.
1768 if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1769 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1770 pendingSendArbitrationRounds->add(arbitrationRound);
// After a compaction, the commit parts of the (possibly merged) last round
// are processed as well.
1772 if (compactArbitrationData()) {
1773 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1774 if (newArbitrationRound->getCommit() != NULL) {
1775 Vector<CommitPart *> * parts = newArbitrationRound->getCommit()->getParts();
1776 uint partsSize = parts->size();
1777 for(uint i=0; i<partsSize; i++) {
1778 CommitPart * commitPart = parts->get(i);
1779 processEntry(commitPart);
// Arbitrates on a single transaction received directly rather than through
// the server.
//
// Returns a Pair whose first element tells whether arbitration was possible
// and whose second tells whether the transaction was committed (this is how
// acceptDataFromLocal() reads it):
//   (false, false) - this machine is not the arbitrator, the transaction is
//                    incomplete, or a newer transaction from the same remote
//                    machine was already seen via the server;
//   (true,  true)  - the guard held and a Commit was created and queued;
//   (true,  false) - the guard failed and an Abort was created and queued.
// For transactions that originate on this machine the TransactionStatus,
// when present, is set to committed/aborted accordingly.
1786 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1788 // Check if this machine arbitrates for this transaction if not then
1789 // we cant arbitrate this transaction
1790 if (transaction->getArbitrator() != localMachineId) {
1791 return Pair<bool, bool>(false, false);
1794 if (!transaction->isComplete()) {
1795 // Will arbitrate in incorrect order if we continue so just break
1797 return Pair<bool, bool>(false, false);
1800 if (transaction->getMachineId() != localMachineId) {
1801 // dont do this check for local transactions
// Rejected only when the recorded number is strictly greater than this
// transaction's client-local sequence number.
1802 if (lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1803 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1804 // We've have already seen this from the server
1805 return Pair<bool, bool>(false, false);
// The guard is evaluated against the committed table only.
1810 if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1811 // Guard evaluated as true Create the commit and increment the
1812 // commit sequence number
// -1 is passed as the commit's transaction sequence number.
1813 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1814 localArbitrationSequenceNumber++;
1816 // Update the local changes so we can make the commit
1817 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1818 while (kvit->hasNext()) {
1819 KeyValue *kv = kvit->next();
1820 newCommit->addKV(kv);
1824 // create the commit parts
1825 newCommit->createCommitParts();
1827 // Append all the commit parts to the end of the pending queue
1828 // waiting for sending to the server
1829 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1830 pendingSendArbitrationRounds->add(arbitrationRound);
// If compaction happened, process the parts of the last pending round's
// commit; the parts of newCommit itself are processed further down.
// NOTE(review): whether those two paths are alternatives (else) is not
// visible here.
1832 if (compactArbitrationData()) {
1833 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1834 Vector<CommitPart *> * parts = newArbitrationRound->getCommit()->getParts();
1835 uint partsSize = parts->size();
1836 for(uint i=0; i<partsSize; i++) {
1837 CommitPart * commitPart = parts->get(i);
1838 processEntry(commitPart);
1841 // Insert the commit so we can process it
1842 Vector<CommitPart *> * parts = newCommit->getParts();
1843 uint partsSize = parts->size();
1844 for(uint i=0; i<partsSize; i++) {
1845 CommitPart * commitPart = parts->get(i);
1846 processEntry(commitPart);
1850 if (transaction->getMachineId() == localMachineId) {
1851 TransactionStatus *status = transaction->getTransactionStatus();
1852 if (status != NULL) {
1853 status->setStatus(TransactionStatus_StatusCommitted);
1857 updateLiveStateFromLocal();
1858 return Pair<bool, bool>(true, true);
1860 if (transaction->getMachineId() == localMachineId) {
1861 // For locally created messages update the status
1862 // Guard evaluated was false so create abort
1863 TransactionStatus * status = transaction->getTransactionStatus();
1864 if (status != NULL) {
1865 status->setStatus(TransactionStatus_StatusAborted);
1868 Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
// NOTE(review): one constructor argument of this Abort (between the
// client-local sequence number and the machine id) is not visible here.
1871 Abort *newAbort = new Abort(NULL,
1872 transaction->getClientLocalSequenceNumber(),
1874 transaction->getMachineId(),
1875 transaction->getArbitrator(),
1876 localArbitrationSequenceNumber);
1877 localArbitrationSequenceNumber++;
1878 addAbortSet->add(newAbort);
1880 // Append all the commit parts to the end of the pending queue
1881 // waiting for sending to the server
1882 ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1883 pendingSendArbitrationRounds->add(arbitrationRound);
1885 if (compactArbitrationData()) {
1886 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
// NOTE(review): the round queued above was created with a NULL commit; unless
// a guard that is not visible here precedes this line or compaction installed
// a commit, getCommit() may be NULL when dereferenced -- confirm.
1888 Vector<CommitPart *> * parts = newArbitrationRound->getCommit()->getParts();
1889 uint partsSize = parts->size();
1890 for(uint i=0; i<partsSize; i++) {
1891 CommitPart * commitPart = parts->get(i);
1892 processEntry(commitPart);
1897 updateLiveStateFromLocal();
1898 return Pair<bool, bool>(true, false);
1903 * Compacts the arbitration data by merging commits and aggregating
1904 * aborts so that a single large push of commits can be done instead
1905 * of many small updates.
// Tries to merge the most recent pending ArbitrationRound with the rounds
// queued before it, walking backwards from the end of
// pendingSendArbitrationRounds.
// A round that is full or has already had a part sent stops the walk.
// Rounds without a commit contribute only their aborts; rounds with a commit
// are merged into a new Commit via Commit_merge(). A merge is abandoned when
// the combined part count would exceed ArbitrationRound_MAX_PARTS.
// When something was merged, the superseded rounds are removed and the
// compacted round is appended again.
// NOTE(review): the return statements and the numberToDelete update are not
// visible in this listing. Callers treat a true result as "compaction
// happened, re-process the last round's commit parts".
1907 bool Table::compactArbitrationData() {
1908 if (pendingSendArbitrationRounds->size() < 2) {
1909 // Nothing to compact so do nothing
1913 ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1914 if (lastRound->getDidSendPart()) {
// NOTE(review): hadCommit is true when the last round's commit IS NULL, which
// reads as inverted relative to the variable name -- confirm intent.
1918 bool hadCommit = (lastRound->getCommit() == NULL);
1919 bool gotNewCommit = false;
// numberToDelete starts at 1 to account for lastRound itself.
1921 int numberToDelete = 1;
1922 while (numberToDelete < pendingSendArbitrationRounds->size()) {
// Candidate is the round immediately before those already considered.
1923 ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1925 if (round->isFull() || round->getDidSendPart()) {
1926 // Stop since there is a part that cannot be compacted and we
1927 // need to compact in order
1931 if (round->getCommit() == NULL) {
1932 // Try compacting aborts only
1933 int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
1934 if (newSize > ArbitrationRound_MAX_PARTS) {
1935 // Cant compact since it would be too large
1938 lastRound->addAborts(round->getAborts());
1940 // Create a new larger commit
// The merged commit consumes a fresh local arbitration sequence number even
// if the merge is later rejected for size (the increment precedes the check).
1941 Commit * newCommit = Commit_merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
1942 localArbitrationSequenceNumber++;
1944 // Create the commit parts so that we can count them
1945 newCommit->createCommitParts();
1947 // Calculate the new size of the parts
1948 int newSize = newCommit->getNumberOfParts();
1949 newSize += lastRound->getAbortsCount();
1950 newSize += round->getAbortsCount();
1952 if (newSize > ArbitrationRound_MAX_PARTS) {
1953 // Cant compact since it would be too large
1957 // Set the new compacted part
1958 lastRound->setCommit(newCommit);
1959 lastRound->addAborts(round->getAborts());
1960 gotNewCommit = true;
// A value other than 1 means at least one earlier round was absorbed.
1966 if (numberToDelete != 1) {
1967 // If there is a compaction
1968 // Delete the previous pieces that are now in the new compacted piece
1969 if (numberToDelete == pendingSendArbitrationRounds->size()) {
1970 pendingSendArbitrationRounds->clear();
// Otherwise drop the last numberToDelete rounds one at a time from the end.
1972 for (int i = 0; i < numberToDelete; i++) {
1973 pendingSendArbitrationRounds->removeIndex(pendingSendArbitrationRounds->size() - 1);
1977 // Add the new compacted into the pending to send list
1978 pendingSendArbitrationRounds->add(lastRound);
1980 // Should reinsert into the commit processor
1981 if (hadCommit && gotNewCommit) {
1990 * Update all the commits and the committed tables, sets dead the dead
1993 bool Table::updateCommittedTable() {
1995 if (newCommitParts->size() == 0) {
1996 // Nothing new to process
2000 // Iterate through all the machine Ids that we received new parts for
2001 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> * partsit=getKeyIterator(newCommitParts);
2002 while(partsit->hasNext()) {
2003 int64_t machineId = partsit->next();
2004 Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId);
2006 // Iterate through all the parts for that machine Id
2007 SetIterator<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> * pairit=getKeyIterator(parts);
2008 while(pairit->hasNext()) {
2009 Pair<int64_t, int32_t> * partId = pairit->next();
2010 CommitPart *part = parts->get(partId);
2012 // Get the transaction object for that sequence number
2013 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
2015 if (commitForClientTable == NULL) {
2016 // This is the first commit from this device
2017 commitForClientTable = new Hashtable<int64_t, Commit *>();
2018 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
2021 Commit *commit = commitForClientTable->get(part->getSequenceNumber());
2023 if (commit == NULL) {
2024 // This is a new commit that we dont have so make a new one
2025 commit = new Commit();
2027 // Insert this new commit into the live tables
2028 commitForClientTable->put(part->getSequenceNumber(), commit);
2031 // Add that part to the commit
2032 commit->addPartDecode(part);
2038 // Clear all the new commits parts in preparation for the next time
2039 // the server sends slots
2040 newCommitParts->clear();
2042 // If we process a new commit keep track of it for future use
2043 bool didProcessANewCommit = false;
2045 // Process the commits one by one
2046 SetIterator<int64_t, Hashtable<int64_t, Commit *> *> * liveit = getKeyIterator(liveCommitsTable);
2047 while (liveit->hasNext()) {
2048 int64_t arbitratorId = liveit->next();
2050 // Get all the commits for a specific arbitrator
2051 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
2053 // Sort the commits in order
2054 Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>();
2056 SetIterator<int64_t, Commit *> * clientit = getKeyIterator(commitForClientTable);
2057 while(clientit->hasNext())
2058 commitSequenceNumbers->add(clientit->next());
2062 qsort(commitSequenceNumbers->expose(), commitSequenceNumbers->size(), sizeof(int64_t), compareInt64);
2064 // Get the last commit seen from this arbitrator
2065 int64_t lastCommitSeenSequenceNumber = -1;
2066 if (lastCommitSeenSequenceNumberByArbitratorTable->contains(arbitratorId)) {
2067 lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
2070 // Go through each new commit one by one
2071 for (int i = 0; i < commitSequenceNumbers->size(); i++) {
2072 int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
2073 Commit *commit = commitForClientTable->get(commitSequenceNumber);
2075 // Special processing if a commit is not complete
2076 if (!commit->isComplete()) {
2077 if (i == (commitSequenceNumbers->size() - 1)) {
2078 // If there is an incomplete commit and this commit is the
2079 // latest one seen then this commit cannot be processed and
2080 // there are no other commits
2083 // This is a commit that was already dead but parts of it
2084 // are still in the block chain (not flushed out yet)->
2085 // Delete it and move on
2087 commitForClientTable->remove(commit->getSequenceNumber());
2092 // Update the last transaction that was updated if we can
2093 if (commit->getTransactionSequenceNumber() != -1) {
2094 // Update the last transaction sequence number that the arbitrator arbitrated on1
2095 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2096 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2100 // Update the last arbitration data that we have seen so far
2101 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(commit->getMachineId())) {
2102 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
2103 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
2105 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2108 // Never seen any data from this arbitrator so record the first one
2109 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2112 // We have already seen this commit before so need to do the
2113 // full processing on this commit
2114 if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2116 // Update the last transaction that was updated if we can
2117 if (commit->getTransactionSequenceNumber() != -1) {
2118 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
2119 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) ||
2120 lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2121 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2128 // If we got here then this is a brand new commit and needs full
2130 // Get what commits should be edited, these are the commits that
2131 // have live values for their keys
2132 Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
2134 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = commit->getKeyValueUpdateSet()->iterator();
2135 while (kvit->hasNext()) {
2136 KeyValue *kv = kvit->next();
2137 Commit * commit = liveCommitsByKeyTable->get(kv->getKey());
2139 commitsToEdit->add(commit);
2144 // Update each previous commit that needs to be updated
2145 SetIterator<Commit *, Commit *> * commitit = commitsToEdit->iterator();
2146 while(commitit->hasNext()) {
2147 Commit *previousCommit = commitit->next();
2149 // Only bother with live commits (TODO: Maybe remove this check)
2150 if (previousCommit->isLive()) {
2152 // Update which keys in the old commits are still live
2154 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = commit->getKeyValueUpdateSet()->iterator();
2155 while (kvit->hasNext()) {
2156 KeyValue *kv = kvit->next();
2157 previousCommit->invalidateKey(kv->getKey());
2162 // if the commit is now dead then remove it
2163 if (!previousCommit->isLive()) {
2164 commitForClientTable->remove(previousCommit->getSequenceNumber());
2170 // Update the last seen sequence number from this arbitrator
2171 if (lastCommitSeenSequenceNumberByArbitratorTable->contains(commit->getMachineId())) {
2172 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2173 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2176 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2179 // We processed a new commit that we havent seen before
2180 didProcessANewCommit = true;
2182 // Update the committed table of keys and which commit is using which key
2184 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = commit->getKeyValueUpdateSet()->iterator();
2185 while (kvit->hasNext()) {
2186 KeyValue *kv = kvit->next();
2187 committedKeyValueTable->put(kv->getKey(), kv);
2188 liveCommitsByKeyTable->put(kv->getKey(), commit);
2196 return didProcessANewCommit;
2200 * Create the speculative table from transactions that are still live
2201 * and have come from the cloud
2203 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2204 if (liveTransactionBySequenceNumberTable->size() == 0) {
2205 // There is nothing to speculate on
2209 // Create a list of the transaction sequence numbers and sort them
2210 // from oldest to newest
2211 Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>();
2213 SetIterator<int64_t, Transaction *> * trit = getKeyIterator(liveTransactionBySequenceNumberTable);
2214 while(trit->hasNext())
2215 transactionSequenceNumbersSorted->add(trit->next());
2219 qsort(transactionSequenceNumbersSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64);
2221 bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2224 if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2225 // If there is a gap in the transaction sequence numbers then
2226 // there was a commit or an abort of a transaction OR there was a
2227 // new commit (Could be from offline commit) so a redo the
2228 // speculation from scratch
2230 // Start from scratch
2231 speculatedKeyValueTable->clear();
2232 lastTransactionSequenceNumberSpeculatedOn = -1;
2233 oldestTransactionSequenceNumberSpeculatedOn = -1;
2236 // Remember the front of the transaction list
2237 oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2239 // Find where to start arbitration from
2242 for(; startIndex < transactionSequenceNumbersSorted->size(); startIndex++)
2243 if (transactionSequenceNumbersSorted->get(startIndex) == lastTransactionSequenceNumberSpeculatedOn)
2247 if (startIndex >= transactionSequenceNumbersSorted->size()) {
2248 // Make sure we are not out of bounds
2249 return false; // did not speculate
2252 Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
2253 bool didSkip = true;
2255 for (int i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2256 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2257 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2259 if (!transaction->isComplete()) {
2260 // If there is an incomplete transaction then there is nothing
2261 // we can do add this transactions arbitrator to the list of
2262 // arbitrators we should ignore
2263 incompleteTransactionArbitrator->add(transaction->getArbitrator());
2268 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2272 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2274 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2275 // Guard evaluated to true so update the speculative table
2277 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2278 while (kvit->hasNext()) {
2279 KeyValue *kv = kvit->next();
2280 speculatedKeyValueTable->put(kv->getKey(), kv);
2288 // Since there was a skip we need to redo the speculation next time around
2289 lastTransactionSequenceNumberSpeculatedOn = -1;
2290 oldestTransactionSequenceNumberSpeculatedOn = -1;
2293 // We did some speculation
2298 * Create the pending transaction speculative table from transactions
2299 * that are still in the pending transaction buffer
2301 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2302 if (pendingTransactionQueue->size() == 0) {
2303 // There is nothing to speculate on
2307 if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2308 // need to reset on the pending speculation
2309 lastPendingTransactionSpeculatedOn = NULL;
2310 firstPendingTransaction = pendingTransactionQueue->get(0);
2311 pendingTransactionSpeculatedKeyValueTable->clear();
2314 // Find where to start arbitration from
2317 for(; startIndex < pendingTransactionQueue->size(); startIndex++)
2318 if (pendingTransactionQueue->get(startIndex) == firstPendingTransaction)
2321 if (startIndex >= pendingTransactionQueue->size()) {
2322 // Make sure we are not out of bounds
2326 for (int i = startIndex; i < pendingTransactionQueue->size(); i++) {
2327 Transaction *transaction = pendingTransactionQueue->get(i);
2329 lastPendingTransactionSpeculatedOn = transaction;
2331 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2332 // Guard evaluated to true so update the speculative table
2333 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2334 while (kvit->hasNext()) {
2335 KeyValue *kv = kvit->next();
2336 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2344 * Set dead and remove from the live transaction tables the
2345 * transactions that are dead
2347 void Table::updateLiveTransactionsAndStatus() {
2348 // Go through each of the transactions
2350 SetIterator<int64_t, Transaction *> * iter = getKeyIterator(liveTransactionBySequenceNumberTable);
2351 while(iter->hasNext()) {
2352 int64_t key = iter->next();
2353 Transaction *transaction = liveTransactionBySequenceNumberTable->get(key);
2355 // Check if the transaction is dead
2356 if (lastArbitratedTransactionNumberByArbitratorTable->contains(transaction->getArbitrator()) && lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator() >= transaction->getSequenceNumber())) {
2357 // Set dead the transaction
2358 transaction->setDead();
2360 // Remove the transaction from the live table
2362 liveTransactionByTransactionIdTable->remove(transaction->getId());
2368 // Go through each of the transactions
2370 SetIterator<int64_t, TransactionStatus *> * iter = getKeyIterator(outstandingTransactionStatus);
2371 while(iter->hasNext()) {
2372 int64_t key = iter->next();
2373 TransactionStatus *status = outstandingTransactionStatus->get(key);
2375 // Check if the transaction is dead
2376 if (lastArbitratedTransactionNumberByArbitratorTable->contains(status->getTransactionArbitrator()) && (lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()) >= status->getTransactionSequenceNumber())) {
2379 status->setStatus(TransactionStatus_StatusCommitted);
2390 * Process this slot, entry by entry-> Also update the latest message sent by slot
2392 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2394 // Update the last message seen
2395 updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2397 // Process each entry in the slot
2398 Vector<Entry *> *entries = slot->getEntries();
2399 uint eSize = entries->size();
2400 for(uint ei=0; ei < eSize; ei++) {
2401 Entry * entry = entries->get(ei);
2402 switch (entry->getType()) {
2403 case TypeCommitPart:
2404 processEntry((CommitPart *)entry);
2407 processEntry((Abort *)entry);
2409 case TypeTransactionPart:
2410 processEntry((TransactionPart *)entry);
2413 processEntry((NewKey *)entry);
2415 case TypeLastMessage:
2416 processEntry((LastMessage *)entry, machineSet);
2418 case TypeRejectedMessage:
2419 processEntry((RejectedMessage *)entry, indexer);
2421 case TypeTableStatus:
2422 processEntry((TableStatus *)entry, slot->getSequenceNumber());
2425 throw new Error("Unrecognized type: ");
2431 * Update the last message that was sent for a machine Id
2433 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2434 // Update what the last message received by a machine was
2435 updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2439 * Add the new key to the arbitrators table and update the set of live
2440 * new keys (in case of a rescued new key message)
2442 void Table::processEntry(NewKey *entry) {
2443 // Update the arbitrator table with the new key information
2444 arbitratorTable->put(entry->getKey(), entry->getMachineID());
2446 // Update what the latest live new key is
2447 NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2448 if (oldNewKey != NULL) {
2449 // Delete the old new key messages
2450 oldNewKey->setDead();
2455 * Process new table status entries and set dead the old ones as new
2456 * ones come in-> keeps track of the largest and smallest table status
2457 * seen in this current round of updating the local copy of the block
2460 void Table::processEntry(TableStatus * entry, int64_t seq) {
2461 int newNumSlots = entry->getMaxSlots();
2462 updateCurrMaxSize(newNumSlots);
2463 initExpectedSize(seq, newNumSlots);
2465 if (liveTableStatus != NULL) {
2466 // We have a larger table status so the old table status is no
2468 liveTableStatus->setDead();
2471 // Make this new table status the latest alive table status
2472 liveTableStatus = entry;
2476 * Check old messages to see if there is a block chain violation->
2479 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
2480 int64_t oldSeqNum = entry->getOldSeqNum();
2481 int64_t newSeqNum = entry->getNewSeqNum();
2482 bool isequal = entry->getEqual();
2483 int64_t machineId = entry->getMachineID();
2484 int64_t seq = entry->getSequenceNumber();
2486 // Check if we have messages that were supposed to be rejected in
2487 // our local block chain
2488 for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2490 Slot *slot = indexer->getSlot(seqNum);
2493 // If we have this slot make sure that it was not supposed to be
2495 int64_t slotMachineId = slot->getMachineID();
2496 if (isequal != (slotMachineId == machineId)) {
2497 throw new Error("Server Error: Trying to insert rejected message for slot ");
2502 // Create a list of clients to watch until they see this rejected
2504 Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2505 SetIterator<int64_t, Pair<int64_t, Liveness*> *> * iter = getKeyIterator(lastMessageTable);
2506 while(iter->hasNext()) {
2507 // Machine ID for the last message entry
2508 int64_t lastMessageEntryMachineId = iter->next();
2510 // We've seen it, don't need to continue to watch-> Our next
2511 // message will implicitly acknowledge it->
2512 if (lastMessageEntryMachineId == localMachineId) {
2516 Pair<int64_t, Liveness *> * lastMessageValue = lastMessageTable->get(lastMessageEntryMachineId);
2517 int64_t entrySequenceNumber = lastMessageValue->getFirst();
2519 if (entrySequenceNumber < seq) {
2520 // Add this rejected message to the set of messages that this
2521 // machine ID did not see yet
2522 addWatchVector(lastMessageEntryMachineId, entry);
2523 // This client did not see this rejected message yet so add it
2524 // to the watch set to monitor
2525 deviceWatchSet->add(lastMessageEntryMachineId);
2530 if (deviceWatchSet->isEmpty()) {
2531 // This rejected message has been seen by all the clients so
2534 // We need to watch this rejected message
2535 entry->setWatchSet(deviceWatchSet);
2540 * Check if this abort is live, if not then save it so we can kill it
2541 * later-> update the last transaction number that was arbitrated on->
2543 void Table::processEntry(Abort *entry) {
2544 if (entry->getTransactionSequenceNumber() != -1) {
2545 // update the transaction status if it was sent to the server
2546 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2547 if (status != NULL) {
2548 status->setStatus(TransactionStatus_StatusAborted);
2552 // Abort has not been seen by the client it is for yet so we need to
2555 Abort *previouslySeenAbort = liveAbortTable->put(new Pair<int64_t, int64_t>(entry->getAbortId()), entry);
2556 if (previouslySeenAbort != NULL) {
2557 previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version
2560 if (entry->getTransactionArbitrator() == localMachineId) {
2561 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
2564 if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) {
2565 // The machine already saw this so it is dead
2567 Pair<int64_t, int64_t> abortid = entry->getAbortId();
2568 liveAbortTable->remove(&abortid);
2570 if (entry->getTransactionArbitrator() == localMachineId) {
2571 liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2576 // Update the last arbitration data that we have seen so far
2577 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(entry->getTransactionArbitrator())) {
2578 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2579 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2581 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2584 // Never seen any data from this arbitrator so record the first one
2585 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2588 // Set dead a transaction if we can
2589 Pair<int64_t, int64_t> deadPair = Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber());
2591 Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(&deadPair);
2592 if (transactionToSetDead != NULL) {
2593 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2596 // Update the last transaction sequence number that the arbitrator
2598 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getTransactionArbitrator()) ||
2599 (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator()) < entry->getTransactionSequenceNumber())) {
2601 if (entry->getTransactionSequenceNumber() != -1) {
2602 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2608 * Set dead the transaction part if that transaction is dead and keep
2609 * track of all new parts
2611 void Table::processEntry(TransactionPart *entry) {
2612 // Check if we have already seen this transaction and set it dead OR
2613 // if it is not alive
2614 if (lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getArbitratorId()) && (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId()) >= entry->getSequenceNumber())) {
2615 // This transaction is dead, it was already committed or aborted
2620 // This part is still alive
2621 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId());
2623 if (transactionPart == NULL) {
2624 // Dont have a table for this machine Id yet so make one
2625 transactionPart = new Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2626 newTransactionParts->put(entry->getMachineId(), transactionPart);
2629 // Update the part and set dead ones we have already seen (got a
2631 TransactionPart *previouslySeenPart = transactionPart->put(new Pair<int64_t, int32_t>(entry->getPartId()), entry);
2632 if (previouslySeenPart != NULL) {
2633 previouslySeenPart->setDead();
2638 * Process new commit entries and save them for future use-> Delete duplicates
2640 void Table::processEntry(CommitPart *entry) {
2641 // Update the last transaction that was updated if we can
2642 if (entry->getTransactionSequenceNumber() != -1) {
2643 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId() || lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber())) {
2644 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2648 Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *commitPart = newCommitParts->get(entry->getMachineId());
2649 if (commitPart == NULL) {
2650 // Don't have a table for this machine Id yet so make one
2651 commitPart = new Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2652 newCommitParts->put(entry->getMachineId(), commitPart);
2654 // Update the part and set dead ones we have already seen (got a
2656 CommitPart *previouslySeenPart = commitPart->put(new Pair<int64_t, int32_t>(entry->getPartId()), entry);
2657 if (previouslySeenPart != NULL) {
2658 previouslySeenPart->setDead();
2663 * Update the last message seen table-> Update and set dead the
2664 * appropriate RejectedMessages as clients see them-> Updates the live
2665 * aborts, removes those that are dead and sets them dead-> Check that
2666 * the last message seen is correct and that there is no mismatch of
2667 * our own last message or that other clients have not had a rollback
2668 * on the last message->
2670 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2671 // We have seen this machine ID
2672 machineSet->remove(machineId);
2674 // Get the set of rejected messages that this machine Id is has not seen yet
2675 Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2676 // If there is a rejected message that this machine Id has not seen yet
2677 if (watchset != NULL) {
2678 // Go through each rejected message that this machine Id has not
2681 SetIterator<RejectedMessage *, RejectedMessage *> *rmit = watchset->iterator();
2682 while(rmit->hasNext()) {
2683 RejectedMessage *rm = rmit->next();
2684 // If this machine Id has seen this rejected message->->->
2685 if (rm->getSequenceNumber() <= seqNum) {
2686 // Remove it from our watchlist
2688 // Decrement machines that need to see this notification
2689 rm->removeWatcher(machineId);
2695 // Set dead the abort
2696 SetIterator<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals> * abortit = getKeyIterator(liveAbortTable);
2698 while(abortit->hasNext()) {
2699 Pair<int64_t, int64_t> * key = abortit->next();
2700 Abort *abort = liveAbortTable->get(key);
2701 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2704 if (abort->getTransactionArbitrator() == localMachineId) {
2705 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2710 if (machineId == localMachineId) {
2711 // Our own messages are immediately dead->
2712 char livenessType = liveness->getType();
2713 if (livenessType==TypeLastMessage) {
2714 ((LastMessage *)liveness)->setDead();
2715 } else if (livenessType == TypeSlot) {
2716 ((Slot *)liveness)->setDead();
2718 throw new Error("Unrecognized type");
2721 // Get the old last message for this device
2722 Pair<int64_t, Liveness *> * lastMessageEntry = lastMessageTable->put(machineId, new Pair<int64_t, Liveness *>(seqNum, liveness));
2723 if (lastMessageEntry == NULL) {
2724 // If no last message then there is nothing else to process
2728 int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
2729 Liveness *lastEntry = lastMessageEntry->getSecond();
2730 delete lastMessageEntry;
2732 // If it is not our machine Id since we already set ours to dead
2733 if (machineId != localMachineId) {
2734 char lastEntryType = lastEntry->getType();
2736 if (lastEntryType == TypeLastMessage) {
2737 ((LastMessage *)lastEntry)->setDead();
2738 } else if (lastEntryType == TypeSlot) {
2739 ((Slot *)lastEntry)->setDead();
2741 throw new Error("Unrecognized type");
2744 // Make sure the server is not playing any games
2745 if (machineId == localMachineId) {
2746 if (hadPartialSendToServer) {
2747 // We were not making any updates and we had a machine mismatch
2748 if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2749 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: ");
2752 // We were not making any updates and we had a machine mismatch
2753 if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2754 throw new Error("Server Error: Mismatch on local machine sequence number, needed: ");
2758 if (lastMessageSeqNum > seqNum) {
2759 throw new Error("Server Error: Rollback on remote machine sequence number");
2765 * Add a rejected message entry to the watch set to keep track of
2766 * which clients have seen that rejected message entry and which have
2769 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
2770 Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2771 if (entries == NULL) {
2772 // There is no set for this machine ID yet so create one
2773 entries = new Hashset<RejectedMessage *>();
2774 rejectedMessageWatchVectorTable->put(machineId, entries);
2776 entries->add(entry);
2780 * Check if the HMAC chain is not violated
2782 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
2783 for (int i = 0; i < newSlots->length(); i++) {
2784 Slot *currSlot = newSlots->get(i);
2785 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
2786 if (prevSlot != NULL &&
2787 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2788 throw new Error("Server Error: Invalid HMAC Chain");