3 #include "SlotBuffer.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
13 #include "SecureRandom.h"
14 #include "ByteBuffer.h"
16 #include "CommitPart.h"
17 #include "ArbitrationRound.h"
18 #include "TransactionPart.h"
20 #include "RejectedMessage.h"
21 #include "SlotIndexer.h"
// Comparator with the C qsort() signature: both arguments arrive as untyped
// pointers and are reinterpreted as pointers to int64_t elements.
// The actual comparison and return statement are not visible in this excerpt.
24 int compareInt64(const void *a, const void *b) {
25 const int64_t *pa = (const int64_t *) a;
26 const int64_t *pb = (const int64_t *) b;
// Constructor that creates its own CloudComm from the base URL, password and
// listening port. Counters start at 0 (oldestLiveSlotSequenceNumver at 1),
// flags start false, and every pointer member listed here starts as NULL.
// NOTE(review): some initializer lines are not visible in this excerpt, so
// this list may not be the complete set of members.
35 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
// The CloudComm is handed a back-pointer to this Table.
37 cloud(new CloudComm(this, baseurl, password, listeningPort)),
39 liveTableStatus(NULL),
40 pendingTransactionBuilder(NULL),
41 lastPendingTransactionSpeculatedOn(NULL),
42 firstPendingTransaction(NULL),
44 bufferResizeThreshold(0),
46 oldestLiveSlotSequenceNumver(1),
47 localMachineId(_localMachineId),
// Local sequence counters and send-state flags.
49 localSequenceNumber(0),
50 localTransactionSequenceNumber(0),
51 lastTransactionSequenceNumberSpeculatedOn(0),
52 oldestTransactionSequenceNumberSpeculatedOn(0),
53 localArbitrationSequenceNumber(0),
54 hadPartialSendToServer(false),
55 attemptedToSendToServer(false),
57 didFindTableStatus(false),
// Snapshot of the most recent send attempt; nothing has been sent yet.
59 lastSlotAttemptedToSend(NULL),
62 lastTransactionPartsSent(NULL),
63 lastPendingSendArbitrationEntriesToDelete(NULL),
// Container members start as NULL here.
65 committedKeyValueTable(NULL),
66 speculatedKeyValueTable(NULL),
67 pendingTransactionSpeculatedKeyValueTable(NULL),
68 liveNewKeyTable(NULL),
69 lastMessageTable(NULL),
70 rejectedMessageWatchVectorTable(NULL),
71 arbitratorTable(NULL),
73 newTransactionParts(NULL),
75 lastArbitratedTransactionNumberByArbitratorTable(NULL),
76 liveTransactionBySequenceNumberTable(NULL),
77 liveTransactionByTransactionIdTable(NULL),
78 liveCommitsTable(NULL),
79 liveCommitsByKeyTable(NULL),
80 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
81 rejectedSlotVector(NULL),
82 pendingTransactionQueue(NULL),
83 pendingSendArbitrationRounds(NULL),
84 pendingSendArbitrationEntriesToDelete(NULL),
85 transactionPartsSent(NULL),
86 outstandingTransactionStatus(NULL),
87 liveAbortsGeneratedByLocal(NULL),
88 offlineTransactionsCommittedAndAtServer(NULL),
89 localCommunicationTable(NULL),
90 lastTransactionSeenFromMachineFromServer(NULL),
91 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
92 lastInsertedNewKey(false),
// Constructor that takes an already-constructed CloudComm instead of creating
// one. The initializer that stores _cloud is not visible in this excerpt.
// Counters start at 0 (oldestLiveSlotSequenceNumver at 1), flags start false,
// and every pointer member listed here starts as NULL.
98 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
102 liveTableStatus(NULL),
103 pendingTransactionBuilder(NULL),
104 lastPendingTransactionSpeculatedOn(NULL),
105 firstPendingTransaction(NULL),
107 bufferResizeThreshold(0),
109 oldestLiveSlotSequenceNumver(1),
110 localMachineId(_localMachineId),
// Local sequence counters and send-state flags.
112 localSequenceNumber(0),
113 localTransactionSequenceNumber(0),
114 lastTransactionSequenceNumberSpeculatedOn(0),
115 oldestTransactionSequenceNumberSpeculatedOn(0),
116 localArbitrationSequenceNumber(0),
117 hadPartialSendToServer(false),
118 attemptedToSendToServer(false),
120 didFindTableStatus(false),
// Snapshot of the most recent send attempt; nothing has been sent yet.
122 lastSlotAttemptedToSend(NULL),
125 lastTransactionPartsSent(NULL),
126 lastPendingSendArbitrationEntriesToDelete(NULL),
// Container members start as NULL here.
128 committedKeyValueTable(NULL),
129 speculatedKeyValueTable(NULL),
130 pendingTransactionSpeculatedKeyValueTable(NULL),
131 liveNewKeyTable(NULL),
132 lastMessageTable(NULL),
133 rejectedMessageWatchVectorTable(NULL),
134 arbitratorTable(NULL),
135 liveAbortTable(NULL),
136 newTransactionParts(NULL),
137 newCommitParts(NULL),
138 lastArbitratedTransactionNumberByArbitratorTable(NULL),
139 liveTransactionBySequenceNumberTable(NULL),
140 liveTransactionByTransactionIdTable(NULL),
141 liveCommitsTable(NULL),
142 liveCommitsByKeyTable(NULL),
143 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
144 rejectedSlotVector(NULL),
145 pendingTransactionQueue(NULL),
146 pendingSendArbitrationRounds(NULL),
147 pendingSendArbitrationEntriesToDelete(NULL),
148 transactionPartsSent(NULL),
149 outstandingTransactionStatus(NULL),
150 liveAbortsGeneratedByLocal(NULL),
151 offlineTransactionsCommittedAndAtServer(NULL),
152 localCommunicationTable(NULL),
153 lastTransactionSeenFromMachineFromServer(NULL),
154 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
155 lastInsertedNewKey(false),
// Teardown of the table's container members (presumably the destructor body;
// the signature line is not visible in this excerpt).
// NOTE(review): only the container objects themselves are deleted here.
// Whether the elements they hold are released elsewhere cannot be determined
// from this excerpt -- confirm ownership before relying on it.
166 delete committedKeyValueTable;
167 delete speculatedKeyValueTable;
168 delete pendingTransactionSpeculatedKeyValueTable;
169 delete liveNewKeyTable;
170 delete lastMessageTable;
171 delete rejectedMessageWatchVectorTable;
172 delete arbitratorTable;
173 delete liveAbortTable;
174 delete newTransactionParts;
175 delete newCommitParts;
176 delete lastArbitratedTransactionNumberByArbitratorTable;
177 delete liveTransactionBySequenceNumberTable;
178 delete liveTransactionByTransactionIdTable;
179 delete liveCommitsTable;
180 delete liveCommitsByKeyTable;
181 delete lastCommitSeenSequenceNumberByArbitratorTable;
182 delete rejectedSlotVector;
183 delete pendingTransactionQueue;
184 delete pendingSendArbitrationEntriesToDelete;
185 delete transactionPartsSent;
186 delete outstandingTransactionStatus;
187 delete liveAbortsGeneratedByLocal;
188 delete offlineTransactionsCommittedAndAtServer;
189 delete localCommunicationTable;
190 delete lastTransactionSeenFromMachineFromServer;
191 delete pendingSendArbitrationRounds;
192 delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
196 * Init all the stuff needed for table usage
// Allocation section (presumably the body of an init routine; its signature
// line is not visible in this excerpt). Creates the random source, the slot
// buffer and every container member, then sizes the table from the buffer.
199 // Init helper objects
200 random = new SecureRandom();
201 buffer = new SlotBuffer();
// Tables keyed by IoTString * use hashString / StringEquals as their hash and
// equality functions; tables keyed by Pair * use pairHashFunction / pairEquals.
204 committedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
205 speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
206 pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
207 liveNewKeyTable = new Hashtable<IoTString *, NewKey *, uintptr_t, 0, hashString, StringEquals >();
208 lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> * >();
209 rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
210 arbitratorTable = new Hashtable<IoTString *, int64_t, uintptr_t, 0, hashString, StringEquals>();
211 liveAbortTable = new Hashtable<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>();
212 newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
213 newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
214 lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
215 liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
216 liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t> *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>();
217 liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> * >();
218 liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *, uintptr_t, 0, hashString, StringEquals>();
219 lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
220 rejectedSlotVector = new Vector<int64_t>();
221 pendingTransactionQueue = new Vector<Transaction *>();
222 pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
223 transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
224 outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
225 liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
226 offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> *, uintptr_t, 0, pairHashFunction, pairEquals>();
227 localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> *>();
228 lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
229 pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
230 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
// The slot count mirrors the buffer capacity; setResizeThreshold() reads
// numberOfSlots and random, so it has to run after both are set.
233 numberOfSlots = buffer->capacity();
234 setResizeThreshold();
238 * Initialize the table by inserting a table status as the first entry
239 * into the table; also initialize the crypto stuff.
// Initializes security on the cloud connection, builds a first slot (sequence
// number 1) together with a TableStatus sized to numberOfSlots, and pushes it
// with cloud->putSlot(). The result is then fed to validateAndUpdate(); an
// Error is thrown on the remaining path. The exact branch conditions are only
// partially visible in this excerpt.
241 void Table::initTable() {
242 cloud->initSecurity();
244 // Create the first insertion into the block chain which is the table status
245 Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
// Advance the local sequence number after each use.
246 localSequenceNumber++;
247 TableStatus *status = new TableStatus(s, numberOfSlots);
249 Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
// A fresh one-element array is built here (the line filling it is not visible).
252 array = new Array<Slot *>(1);
254 // update local block chain
255 validateAndUpdate(array, true);
256 } else if (array->length() == 1) {
257 // in case we did push the slot BUT we failed to init it
258 validateAndUpdate(array, true);
// Errors in this file are thrown as heap-allocated pointers.
260 throw new Error("Error on initialization");
265 * Rebuild the table from scratch by pulling the latest block chain
// Fetches every slot after the current sequenceNumber from the cloud, applies
// them through validateAndUpdate() with its second argument set to true, and
// then refreshes live transactions and status.
268 void Table::rebuild() {
269 // Just pull the latest slots from the server
270 Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
271 validateAndUpdate(newslots, true);
273 updateLiveTransactionsAndStatus();
// Records the host name and port under the given arbitrator id in
// localCommunicationTable. A new Pair is allocated for each call.
276 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
277 localCommunicationTable->put(arbitrator, new Pair<IoTString *, int32_t>(hostName, portNumber));
// Returns the value stored for key in arbitratorTable.
// NOTE(review): the result for a key that is not in the table depends on
// Hashtable::get, which is not visible here.
280 int64_t Table::getArbitrator(IoTString *key) {
281 return arbitratorTable->get(key);
// close(): the body of this method is not visible in this excerpt.
284 void Table::close() {
// Looks key up in committedKeyValueTable and returns the stored KeyValue's
// value. Handling of a missing key is not visible in this excerpt.
288 IoTString *Table::getCommitted(IoTString *key) {
289 KeyValue *kv = committedKeyValueTable->get(key);
292 return kv->getValue();
// Looks key up in three tables in this order: pending-transaction speculated,
// speculated, committed. It returns the value of the KeyValue found. The
// conditions guarding each fallback are not visible in this excerpt.
298 IoTString *Table::getSpeculative(IoTString *key) {
299 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
302 kv = speculatedKeyValueTable->get(key);
306 kv = committedKeyValueTable->get(key);
310 return kv->getValue();
// Reads the committed value for key and registers it as a guard on the
// pending transaction builder. Throws an Error when the key has no entry in
// arbitratorTable or when its arbitrator fails the builder's checkArbitrator
// test. The branch choosing between the two addKVGuard calls is not visible
// in this excerpt.
316 IoTString *Table::getCommittedAtomic(IoTString *key) {
317 KeyValue *kv = committedKeyValueTable->get(key);
319 if (!arbitratorTable->contains(key)) {
320 throw new Error("Key not Found.");
323 // Make sure new key value pair matches the current arbitrator
324 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
325 // TODO: Maybe not throw an error
326 throw new Error("Not all Key Values Match Arbitrator.");
// Guard on the value that was read.
330 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
331 return kv->getValue();
// Alternative path: guard with a NULL value.
333 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
// Speculative counterpart of getCommittedAtomic. After the same arbitrator
// checks, it looks key up in the pending-transaction speculated, speculated
// and committed tables (in that order) and registers the value as a guard on
// the pending transaction builder. The conditions between the lookups and the
// two addKVGuard calls are not visible in this excerpt.
338 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
339 if (!arbitratorTable->contains(key)) {
340 throw new Error("Key not Found.");
343 // Make sure new key value pair matches the current arbitrator
344 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
345 // TODO: Maybe not throw an error
346 throw new Error("Not all Key Values Match Arbitrator.");
349 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
352 kv = speculatedKeyValueTable->get(key);
356 kv = committedKeyValueTable->get(key);
// Guard on the value that was read.
360 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
361 return kv->getValue();
// Alternative path: guard with a NULL value.
363 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
// Pulls slots newer than sequenceNumber from the cloud and applies them, with
// validateAndUpdate()'s second argument set to false, then refreshes live
// transactions and status. If an Exception is caught, it iterates over the
// machine ids in localCommunicationTable instead. The loop body and the
// return statements are not visible in this excerpt.
368 bool Table::update() {
370 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
371 validateAndUpdate(newSlots, false);
373 updateLiveTransactionsAndStatus();
// Exceptions are caught by pointer in this file.
375 } catch (Exception *e) {
376 SetIterator<int64_t, Pair<IoTString *, int32_t> *> *kit = getKeyIterator(localCommunicationTable);
377 while (kit->hasNext()) {
378 int64_t m = kit->next();
// Creates a NewKey entry for keyName and machineId and submits it through
// sendToServer(). The arbitratorTable check detects a key that already has an
// arbitrator. The return values of each path are not visible in this excerpt.
387 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
389 if (arbitratorTable->contains(keyName)) {
390 // There is already an arbitrator
// The NewKey is constructed with NULL as its first argument.
393 NewKey *newKey = new NewKey(NULL, keyName, machineId);
395 if (sendToServer(newKey)) {
396 // If successfully inserted
// Replaces pendingTransactionBuilder with a fresh PendingTransaction for this
// machine.
// NOTE(review): the previous builder pointer is overwritten here without a
// visible delete -- confirm who owns it.
402 void Table::startTransaction() {
403 // Create a new transaction, invalidates any old pending transactions.
404 pendingTransactionBuilder = new PendingTransaction(localMachineId);
// Adds a key/value update to the pending transaction. Throws an Error when
// the key has no entry in arbitratorTable or when its arbitrator fails the
// builder's checkArbitrator test.
407 void Table::addKV(IoTString *key, IoTString *value) {
409 // Make sure it is a valid key
410 if (!arbitratorTable->contains(key)) {
411 throw new Error("Key not Found.");
414 // Make sure new key value pair matches the current arbitrator
415 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
416 // TODO: Maybe not throw an error
417 throw new Error("Not all Key Values Match Arbitrator.");
420 // Add the key value to this transaction
421 KeyValue *kv = new KeyValue(key, value);
422 pendingTransactionBuilder->addKV(kv);
// Turns the pending transaction builder into a Transaction and submits it.
// - With no key/value updates, returns a StatusNoEffect status immediately.
// - Otherwise the transaction gets the next local transaction sequence number
//   and a StatusPending status. It is queued when the arbitrator is another
//   machine, and arbitrated locally on the other path.
// - When a ServerException is caught, the queued transactions are offered to
//   their arbitrators through sendTransactionToLocal().
// Returns the TransactionStatus attached to the new transaction. Several
// lines (the try, the else and the continue/skip logic) are not visible in
// this excerpt.
425 TransactionStatus *Table::commitTransaction() {
426 if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
427 // transaction with no updates will have no effect on the system
428 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
431 // Set the local transaction sequence number and increment
432 pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
433 localTransactionSequenceNumber++;
435 // Create the transaction status
436 TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
438 // Create the new transaction
439 Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
440 newTransaction->setTransactionStatus(transactionStatus);
442 if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
443 // Add it to the queue and invalidate the builder for safety
444 pendingTransactionQueue->add(newTransaction);
// Other path: arbitrate locally and refresh live state.
446 arbitrateOnLocalTransaction(newTransaction);
447 updateLiveStateFromLocal();
// Start a fresh builder so the committed one cannot be reused.
450 pendingTransactionBuilder = new PendingTransaction(localMachineId);
454 } catch (ServerException *e) {
// Fallback: walk the queue, remembering arbitrators that already failed.
456 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
457 uint size = pendingTransactionQueue->size();
459 for (uint iter = 0; iter < size; iter++) {
460 Transaction *transaction = pendingTransactionQueue->get(iter);
// In-place compaction: copy the current element down to position oldindex.
461 pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter));
463 if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
464 // Already contacted this client so ignore all attempts to contact this client
465 // to preserve ordering for arbitrator
469 Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
471 if (sendReturn.getFirst()) {
472 // Failed to contact over local
473 arbitratorTriedAndFailed->add(transaction->getArbitrator());
475 // Successful contact or should not contact
477 if (sendReturn.getSecond()) {
// Truncate the queue to the compacted length.
483 pendingTransactionQueue->setSize(oldindex);
486 updateLiveStateFromLocal();
488 return transactionStatus;
492 * Recalculate the new resize threshold
// Recomputes bufferResizeThreshold from numberOfSlots. The lower bound is
// Table_RESIZE_THRESHOLD * numberOfSlots truncated to int. The threshold is
// that bound minus one, plus the result of
// random->nextInt(numberOfSlots - resizeLower).
// NOTE(review): the range nextInt() draws from is defined by SecureRandom,
// which is not visible here.
494 void Table::setResizeThreshold() {
495 int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
496 bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
// Returns the current value of localSequenceNumber.
499 int64_t Table::getLocalSequenceNumber() {
500 return localSequenceNumber;
// Reconciliation step for a previous send that was not acknowledged.
// Fetches slots newer than sequenceNumber from the cloud. When none come
// back it retries sending lastSlotAttemptedToSend. On the other paths it
// scans the returned slots for a slot with the same sequence number from this
// machine, or for a LastMessage entry naming that slot. Depending on the
// outcome, the transactions recorded in lastTransactionPartsSent are either
// marked as (partially or fully) sent or have their send state reset.
// Takes and returns a NewKey pointer; the return statements, and therefore
// what is returned on each path, are not visible in this excerpt. The branch
// structure (else / try / closing braces) is also only partially visible.
503 NewKey * Table::handlePartialSend(NewKey * newKey) {
504 //Didn't receive acknowledgement for last send
505 //See if the server has received a newer slot
507 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
508 if (newSlots->length() == 0) {
509 //Retry sending old slot
510 bool wasInserted = false;
511 bool sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey, &wasInserted, &newSlots);
513 if (sendSlotsReturn) {
514 if (newKey != NULL) {
// NOTE(review): if getKey() returns IoTString * (arbitratorTable, which is
// keyed by IoTString *, is queried with newKey->getKey() elsewhere in this
// file), this == compares pointers rather than string contents -- confirm
// that is intended. The same expression appears three times in this function.
515 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
// Retry succeeded: record what was sent for every transaction in the snapshot.
520 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
521 while (trit->hasNext()) {
522 Transaction *transaction = trit->next();
523 transaction->resetServerFailure();
524 // Update which transactions parts still need to be sent
525 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
526 // Add the transaction status to the outstanding list
527 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
529 // Update the transaction status
530 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
532 // Check if all the transaction parts were successfully
533 // sent and if so then remove it from pending
534 if (transaction->didSendAllParts()) {
535 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
536 pendingTransactionQueue->remove(transaction);
// First check: look for a slot with the same sequence number from this machine.
541 bool isInserted = false;
542 for (uint si = 0; si < newSlots->length(); si++) {
543 Slot *s = newSlots->get(si);
544 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
// Second check: look for a LastMessage entry that names our slot.
550 for (uint si = 0; si < newSlots->length(); si++) {
551 Slot *s = newSlots->get(si);
556 // Process each entry in the slot
557 Vector<Entry *> *ventries = s->getEntries();
558 uint vesize = ventries->size();
559 for (uint vei = 0; vei < vesize; vei++) {
560 Entry *entry = ventries->get(vei);
561 if (entry->getType() == TypeLastMessage) {
562 LastMessage *lastMessage = (LastMessage *)entry;
563 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
572 if (newKey != NULL) {
573 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
578 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
579 while (trit->hasNext()) {
580 Transaction *transaction = trit->next();
581 transaction->resetServerFailure();
583 // Update which transactions parts still need to be sent
584 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
586 // Add the transaction status to the outstanding list
587 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
589 // Update the transaction status
590 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
592 // Check if all the transaction parts were successfully sent and if so then remove it from pending
593 if (transaction->didSendAllParts()) {
594 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
595 pendingTransactionQueue->remove(transaction);
597 transaction->resetServerFailure();
598 // Set the transaction sequence number back to nothing
599 if (!transaction->didSendAPartToServer()) {
600 transaction->setSequenceNumber(-1);
// Reset path: clear the failure flag and the sequence number of any
// transaction that has not sent a part to the server.
608 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
609 while (trit->hasNext()) {
610 Transaction *transaction = trit->next();
611 transaction->resetServerFailure();
612 // Set the transaction sequence number back to nothing
613 if (!transaction->didSendAPartToServer()) {
614 transaction->setSequenceNumber(-1);
619 if (newSlots->length() != 0) {
620 // insert into the local block chain
621 validateAndUpdate(newSlots, true);
// The same two checks and bookkeeping are repeated below on another path.
625 bool isInserted = false;
626 for (uint si = 0; si < newSlots->length(); si++) {
627 Slot *s = newSlots->get(si);
628 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
634 for (uint si = 0; si < newSlots->length(); si++) {
635 Slot *s = newSlots->get(si);
640 // Process each entry in the slot
641 Vector<Entry *> *entries = s->getEntries();
642 uint eSize = entries->size();
643 for (uint ei = 0; ei < eSize; ei++) {
644 Entry *entry = entries->get(ei);
646 if (entry->getType() == TypeLastMessage) {
647 LastMessage *lastMessage = (LastMessage *)entry;
648 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
657 if (newKey != NULL) {
658 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
663 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
664 while (trit->hasNext()) {
665 Transaction *transaction = trit->next();
666 transaction->resetServerFailure();
668 // Update which transactions parts still need to be sent
669 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
671 // Add the transaction status to the outstanding list
672 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
674 // Update the transaction status
675 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
677 // Check if all the transaction parts were successfully sent and if so then remove it from pending
678 if (transaction->didSendAllParts()) {
679 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
680 pendingTransactionQueue->remove(transaction);
682 transaction->resetServerFailure();
683 // Set the transaction sequence number back to nothing
684 if (!transaction->didSendAPartToServer()) {
685 transaction->setSequenceNumber(-1);
691 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
692 while (trit->hasNext()) {
693 Transaction *transaction = trit->next();
694 transaction->resetServerFailure();
695 // Set the transaction sequence number back to nothing
696 if (!transaction->didSendAPartToServer()) {
697 transaction->setSequenceNumber(-1);
703 // insert into the local block chain
704 validateAndUpdate(newSlots, true);
// Pushes pending work to the server: queued transactions, pending arbitration
// rounds and, optionally, a new key. Loops while any of those remain.
// Each iteration builds a slot, fills it, snapshots what is being sent into
// the last* members, and calls sendSlotsToServer(). On success the sent parts
// are removed from the pending structures; otherwise the per-transaction send
// state is reset. A ServerException is handled separately for the
// input-timeout type and for all other types.
// Returns true when newKey is NULL at the end. The line that clears newKey
// after a successful insert is not visible in this excerpt, and neither are
// several else / try / closing-brace lines.
709 bool Table::sendToServer(NewKey *newKey) {
// Reconcile an earlier unacknowledged send before sending anything new.
710 if (hadPartialSendToServer) {
711 newKey = handlePartialSend(newKey);
715 // While we have stuff that needs inserting into the block chain
716 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
717 if (hadPartialSendToServer) {
718 throw new Error("Should Be error free");
721 // If there is a new key with same name then end
722 if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
// The new slot takes the next sequence number and is constructed with the
// HMAC of the slot stored at the current sequenceNumber.
727 Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber);
728 localSequenceNumber++;
730 // Try to fill the slot with data
731 ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
732 bool needsResize = fillSlotsReturn.getFirst();
733 int newSize = fillSlotsReturn.getSecond();
734 bool insertedNewKey = fillSlotsReturn.getThird();
737 // Reset which transaction to send
738 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
739 while (trit->hasNext()) {
740 Transaction *transaction = trit->next();
741 transaction->resetNextPartToSend();
743 // Set the transaction sequence number back to nothing
744 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
745 transaction->setSequenceNumber(-1);
750 // Clear the sent data since we are trying again
751 pendingSendArbitrationEntriesToDelete->clear();
752 transactionPartsSent->clear();
754 // We needed a resize so try again
755 fillSlot(slot, true, newKey);
// Snapshot this attempt; handlePartialSend() reads these members.
758 lastSlotAttemptedToSend = slot;
759 lastIsNewKey = (newKey != NULL);
760 lastInsertedNewKey = insertedNewKey;
761 lastNewSize = newSize;
763 lastTransactionPartsSent = transactionPartsSent->clone();
764 lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
766 Array<Slot *> * newSlots = NULL;
767 bool wasInserted = false;
768 bool sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL, &wasInserted, &newSlots);
770 if (sendSlotsReturn) {
771 // Did insert into the block chain
773 if (insertedNewKey) {
774 // This slot was what was inserted not a previous slot
776 // New Key was successfully inserted into the block chain so dont want to insert it again
780 // Remove the aborts and commit parts that were sent from the pending to send queue
781 uint size = pendingSendArbitrationRounds->size();
783 for (uint i = 0; i < size; i++) {
784 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
785 round->removeParts(pendingSendArbitrationEntriesToDelete);
787 if (!round->isDoneSending()) {
788 // Not done sending yet: keep this round by compacting it toward the front
789 pendingSendArbitrationRounds->set(oldcount++,
790 pendingSendArbitrationRounds->get(i));
// Truncate to the rounds that were kept.
793 pendingSendArbitrationRounds->setSize(oldcount);
795 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
796 while (trit->hasNext()) {
797 Transaction *transaction = trit->next();
798 transaction->resetServerFailure();
800 // Update which transactions parts still need to be sent
801 transaction->removeSentParts(transactionPartsSent->get(transaction));
803 // Add the transaction status to the outstanding list
804 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
806 // Update the transaction status
807 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
809 // Check if all the transaction parts were successfully sent and if so then remove it from pending
810 if (transaction->didSendAllParts()) {
811 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
812 pendingTransactionQueue->remove(transaction);
817 // Reset which transaction to send
818 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
819 while (trit->hasNext()) {
820 Transaction *transaction = trit->next();
821 transaction->resetNextPartToSend();
823 // Set the transaction sequence number back to nothing
824 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
825 transaction->setSequenceNumber(-1);
831 // Clear the sent data in preparation for next send
832 pendingSendArbitrationEntriesToDelete->clear();
833 transactionPartsSent->clear();
835 if (newSlots->length() != 0) {
836 // insert into the local block chain
837 validateAndUpdate(newSlots, true);
841 } catch (ServerException *e) {
// Any type other than an input timeout is treated as "nothing was sent".
842 if (e->getType() != ServerException_TypeInputTimeout) {
843 // Nothing was able to be sent to the server so just clear these data structures
844 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
845 while (trit->hasNext()) {
846 Transaction *transaction = trit->next();
847 transaction->resetNextPartToSend();
849 // Set the transaction sequence number back to nothing
850 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
851 transaction->setSequenceNumber(-1);
856 // There was a partial send to the server
857 hadPartialSendToServer = true;
859 // Partial send: reset each transaction's next part and flag a server failure on it
860 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
861 while (trit->hasNext()) {
862 Transaction *transaction = trit->next();
863 transaction->resetNextPartToSend();
864 transaction->setServerFailure();
869 pendingSendArbitrationEntriesToDelete->clear();
870 transactionPartsSent->clear();
875 return newKey == NULL;
// Contacts the machine identified by machineId over the local channel.
// Sends the last arbitration-data sequence number seen from that machine
// (-1 when none is recorded) and decodes the Abort and CommitPart entries in
// the reply, then refreshes live state. Requires an entry for machineId in
// localCommunicationTable. The return statements are not visible in this
// excerpt.
878 bool Table::updateFromLocal(int64_t machineId) {
879 if (!localCommunicationTable->contains(machineId))
882 Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(machineId);
884 // Get the size of the send data
// Sized for one int64 (written below with putLong) plus one int32; the write
// of the int32 is not visible in this excerpt.
885 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
887 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
888 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(machineId)) {
889 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
892 Array<char> *sendData = new Array<char>(sendDataSize);
893 ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
896 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
// The pair holds the host name (first) and port (second) for this machine.
900 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
901 localSequenceNumber++;
903 if (returnData == NULL) {
904 // Could not contact server
// The reply starts with an entry count, followed by (type byte, entry) pairs.
909 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
910 int numberOfEntries = bbDecode->getInt();
912 for (int i = 0; i < numberOfEntries; i++) {
913 char type = bbDecode->get();
914 if (type == TypeAbort) {
// Handling of the decoded abort is not visible in this excerpt.
915 Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
917 } else if (type == TypeCommitPart) {
918 CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
919 processEntry(commitPart);
923 updateLiveStateFromLocal();
// Sends all parts of a transaction directly to its arbitrator over the local
// channel and applies the reply.
// Returns a Pair<bool, bool>:
// - (true, false) when the arbitrator has no entry in localCommunicationTable
//   or when sendLocalData() returns NULL;
// - (false, true) at the end of the normal path.
// The reply carries two flag bytes (didCommit, couldArbitrate), an entry
// count, and that many Abort / CommitPart entries. The transaction status is
// then set to Committed or Aborted. The conditions choosing between those
// two, and some try / else lines, are not visible in this excerpt.
928 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
930 // Get the devices local communications
931 if (!localCommunicationTable->contains(transaction->getArbitrator()))
932 return Pair<bool, bool>(true, false);
934 Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
936 // Get the size of the send data
// Fixed header (one int32 + one int64) plus the size of every part.
937 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
939 Vector<TransactionPart *> *tParts = transaction->getParts();
940 uint tPartsSize = tParts->size();
941 for (uint i = 0; i < tPartsSize; i++) {
942 TransactionPart *part = tParts->get(i);
943 sendDataSize += part->getSize();
// -1 is sent when no sequence number is recorded for this arbitrator.
947 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
948 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(transaction->getArbitrator())) {
949 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
952 // Make the send data size
953 Array<char> *sendData = new Array<char>(sendDataSize);
954 ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
// Payload layout: last seen sequence number, part count, then each part.
957 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
958 bbEncode->putInt(transaction->getParts()->size());
960 Vector<TransactionPart *> *tParts = transaction->getParts();
961 uint tPartsSize = tParts->size();
962 for (uint i = 0; i < tPartsSize; i++) {
963 TransactionPart *part = tParts->get(i);
964 part->encode(bbEncode);
969 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
970 localSequenceNumber++;
972 if (returnData == NULL) {
973 // Could not contact server
974 return Pair<bool, bool>(true, false);
// Decode the reply header: a flag byte counts as true when it equals 1.
978 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
979 bool didCommit = bbDecode->get() == 1;
980 bool couldArbitrate = bbDecode->get() == 1;
981 int numberOfEntries = bbDecode->getInt();
982 bool foundAbort = false;
984 for (int i = 0; i < numberOfEntries; i++) {
985 char type = bbDecode->get();
986 if (type == TypeAbort) {
987 Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
// Tests whether this abort refers to the transaction that was just sent.
989 if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
994 } else if (type == TypeCommitPart) {
995 CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
996 processEntry(commitPart);
1000 updateLiveStateFromLocal();
// The final status comes from one of two branches, selected by couldArbitrate.
1002 if (couldArbitrate) {
1003 TransactionStatus *status = transaction->getTransactionStatus();
1005 status->setStatus(TransactionStatus_StatusCommitted);
1007 status->setStatus(TransactionStatus_StatusAborted);
1010 TransactionStatus *status = transaction->getTransactionStatus();
1012 status->setStatus(TransactionStatus_StatusAborted);
1014 status->setStatus(TransactionStatus_StatusCommitted);
1018 return Pair<bool, bool>(false, true);
// Handles a request received from a local peer and builds the reply buffer.
//
// The request is read as: a 64-bit "last arbitrated sequence number seen"
// value, a 32-bit part count, then the encoded TransactionParts. When parts
// are present they are assembled into a Transaction and handed to
// arbitrateOnLocalTransaction(). The reply collects arbitration entries:
// aborts from liveAbortsGeneratedByLocal and the commit parts stored for
// localMachineId in liveCommitsTable.
//
// @param data  encoded request bytes
// @return presumably the encoded reply (returnData); the return statement is
//         not among the lines visible here -- confirm
1021 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
// Wrap the request so its fields can be read sequentially
1023 ByteBuffer *bbDecode = ByteBuffer_wrap(data);
1024 int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
1025 int numberOfParts = bbDecode->getInt();
1027 // If we did commit a transaction or not
1028 bool didCommit = false;
1029 bool couldArbitrate = false;
// A part count of zero means the request carries no transaction
1031 if (numberOfParts != 0) {
1033 // decode the transaction
1034 Transaction *transaction = new Transaction();
1035 for (int i = 0; i < numberOfParts; i++) {
1037 TransactionPart *newPart = (TransactionPart *)TransactionPart_decode(NULL, bbDecode);
1038 transaction->addPartDecode(newPart);
1041 // Arbitrate on transaction and pull relevant return data
1042 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
// first = whether arbitration was possible, second = whether it committed
1043 couldArbitrate = localArbitrateReturn.getFirst();
1044 didCommit = localArbitrateReturn.getSecond();
1046 updateLiveStateFromLocal();
1048 // Transaction was sent to the server so keep track of it to prevent double commit
1049 if (transaction->getSequenceNumber() != -1) {
1050 offlineTransactionsCommittedAndAtServer->add(new Pair<int64_t, int64_t>(transaction->getId()));
1054 // The data to send back
1055 int returnDataSize = 0;
1056 Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
1058 // Get the aborts to send back
1059 Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>();
// Copy the keys of liveAbortsGeneratedByLocal into a vector so they can be sorted
1061 SetIterator<int64_t, Abort *> *abortit = getKeyIterator(liveAbortsGeneratedByLocal);
1062 while (abortit->hasNext())
1063 abortLocalSequenceNumbers->add(abortit->next());
// Order the keys with compareInt64 (presumably ascending -- confirm)
1067 qsort(abortLocalSequenceNumbers->expose(), abortLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1069 uint asize = abortLocalSequenceNumbers->size();
1070 for (uint i = 0; i < asize; i++) {
1071 int64_t localSequenceNumber = abortLocalSequenceNumbers->get(i);
// NOTE(review): the body of this branch is not visible here; presumably it
// skips aborts the requester has already seen -- confirm
1072 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1076 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
1077 unseenArbitrations->add(abort);
1078 returnDataSize += abort->getSize();
1081 // Get the commits to send back
1082 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
1083 if (commitForClientTable != NULL) {
1084 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>();
// Same collect-then-sort pattern as for the aborts above
1086 SetIterator<int64_t, Commit *> *commitit = getKeyIterator(commitForClientTable);
1087 while (commitit->hasNext())
1088 commitLocalSequenceNumbers->add(commitit->next());
1091 qsort(commitLocalSequenceNumbers->expose(), commitLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1093 uint clsSize = commitLocalSequenceNumbers->size();
1094 for (uint clsi = 0; clsi < clsSize; clsi++) {
1095 int64_t localSequenceNumber = commitLocalSequenceNumbers->get(clsi);
1096 Commit *commit = commitForClientTable->get(localSequenceNumber);
// NOTE(review): branch body not visible here; presumably skips commits the
// requester has already seen -- confirm
1098 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
// Every part of the commit is queued for the reply and counted in its size
1103 Vector<CommitPart *> *parts = commit->getParts();
1104 uint nParts = parts->size();
1105 for (uint i = 0; i < nParts; i++) {
1106 CommitPart *commitPart = parts->get(i);
1107 unseenArbitrations->add(commitPart);
1108 returnDataSize += commitPart->getSize();
1114 // Number of arbitration entries to decode
// NOTE(review): only one 32-bit count is written below (putInt), so this
// reserves more than that field needs -- confirm the sizing is intentional
1115 returnDataSize += 2 * sizeof(int32_t);
1117 // bool of did commit or not
1118 if (numberOfParts != 0) {
1119 returnDataSize += sizeof(char);
1122 // Data to send Back
1123 Array<char> *returnData = new Array<char>(returnDataSize);
1124 ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
// Status bytes are only written when the request carried a transaction
1126 if (numberOfParts != 0) {
// presumably selected on didCommit (the condition line is not visible) -- confirm
1128 bbEncode->put((char)1);
1130 bbEncode->put((char)0);
1132 if (couldArbitrate) {
1133 bbEncode->put((char)1);
1135 bbEncode->put((char)0);
// Entry count followed by each encoded arbitration entry
1139 bbEncode->putInt(unseenArbitrations->size());
1140 uint size = unseenArbitrations->size();
1141 for (uint i = 0; i < size; i++) {
1142 Entry *entry = unseenArbitrations->get(i);
1143 entry->encode(bbEncode);
// presumably the Table member counter (the same-named locals above are
// declared inside the loops) -- confirm
1146 localSequenceNumber++;
1151 /** Method tries to send slot to server. Returns status in tuple.
1152 isInserted returns whether last un-acked send (if any) was
1153 successful. Returns whether send was confirmed.
// Pushes `slot` to the server through cloud->putSlot() and reports the
// outcome through the out-parameters.
//
// @param slot        slot to send
// @param newSize     forwarded unchanged to cloud->putSlot()
// @param isNewKey    not referenced in the lines visible here
// @param isInserted  out: set to false on the paths visible below; presumably
//                    set to true where one of the matches below succeeds -- confirm
// @param array       out: the slots returned by putSlot(), or a one-element
//                    array holding `slot` when putSlot() returned NULL
// @return bool; the return statements are not visible here -- confirm meaning
// @throws Error*     when *array has length 0
1156 bool Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey, bool *isInserted, Array<Slot *> **array) {
// Remember that a send was attempted
1157 attemptedToSendToServer = true;
1159 *array = cloud->putSlot(slot, newSize);
// NULL from putSlot(): hand the caller an array containing only our slot and
// drop any recorded rejections
1160 if (*array == NULL) {
1161 *array = new Array<Slot *>(1);
1162 (*array)->set(0, slot);
1163 rejectedSlotVector->clear();
1164 *isInserted = false;
1167 if ((*array)->length() == 0) {
1168 throw new Error("Server Error: Did not send any slots");
// After an earlier partial send, look through the returned slots for our own
1171 if (hadPartialSendToServer) {
1172 *isInserted = false;
1173 uint size = (*array)->length();
1174 for (uint i = 0; i < size; i++) {
1175 Slot *s = (*array)->get(i);
// A returned slot with our sequence number and machine id is the one we sent
1176 if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1182 //Also need to see if other machines acknowledged our message
1183 if (!(*isInserted)) {
1184 for (uint i = 0; i < size; i++) {
1185 Slot *s = (*array)->get(i);
1187 // Process each entry in the slot
1188 Vector<Entry *> *entries = s->getEntries();
1189 uint eSize = entries->size();
1190 for (uint ei = 0; ei < eSize; ei++) {
1191 Entry *entry = entries->get(ei);
// Only LastMessage entries are inspected here
1193 if (entry->getType() == TypeLastMessage) {
1194 LastMessage *lastMessage = (LastMessage *)entry;
// A LastMessage naming this machine and this slot's sequence number
1196 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) {
// Still not found: record the slot's sequence number as rejected
1205 if (!(*isInserted)) {
1206 rejectedSlotVector->add(slot->getSequenceNumber());
// NOTE(review): the condition guarding the two lines below is not visible here
1211 rejectedSlotVector->add(slot->getSequenceNumber());
1212 *isInserted = false;
1219 * Returns false if a resize was needed
// Populates `slot` with, in order: an optional TableStatus, rejected-message
// entries, mandatory rescue data, the new-key entry, pending arbitration
// parts, parts of the first pending transaction, and optional rescue data.
//
// @param slot         slot being filled
// @param resize       whether a resize is requested; forced to true when
//                     liveSlotCount exceeds bufferResizeThreshold
// @param newKeyEntry  optional NewKey entry to add (may be NULL)
// @return tuple whose first field is true when a resize is needed but was not
//         requested (early return), otherwise (false, newSize, inserted)
1221 ThreeTuple<bool, int32_t, bool> Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry) {
1223 if (liveSlotCount > bufferResizeThreshold) {
1224 resize = true;//Resize is forced
// New size is the current slot count scaled by Table_RESIZE_MULTIPLE; it is
// announced through a TableStatus entry.
// NOTE(review): the declaration of newSize and the condition guarding these
// lines are not visible here
1228 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1229 TableStatus *status = new TableStatus(slot, newSize);
1230 slot->addEntry(status);
1233 // Fill with rejected slots first before doing anything else
1234 doRejectedMessages(slot);
1236 // Do mandatory rescue of entries
1237 ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1239 // Extract working variables
1240 bool needsResize = mandatoryRescueReturn.getFirst();
1241 bool seenLiveSlot = mandatoryRescueReturn.getSecond();
1242 int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1244 if (needsResize && !resize) {
1245 // We need to resize but we are not resizing so return false
// NOTE(review): the first tuple field returned here is true, and NULL is
// passed for the int32_t and bool fields (0 / false would state the intent)
1246 return ThreeTuple<bool, int32_t, bool>(true, NULL, NULL);
1249 bool inserted = false;
1250 if (newKeyEntry != NULL) {
// Bind the entry to this slot before testing whether it fits
1251 newKeyEntry->setSlot(slot);
1252 if (slot->hasSpace(newKeyEntry)) {
1253 slot->addEntry(newKeyEntry);
1258 // Clear the transactions, aborts and commits that were sent previously
1259 transactionPartsSent->clear();
1260 pendingSendArbitrationEntriesToDelete->clear();
1261 uint size = pendingSendArbitrationRounds->size();
1262 for (uint i = 0; i < size; i++) {
1263 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
1264 bool isFull = false;
1265 round->generateParts();
1266 Vector<Entry *> *parts = round->getParts();
1268 // Insert pending arbitration data
1269 uint vsize = parts->size();
1270 for (uint vi = 0; vi < vsize; vi++) {
1271 Entry *arbitrationData = parts->get(vi);
1273 // If it is an abort then we need to set some information
1274 if (arbitrationData->getType() == TypeAbort) {
1275 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1278 if (!slot->hasSpace(arbitrationData)) {
1279 // No space so cant do anything else with these data entries
1284 // Add to this current slot and add it to entries to delete
1285 slot->addEntry(arbitrationData);
1286 pendingSendArbitrationEntriesToDelete->add(arbitrationData);
// Only the transaction at the head of the pending queue is considered here
1294 if (pendingTransactionQueue->size() > 0) {
1295 Transaction *transaction = pendingTransactionQueue->get(0);
1296 // Set the transaction sequence number if it has yet to be inserted into the block chain
1297 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1298 transaction->setSequenceNumber(slot->getSequenceNumber());
1302 TransactionPart *part = transaction->getNextPartToSend();
1304 // Ran out of parts to send for this transaction so move on
// Record each part number placed in the slot, keyed by its transaction
1308 if (slot->hasSpace(part)) {
1309 slot->addEntry(part);
1310 Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1311 if (partsSent == NULL) {
1312 partsSent = new Vector<int32_t>();
1313 transactionPartsSent->put(transaction, partsSent);
1315 partsSent->add(part->getPartNumber());
// NOTE(review): this put stores the same vector pointer again; it looks
// redundant with the put above -- confirm
1316 transactionPartsSent->put(transaction, partsSent);
1323 // Fill the remainder of the slot with rescue data
1324 doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1326 return ThreeTuple<bool, int32_t, bool>(false, newSize, inserted);
// Builds RejectedMessage entries for slot `s` from the sequence numbers held
// in rejectedSlotVector. Does nothing when the vector is empty.
// NOTE(review): the lines that use each `rm` after construction (presumably
// adding it to `s`) are not visible here -- confirm.
//
// @param s  slot the rejected-message entries are created for
1329 void Table::doRejectedMessages(Slot *s) {
1330 if (!rejectedSlotVector->isEmpty()) {
1331 /* TODO: We should avoid generating a rejected message entry if
1332 * there is already a sufficient entry in the queue (e.g.,
1333 * equalsto value of true and same sequence number). */
// First recorded rejection; used as the lower bound of the ranges built below
1335 int64_t old_seqn = rejectedSlotVector->get(0);
// More than Table_REJECTED_THRESHOLD rejections: one entry covering the range
// from the first to the last recorded sequence number
1336 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1337 int64_t new_seqn = rejectedSlotVector->lastElement();
1338 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
// -1 marks "no missing message seen yet"
1341 int64_t prev_seqn = -1;
1343 /* Go through list of missing messages */
// NOTE(review): the declaration of i and the test applied to s_msg are not
// visible here
1344 for (; i < rejectedSlotVector->size(); i++) {
1345 int64_t curr_seqn = rejectedSlotVector->get(i);
1346 Slot *s_msg = buffer->getSlot(curr_seqn);
1349 prev_seqn = curr_seqn;
1351 /* Generate rejected message entry for missing messages */
1352 if (prev_seqn != -1) {
1353 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1356 /* Generate rejected message entries for present messages */
// Continues from where the previous loop stopped; one entry per remaining
// sequence number, attributed to the machine id stored in the buffered slot
1357 for (; i < rejectedSlotVector->size(); i++) {
1358 int64_t curr_seqn = rejectedSlotVector->get(i);
1359 Slot *s_msg = buffer->getSlot(curr_seqn);
1360 int64_t machineid = s_msg->getMachineID();
1361 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
// Copies live entries from the oldest buffered slots into `slot`, covering
// sequence numbers from oldestLiveSlotSequenceNumver up to (not including)
// the computed threshold.
//
// @param slot    slot receiving the rescued entries
// @param resize  forwarded to Slot::getLiveEntries()
// @return (needsResize, seenLiveSlot, sequence number reached); the first
//         field is true when an entry in the slot at firstIfFull did not fit
1368 ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize) {
1369 int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1370 int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
// Never start scanning before the oldest slot still held in the buffer
1371 if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1372 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1375 int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1376 bool seenLiveSlot = false;
1377 int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots; // smallest seq number in the buffer if it is full
1378 int64_t threshold = firstIfFull + Table_FREE_SLOTS; // we want the buffer to be clear of live entries up to this point
1382 for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1383 Slot *previousSlot = buffer->getSlot(currentSequenceNumber);
1384 // Push slot number forward
// The marker only advances until the first live slot has been seen
1385 if (!seenLiveSlot) {
1386 oldestLiveSlotSequenceNumver = currentSequenceNumber;
// NOTE(review): branch body not visible here; presumably moves on to the
// next slot -- confirm
1389 if (!previousSlot->isLive()) {
1393 // We have seen a live slot
1394 seenLiveSlot = true;
1396 // Get all the live entries for a slot
1397 Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1399 // Iterate over all the live entries and try to rescue them
1400 uint lESize = liveEntries->size();
1401 for (uint i = 0; i < lESize; i++) {
1402 Entry *liveEntry = liveEntries->get(i);
1403 if (slot->hasSpace(liveEntry)) {
1404 // Enough space to rescue the entry
1405 slot->addEntry(liveEntry);
1406 } else if (currentSequenceNumber == firstIfFull) {
1407 //if there's no space but the entry is about to fall off the queue
// Signal the caller that a resize is required
1408 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
// Scan finished without needing a resize
1414 return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
// Continues rescuing live entries into `s`, starting at sequence number
// `seqn` and running up to the newest slot in the buffer.
//
// @param s             slot receiving the rescued entries
// @param seenliveslot  whether a live slot has already been seen by the caller
// @param seqn          sequence number to start from
// @param resize        forwarded to Slot::getLiveEntries()
1417 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1418 /* now go through live entries from least to greatest sequence number until
1419 * either all live slots added, or the slot doesn't have enough room
1420 * for SKIP_THRESHOLD consecutive entries*/
1422 int64_t newestseqnum = buffer->getNewestSeqNum();
1423 for (; seqn <= newestseqnum; seqn++) {
1424 Slot *prevslot = buffer->getSlot(seqn);
1425 //Push slot number forward
// NOTE(review): the condition guarding this assignment is not visible here
1427 oldestLiveSlotSequenceNumver = seqn;
// NOTE(review): the statement controlled by this test is not visible here
1429 if (!prevslot->isLive())
1431 seenliveslot = true;
1432 Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
1433 uint lESize = liveentries->size();
1434 for (uint i = 0; i < lESize; i++) {
1435 Entry *liveentry = liveentries->get(i);
// Entries that fit are copied into the slot
1436 if (s->hasSpace(liveentry))
1437 s->addEntry(liveentry);
// NOTE(review): the declaration/update of skipcount and the statement run
// when the threshold is exceeded are not visible here
1440 if (skipcount > Table_SKIP_THRESHOLD)
1450 * Checks for malicious activity and updates the local copy of the block chain.
// Validates a batch of slots received from the server and, if the checks
// pass, stores them in the local buffer and refreshes the live state.
//
// @param newSlots              slots received from the server
// @param acceptUpdatesToLocal  forwarded to processSlot()
// @throws Error*  when the first slot is not newer than sequenceNumber, or
//                 when, after a gap, some machine has no record in the batch
1452 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1453 // The cloud communication layer has checked slot HMACs already
// NOTE(review): branch body not visible here; presumably returns early on an
// empty batch -- confirm
1455 if (newSlots->length() == 0) {
1459 // Make sure all slots are newer than the last largest slot this
1461 int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
1462 if (firstSeqNum <= sequenceNumber) {
1463 throw new Error("Server Error: Sent older slots!");
1466 // Create an object that can access both new slots and slots in our
1467 // local chain without committing slots to our local chain
1468 SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1470 // Check that the HMAC chain is not broken
1471 checkHMACChain(indexer, newSlots);
1473 // Set to keep track of messages from clients
1474 Hashset<int64_t> *machineSet = new Hashset<int64_t>();
// Seed the set with every machine id currently in lastMessageTable
1476 SetIterator<int64_t, Pair<int64_t, Liveness *> *> *lmit = getKeyIterator(lastMessageTable);
1477 while (lmit->hasNext())
1478 machineSet->add(lmit->next());
1482 // Process each slots data
1484 uint numSlots = newSlots->length();
1485 for (uint i = 0; i < numSlots; i++) {
1486 Slot *slot = newSlots->get(i);
1487 processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1488 updateExpectedSize();
1492 // If there is a gap, check to see if the server sent us
1494 if (firstSeqNum != (sequenceNumber + 1)) {
1496 // Check the size of the slots that were sent down by the server.
1497 // Can only check the size if there was a gap
1498 checkNumSlots(newSlots->length());
1500 // Since there was a gap every machine must have pushed a slot or
1501 // must have a last-message entry. If not then the server is
// Any machine id still left in machineSet is treated as having no record
1503 if (!machineSet->isEmpty()) {
1504 throw new Error("Missing record for machines: ");
1508 // Update the size of our local block chain.
1511 // Commit the new slots to the local block chain.
1513 uint numSlots = newSlots->length();
1514 for (uint i = 0; i < numSlots; i++) {
1515 Slot *slot = newSlots->get(i);
1517 // Insert this slot into our local block chain copy.
1518 buffer->putSlot(slot);
1520 // Keep track of how many slots are currently live (have live data
1525 // Get the sequence number of the latest slot in the system
1526 sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
1527 updateLiveStateFromServer();
1529 // No Need to remember after we pulled from the server
1530 offlineTransactionsCommittedAndAtServer->clear();
1532 // This is invalidated now
1533 hadPartialSendToServer = false;
// Refreshes derived state after new slots were accepted from the server:
// processes new transaction parts, arbitrates, updates the committed table,
// prunes transactions, then updates both speculative tables.
1536 void Table::updateLiveStateFromServer() {
1537 // Process the new transaction parts
1538 processNewTransactionParts();
1540 // Do arbitration on new transactions that were received
1541 arbitrateFromServer();
1543 // Update all the committed keys
1544 bool didCommitOrSpeculate = updateCommittedTable();
1546 // Delete the transactions that are now dead
1547 updateLiveTransactionsAndStatus();
// The flag accumulates across steps so the last update sees whether either
// the committed table or the speculative table reported a change
1550 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1551 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
// Refreshes derived state after a local change. Runs the same steps as
// updateLiveStateFromServer() except that it does not process new
// transaction parts or call arbitrateFromServer().
1554 void Table::updateLiveStateFromLocal() {
1555 // Update all the committed keys
1556 bool didCommitOrSpeculate = updateCommittedTable();
1558 // Delete the transactions that are now dead
1559 updateLiveTransactionsAndStatus();
// Carry the "something changed" flag through both speculative-table updates
1562 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1563 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
// Records the table size announced for the chain and marks that a table
// status has been found.
//
// @param firstSequenceNumber  value used as the count of previous slots
// @param numberOfSlots        announced maximum size; stored in currMaxSize
1566 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1567 int64_t prevslots = firstSequenceNumber;
// NOTE(review): which branch of this test the assignment below belongs to is
// not visible here -- confirm
1569 if (didFindTableStatus) {
// Expected size is the smaller of prevslots and numberOfSlots
1571 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1574 didFindTableStatus = true;
1575 currMaxSize = numberOfSlots;
// Keeps expectedsize from exceeding currMaxSize. Called once per processed
// slot by validateAndUpdate().
// NOTE(review): any adjustment made to expectedsize before the clamp is not
// visible here.
1578 void Table::updateExpectedSize() {
1581 if (expectedsize > currMaxSize) {
1582 expectedsize = currMaxSize;
1588 * Check the size of the block chain to make sure there are enough
1589 * slots sent back by the server. This is only called when we have a
1590 * gap between the slots that we have locally and the slots sent by
1591 * the server; therefore, in the slots sent by the server there will be
1592 * at least one TableStatus message.
// Verifies that the number of slots received matches expectedsize.
//
// @param numberOfSlots  number of slots the server sent
// @throws Error*  when the count differs from expectedsize
1594 void Table::checkNumSlots(int numberOfSlots) {
1595 if (numberOfSlots != expectedsize) {
1596 throw new Error("Server Error: Server did not send all slots-> Expected: ");
1601 * Update the size of the local buffer if it is needed.
// Applies currMaxSize to the local buffer: resizes it when the size differs
// from numberOfSlots, stores the new slot count and recomputes the resize
// threshold. Also resets didFindTableStatus.
1603 void Table::commitNewMaxSize() {
// Reset the flag that initExpectedSize() sets
1604 didFindTableStatus = false;
1606 // Resize the local slot buffer
1607 if (numberOfSlots != currMaxSize) {
1608 buffer->resize((int32_t)currMaxSize);
1611 // Change the number of local slots to the new size
1612 numberOfSlots = (int32_t)currMaxSize;
1614 // Recalculate the resize threshold since the size of the local
1615 // buffer has changed
1616 setResizeThreshold();
1620 * Process the new transaction parts from this latest round of slots
1621 * received from the server
// Moves the transaction parts collected in newTransactionParts into their
// Transaction objects, creating and registering a Transaction in the live
// tables when none exists for the part's sequence number. The collection is
// cleared afterwards.
1623 void Table::processNewTransactionParts() {
1625 if (newTransactionParts->size() == 0) {
1626 // Nothing new to process
1630 // Iterate through all the machine Ids that we received new parts
1632 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *tpit = getKeyIterator(newTransactionParts);
1633 while (tpit->hasNext()) {
1634 int64_t machineId = tpit->next();
// Parts received from this machine, keyed by a (int64, int32) pair
1635 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
1637 SetIterator<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *ptit = getKeyIterator(parts);
1638 // Iterate through all the parts for that machine Id
1639 while (ptit->hasNext()) {
1640 Pair<int64_t, int32_t> *partId = ptit->next();
1641 TransactionPart *part = parts->get(partId);
// Compare against the last transaction number recorded for this part's
// arbitrator, when one exists
1643 if (lastArbitratedTransactionNumberByArbitratorTable->contains(part->getArbitratorId())) {
1644 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1645 if (lastTransactionNumber >= part->getSequenceNumber()) {
1646 // Set dead the transaction part
1652 // Get the transaction object for that sequence number
1653 Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1655 if (transaction == NULL) {
1656 // This is a new transaction that we dont have so make a new one
1657 transaction = new Transaction();
1659 // Insert this new transaction into the live tables
1660 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1661 liveTransactionByTransactionIdTable->put(new Pair<int64_t, int64_t>(part->getTransactionId()), transaction);
1664 // Add that part to the transaction
1665 transaction->addPartDecode(part);
1670 // Clear all the new transaction parts in preparation for the next
1671 // time the server sends slots
1672 newTransactionParts->clear();
// Arbitrates, in sorted sequence-number order, over the live transactions
// received from the server. Transactions whose guard holds contribute their
// key/value updates to one batched Commit; the others produce an Abort. The
// results are queued as an ArbitrationRound in pendingSendArbitrationRounds.
1675 void Table::arbitrateFromServer() {
1677 if (liveTransactionBySequenceNumberTable->size() == 0) {
1678 // Nothing to arbitrate on so move on
1682 // Get the transaction sequence numbers and sort from oldest to newest
1683 Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>();
1685 SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
1686 while (trit->hasNext())
1687 transactionSequenceNumbers->add(trit->next());
1690 qsort(transactionSequenceNumbers->expose(), transactionSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1692 // Collection of key value pairs that are
// Accumulates the updates of every transaction accepted in this pass; it is
// also passed to evaluateGuard() for later transactions
1693 Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *speculativeTableTmp = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
1695 // The last transaction arbitrated on
1696 int64_t lastTransactionCommitted = -1;
1697 Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1698 uint tsnSize = transactionSequenceNumbers->size();
1699 for (uint i = 0; i < tsnSize; i++) {
1700 int64_t transactionSequenceNumber = transactionSequenceNumbers->get(i);
1701 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1703 // Check if this machine arbitrates for this transaction if not
1704 // then we cant arbitrate this transaction
// NOTE(review): the bodies of the next three tests are not visible here;
// presumably each skips the transaction -- confirm
1705 if (transaction->getArbitrator() != localMachineId) {
1709 if (transactionSequenceNumber < lastSeqNumArbOn) {
1713 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1714 // We have seen this already locally so dont commit again
1719 if (!transaction->isComplete()) {
1720 // Will arbitrate in incorrect order if we continue so just break
1726 // update the largest transaction seen by arbitrator from server
1727 if (!lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1728 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
// Replace the stored value only when this transaction's number is larger
1730 int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1731 if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1732 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1736 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1737 // Guard evaluated as true
1739 // Update the local changes so we can make the commit
1740 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1741 while (kvit->hasNext()) {
1742 KeyValue *kv = kvit->next();
1743 speculativeTableTmp->put(kv->getKey(), kv);
1747 // Update what the last transaction committed was for use in batch commit
1748 lastTransactionCommitted = transactionSequenceNumber;
1750 // Guard evaluated was false so create abort
// Each abort takes the next local arbitration sequence number
1752 Abort *newAbort = new Abort(NULL,
1753 transaction->getClientLocalSequenceNumber(),
1754 transaction->getSequenceNumber(),
1755 transaction->getMachineId(),
1756 transaction->getArbitrator(),
1757 localArbitrationSequenceNumber);
1758 localArbitrationSequenceNumber++;
1759 generatedAborts->add(newAbort);
1761 // Insert the abort so we can process
1762 processEntry(newAbort);
// Remember the most recent sequence number arbitrated on
1765 lastSeqNumArbOn = transactionSequenceNumber;
1768 Commit *newCommit = NULL;
1770 // If there is something to commit
1771 if (speculativeTableTmp->size() != 0) {
1772 // Create the commit and increment the commit sequence number
1773 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1774 localArbitrationSequenceNumber++;
1776 // Add all the new keys to the commit
1777 SetIterator<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *spit = getKeyIterator(speculativeTableTmp);
1778 while (spit->hasNext()) {
1779 IoTString *string = spit->next();
1780 KeyValue *kv = speculativeTableTmp->get(string);
1781 newCommit->addKV(kv);
1785 // create the commit parts
1786 newCommit->createCommitParts();
1788 // Append all the commit parts to the end of the pending queue
1789 // waiting for sending to the server
1790 // Insert the commit so we can process it
1791 Vector<CommitPart *> *parts = newCommit->getParts();
1792 uint partsSize = parts->size();
1793 for (uint i = 0; i < partsSize; i++) {
1794 CommitPart *commitPart = parts->get(i);
1795 processEntry(commitPart);
// Queue a round only when this pass produced a commit or at least one abort
1799 if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1800 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1801 pendingSendArbitrationRounds->add(arbitrationRound);
// When compaction reports true, process the commit parts of the round that
// is now last in the pending list
1803 if (compactArbitrationData()) {
1804 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1805 if (newArbitrationRound->getCommit() != NULL) {
1806 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1807 uint partsSize = parts->size();
1808 for (uint i = 0; i < partsSize; i++) {
1809 CommitPart *commitPart = parts->get(i);
1810 processEntry(commitPart);
// Arbitrates on a single transaction when this machine is its arbitrator.
//
// @param transaction  transaction to arbitrate on
// @return (couldArbitrate, didCommit), as unpacked by acceptDataFromLocal():
//         (false, false) when arbitration is not possible on the visible
//         early-return paths, (true, true) after a commit and (true, false)
//         after an abort
1817 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1819 // Check if this machine arbitrates for this transaction if not then
1820 // we cant arbitrate this transaction
1821 if (transaction->getArbitrator() != localMachineId) {
1822 return Pair<bool, bool>(false, false);
1825 if (!transaction->isComplete()) {
1826 // Will arbitrate in incorrect order if we continue so just break
1828 return Pair<bool, bool>(false, false);
1831 if (transaction->getMachineId() != localMachineId) {
1832 // dont do this check for local transactions
1833 if (lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
// A larger recorded number means the server already delivered this one
1834 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1835 // We've have already seen this from the server
1836 return Pair<bool, bool>(false, false);
// Guard is evaluated against the committed table only
1841 if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1842 // Guard evaluated as true Create the commit and increment the
1843 // commit sequence number
1844 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1845 localArbitrationSequenceNumber++;
1847 // Update the local changes so we can make the commit
1848 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1849 while (kvit->hasNext()) {
1850 KeyValue *kv = kvit->next();
1851 newCommit->addKV(kv);
1855 // create the commit parts
1856 newCommit->createCommitParts();
1858 // Append all the commit parts to the end of the pending queue
1859 // waiting for sending to the server
1860 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1861 pendingSendArbitrationRounds->add(arbitrationRound);
// Compaction reported true: process the parts of the round now last in the list
1863 if (compactArbitrationData()) {
1864 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1865 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1866 uint partsSize = parts->size();
1867 for (uint i = 0; i < partsSize; i++) {
1868 CommitPart *commitPart = parts->get(i);
1869 processEntry(commitPart);
1872 // Insert the commit so we can process it
1873 Vector<CommitPart *> *parts = newCommit->getParts();
1874 uint partsSize = parts->size();
1875 for (uint i = 0; i < partsSize; i++) {
1876 CommitPart *commitPart = parts->get(i);
1877 processEntry(commitPart);
// Locally created transactions also get their status object updated
1881 if (transaction->getMachineId() == localMachineId) {
1882 TransactionStatus *status = transaction->getTransactionStatus();
1883 if (status != NULL) {
1884 status->setStatus(TransactionStatus_StatusCommitted);
1888 updateLiveStateFromLocal();
1889 return Pair<bool, bool>(true, true);
1891 if (transaction->getMachineId() == localMachineId) {
1892 // For locally created messages update the status
1893 // Guard evaluated was false so create abort
1894 TransactionStatus *status = transaction->getTransactionStatus();
1895 if (status != NULL) {
1896 status->setStatus(TransactionStatus_StatusAborted);
1899 Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
// The abort takes the next local arbitration sequence number
1902 Abort *newAbort = new Abort(NULL,
1903 transaction->getClientLocalSequenceNumber(),
1905 transaction->getMachineId(),
1906 transaction->getArbitrator(),
1907 localArbitrationSequenceNumber);
1908 localArbitrationSequenceNumber++;
1909 addAbortSet->add(newAbort);
1911 // Append all the commit parts to the end of the pending queue
1912 // waiting for sending to the server
1913 ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1914 pendingSendArbitrationRounds->add(arbitrationRound);
1916 if (compactArbitrationData()) {
1917 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
// NOTE(review): getCommit() is dereferenced without the NULL check that
// arbitrateFromServer() performs at the same point -- confirm it cannot be
// NULL when compaction reports true
1919 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1920 uint partsSize = parts->size();
1921 for (uint i = 0; i < partsSize; i++) {
1922 CommitPart *commitPart = parts->get(i);
1923 processEntry(commitPart);
1928 updateLiveStateFromLocal();
1929 return Pair<bool, bool>(true, false);
1934 * Compacts the arbitration data by merging commits and aggregating
1935 * aborts so that a single large push of commits can be done instead
1936 * of many small updates
// Tries to fold earlier pending arbitration rounds into the most recent one,
// walking backwards from the end of pendingSendArbitrationRounds.
//
// @return bool; callers process the last round's commit parts when it is
//         true. The return statements are not visible here -- presumably true
//         only when the final hadCommit/gotNewCommit test passes; confirm
1938 bool Table::compactArbitrationData() {
1939 if (pendingSendArbitrationRounds->size() < 2) {
1940 // Nothing to compact so do nothing
1944 ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
// NOTE(review): branch body not visible here
1945 if (lastRound->getDidSendPart()) {
// NOTE(review): hadCommit is true when lastRound has NO commit; the name
// suggests the opposite -- confirm the intended meaning
1949 bool hadCommit = (lastRound->getCommit() == NULL);
1950 bool gotNewCommit = false;
// Counts lastRound itself plus, presumably, every earlier round merged into
// it (the increment is not visible here)
1952 uint numberToDelete = 1;
1953 while (numberToDelete < pendingSendArbitrationRounds->size()) {
// Next candidate, moving towards the front of the list
1954 ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1956 if (round->isFull() || round->getDidSendPart()) {
1957 // Stop since there is a part that cannot be compacted and we
1958 // need to compact in order
1962 if (round->getCommit() == NULL) {
1963 // Try compacting aborts only
1964 int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
1965 if (newSize > ArbitrationRound_MAX_PARTS) {
1966 // Cant compact since it would be too large
1969 lastRound->addAborts(round->getAborts());
1971 // Create a new larger commit
// The merged commit takes the next local arbitration sequence number
1972 Commit *newCommit = Commit_merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
1973 localArbitrationSequenceNumber++;
1975 // Create the commit parts so that we can count them
1976 newCommit->createCommitParts();
1978 // Calculate the new size of the parts
1979 int newSize = newCommit->getNumberOfParts();
1980 newSize += lastRound->getAbortsCount();
1981 newSize += round->getAbortsCount();
1983 if (newSize > ArbitrationRound_MAX_PARTS) {
1984 // Cant compact since it would be too large
1988 // Set the new compacted part
1989 lastRound->setCommit(newCommit);
1990 lastRound->addAborts(round->getAborts());
1991 gotNewCommit = true;
1997 if (numberToDelete != 1) {
1998 // If there is a compaction
1999 // Delete the previous pieces that are now in the new compacted piece
2000 if (numberToDelete == pendingSendArbitrationRounds->size()) {
2001 pendingSendArbitrationRounds->clear();
// Otherwise drop the last numberToDelete rounds one by one from the end
2003 for (uint i = 0; i < numberToDelete; i++) {
2004 pendingSendArbitrationRounds->removeIndex(pendingSendArbitrationRounds->size() - 1);
2008 // Add the new compacted into the pending to send list
2009 pendingSendArbitrationRounds->add(lastRound);
2011 // Should reinsert into the commit processor
2012 if (hadCommit && gotNewCommit) {
2021 * Update all the commits and the committed tables, sets dead the dead
2024 bool Table::updateCommittedTable() {
2026 if (newCommitParts->size() == 0) {
2027 // Nothing new to process
2031 // Iterate through all the machine Ids that we received new parts for
2032 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts);
2033 while (partsit->hasNext()) {
2034 int64_t machineId = partsit->next();
2035 Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId);
2037 // Iterate through all the parts for that machine Id
2038 SetIterator<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pairit = getKeyIterator(parts);
2039 while (pairit->hasNext()) {
2040 Pair<int64_t, int32_t> *partId = pairit->next();
2041 CommitPart *part = parts->get(partId);
2043 // Get the transaction object for that sequence number
2044 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
2046 if (commitForClientTable == NULL) {
2047 // This is the first commit from this device
2048 commitForClientTable = new Hashtable<int64_t, Commit *>();
2049 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
2052 Commit *commit = commitForClientTable->get(part->getSequenceNumber());
2054 if (commit == NULL) {
2055 // This is a new commit that we dont have so make a new one
2056 commit = new Commit();
2058 // Insert this new commit into the live tables
2059 commitForClientTable->put(part->getSequenceNumber(), commit);
2062 // Add that part to the commit
2063 commit->addPartDecode(part);
2069 // Clear all the new commits parts in preparation for the next time
2070 // the server sends slots
2071 newCommitParts->clear();
2073 // If we process a new commit keep track of it for future use
2074 bool didProcessANewCommit = false;
2076 // Process the commits one by one
2077 SetIterator<int64_t, Hashtable<int64_t, Commit *> *> *liveit = getKeyIterator(liveCommitsTable);
2078 while (liveit->hasNext()) {
2079 int64_t arbitratorId = liveit->next();
2081 // Get all the commits for a specific arbitrator
2082 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
2084 // Sort the commits in order
2085 Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>();
2087 SetIterator<int64_t, Commit *> *clientit = getKeyIterator(commitForClientTable);
2088 while (clientit->hasNext())
2089 commitSequenceNumbers->add(clientit->next());
2093 qsort(commitSequenceNumbers->expose(), commitSequenceNumbers->size(), sizeof(int64_t), compareInt64);
2095 // Get the last commit seen from this arbitrator
2096 int64_t lastCommitSeenSequenceNumber = -1;
2097 if (lastCommitSeenSequenceNumberByArbitratorTable->contains(arbitratorId)) {
2098 lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
2101 // Go through each new commit one by one
2102 for (uint i = 0; i < commitSequenceNumbers->size(); i++) {
2103 int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
2104 Commit *commit = commitForClientTable->get(commitSequenceNumber);
2106 // Special processing if a commit is not complete
2107 if (!commit->isComplete()) {
2108 if (i == (commitSequenceNumbers->size() - 1)) {
2109 // If there is an incomplete commit and this commit is the
2110 // latest one seen then this commit cannot be processed and
2111 // there are no other commits
2114 // This is a commit that was already dead but parts of it
2115 // are still in the block chain (not flushed out yet)->
2116 // Delete it and move on
2118 commitForClientTable->remove(commit->getSequenceNumber());
2123 // Update the last transaction that was updated if we can
2124 if (commit->getTransactionSequenceNumber() != -1) {
2125 // Update the last transaction sequence number that the arbitrator arbitrated on1
2126 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2127 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2131 // Update the last arbitration data that we have seen so far
2132 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(commit->getMachineId())) {
2133 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
2134 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
2136 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2139 // Never seen any data from this arbitrator so record the first one
2140 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2143 // We have already seen this commit before so need to do the
2144 // full processing on this commit
2145 if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2147 // Update the last transaction that was updated if we can
2148 if (commit->getTransactionSequenceNumber() != -1) {
2149 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
2150 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) ||
2151 lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2152 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2159 // If we got here then this is a brand new commit and needs full
2161 // Get what commits should be edited, these are the commits that
2162 // have live values for their keys
2163 Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
2165 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2166 while (kvit->hasNext()) {
2167 KeyValue *kv = kvit->next();
2168 Commit *commit = liveCommitsByKeyTable->get(kv->getKey());
2170 commitsToEdit->add(commit);
2175 // Update each previous commit that needs to be updated
2176 SetIterator<Commit *, Commit *> *commitit = commitsToEdit->iterator();
2177 while (commitit->hasNext()) {
2178 Commit *previousCommit = commitit->next();
2180 // Only bother with live commits (TODO: Maybe remove this check)
2181 if (previousCommit->isLive()) {
2183 // Update which keys in the old commits are still live
2185 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2186 while (kvit->hasNext()) {
2187 KeyValue *kv = kvit->next();
2188 previousCommit->invalidateKey(kv->getKey());
2193 // if the commit is now dead then remove it
2194 if (!previousCommit->isLive()) {
2195 commitForClientTable->remove(previousCommit->getSequenceNumber());
2201 // Update the last seen sequence number from this arbitrator
2202 if (lastCommitSeenSequenceNumberByArbitratorTable->contains(commit->getMachineId())) {
2203 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2204 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2207 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2210 // We processed a new commit that we havent seen before
2211 didProcessANewCommit = true;
2213 // Update the committed table of keys and which commit is using which key
2215 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2216 while (kvit->hasNext()) {
2217 KeyValue *kv = kvit->next();
2218 committedKeyValueTable->put(kv->getKey(), kv);
2219 liveCommitsByKeyTable->put(kv->getKey(), commit);
2227 return didProcessANewCommit;
2231 * Create the speculative table from transactions that are still live
2232 * and have come from the cloud
2234 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2235 if (liveTransactionBySequenceNumberTable->size() == 0) {
2236 // There is nothing to speculate on
2240 // Create a list of the transaction sequence numbers and sort them
2241 // from oldest to newest
2242 Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>();
2244 SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
2245 while (trit->hasNext())
2246 transactionSequenceNumbersSorted->add(trit->next());
2250 qsort(transactionSequenceNumbersSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64);
2252 bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2255 if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2256 // If there is a gap in the transaction sequence numbers then
2257 // there was a commit or an abort of a transaction OR there was a
2258 // new commit (Could be from offline commit) so a redo the
2259 // speculation from scratch
2261 // Start from scratch
2262 speculatedKeyValueTable->clear();
2263 lastTransactionSequenceNumberSpeculatedOn = -1;
2264 oldestTransactionSequenceNumberSpeculatedOn = -1;
2267 // Remember the front of the transaction list
2268 oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2270 // Find where to start arbitration from
2271 uint startIndex = 0;
2273 for (; startIndex < transactionSequenceNumbersSorted->size(); startIndex++)
2274 if (transactionSequenceNumbersSorted->get(startIndex) == lastTransactionSequenceNumberSpeculatedOn)
2278 if (startIndex >= transactionSequenceNumbersSorted->size()) {
2279 // Make sure we are not out of bounds
2280 return false; // did not speculate
2283 Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
2284 bool didSkip = true;
2286 for (uint i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2287 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2288 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2290 if (!transaction->isComplete()) {
2291 // If there is an incomplete transaction then there is nothing
2292 // we can do add this transactions arbitrator to the list of
2293 // arbitrators we should ignore
2294 incompleteTransactionArbitrator->add(transaction->getArbitrator());
2299 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2303 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2305 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2306 // Guard evaluated to true so update the speculative table
2308 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2309 while (kvit->hasNext()) {
2310 KeyValue *kv = kvit->next();
2311 speculatedKeyValueTable->put(kv->getKey(), kv);
2319 // Since there was a skip we need to redo the speculation next time around
2320 lastTransactionSequenceNumberSpeculatedOn = -1;
2321 oldestTransactionSequenceNumberSpeculatedOn = -1;
2324 // We did some speculation
2329 * Create the pending transaction speculative table from transactions
2330 * that are still in the pending transaction buffer
2332 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2333 if (pendingTransactionQueue->size() == 0) {
2334 // There is nothing to speculate on
2338 if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2339 // need to reset on the pending speculation
2340 lastPendingTransactionSpeculatedOn = NULL;
2341 firstPendingTransaction = pendingTransactionQueue->get(0);
2342 pendingTransactionSpeculatedKeyValueTable->clear();
2345 // Find where to start arbitration from
2346 uint startIndex = 0;
2348 for (; startIndex < pendingTransactionQueue->size(); startIndex++)
2349 if (pendingTransactionQueue->get(startIndex) == firstPendingTransaction)
2352 if (startIndex >= pendingTransactionQueue->size()) {
2353 // Make sure we are not out of bounds
2357 for (uint i = startIndex; i < pendingTransactionQueue->size(); i++) {
2358 Transaction *transaction = pendingTransactionQueue->get(i);
2360 lastPendingTransactionSpeculatedOn = transaction;
2362 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2363 // Guard evaluated to true so update the speculative table
2364 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2365 while (kvit->hasNext()) {
2366 KeyValue *kv = kvit->next();
2367 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2375 * Set dead and remove from the live transaction tables the
2376 * transactions that are dead
2378 void Table::updateLiveTransactionsAndStatus() {
2379 // Go through each of the transactions
2381 SetIterator<int64_t, Transaction *> *iter = getKeyIterator(liveTransactionBySequenceNumberTable);
2382 while (iter->hasNext()) {
2383 int64_t key = iter->next();
2384 Transaction *transaction = liveTransactionBySequenceNumberTable->get(key);
2386 // Check if the transaction is dead
2387 if (lastArbitratedTransactionNumberByArbitratorTable->contains(transaction->getArbitrator()) && lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator() >= transaction->getSequenceNumber())) {
2388 // Set dead the transaction
2389 transaction->setDead();
2391 // Remove the transaction from the live table
2393 liveTransactionByTransactionIdTable->remove(transaction->getId());
2399 // Go through each of the transactions
2401 SetIterator<int64_t, TransactionStatus *> *iter = getKeyIterator(outstandingTransactionStatus);
2402 while (iter->hasNext()) {
2403 int64_t key = iter->next();
2404 TransactionStatus *status = outstandingTransactionStatus->get(key);
2406 // Check if the transaction is dead
2407 if (lastArbitratedTransactionNumberByArbitratorTable->contains(status->getTransactionArbitrator()) && (lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()) >= status->getTransactionSequenceNumber())) {
2410 status->setStatus(TransactionStatus_StatusCommitted);
2421 * Process this slot, entry by entry-> Also update the latest message sent by slot
2423 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2425 // Update the last message seen
2426 updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2428 // Process each entry in the slot
2429 Vector<Entry *> *entries = slot->getEntries();
2430 uint eSize = entries->size();
2431 for (uint ei = 0; ei < eSize; ei++) {
2432 Entry *entry = entries->get(ei);
2433 switch (entry->getType()) {
2434 case TypeCommitPart:
2435 processEntry((CommitPart *)entry);
2438 processEntry((Abort *)entry);
2440 case TypeTransactionPart:
2441 processEntry((TransactionPart *)entry);
2444 processEntry((NewKey *)entry);
2446 case TypeLastMessage:
2447 processEntry((LastMessage *)entry, machineSet);
2449 case TypeRejectedMessage:
2450 processEntry((RejectedMessage *)entry, indexer);
2452 case TypeTableStatus:
2453 processEntry((TableStatus *)entry, slot->getSequenceNumber());
2456 throw new Error("Unrecognized type: ");
2462 * Update the last message that was sent for a machine Id
2464 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2465 // Update what the last message received by a machine was
2466 updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2470 * Add the new key to the arbitrators table and update the set of live
2471 * new keys (in case of a rescued new key message)
2473 void Table::processEntry(NewKey *entry) {
2474 // Update the arbitrator table with the new key information
2475 arbitratorTable->put(entry->getKey(), entry->getMachineID());
2477 // Update what the latest live new key is
2478 NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2479 if (oldNewKey != NULL) {
2480 // Delete the old new key messages
2481 oldNewKey->setDead();
2486 * Process new table status entries and set dead the old ones as new
2487 * ones come in-> keeps track of the largest and smallest table status
2488 * seen in this current round of updating the local copy of the block
2491 void Table::processEntry(TableStatus *entry, int64_t seq) {
2492 int newNumSlots = entry->getMaxSlots();
2493 updateCurrMaxSize(newNumSlots);
2494 initExpectedSize(seq, newNumSlots);
2496 if (liveTableStatus != NULL) {
2497 // We have a larger table status so the old table status is no
2499 liveTableStatus->setDead();
2502 // Make this new table status the latest alive table status
2503 liveTableStatus = entry;
2507 * Check old messages to see if there is a block chain violation->
2510 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
2511 int64_t oldSeqNum = entry->getOldSeqNum();
2512 int64_t newSeqNum = entry->getNewSeqNum();
2513 bool isequal = entry->getEqual();
2514 int64_t machineId = entry->getMachineID();
2515 int64_t seq = entry->getSequenceNumber();
2517 // Check if we have messages that were supposed to be rejected in
2518 // our local block chain
2519 for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2521 Slot *slot = indexer->getSlot(seqNum);
2524 // If we have this slot make sure that it was not supposed to be
2526 int64_t slotMachineId = slot->getMachineID();
2527 if (isequal != (slotMachineId == machineId)) {
2528 throw new Error("Server Error: Trying to insert rejected message for slot ");
2533 // Create a list of clients to watch until they see this rejected
2535 Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2536 SetIterator<int64_t, Pair<int64_t, Liveness *> *> *iter = getKeyIterator(lastMessageTable);
2537 while (iter->hasNext()) {
2538 // Machine ID for the last message entry
2539 int64_t lastMessageEntryMachineId = iter->next();
2541 // We've seen it, don't need to continue to watch-> Our next
2542 // message will implicitly acknowledge it->
2543 if (lastMessageEntryMachineId == localMachineId) {
2547 Pair<int64_t, Liveness *> *lastMessageValue = lastMessageTable->get(lastMessageEntryMachineId);
2548 int64_t entrySequenceNumber = lastMessageValue->getFirst();
2550 if (entrySequenceNumber < seq) {
2551 // Add this rejected message to the set of messages that this
2552 // machine ID did not see yet
2553 addWatchVector(lastMessageEntryMachineId, entry);
2554 // This client did not see this rejected message yet so add it
2555 // to the watch set to monitor
2556 deviceWatchSet->add(lastMessageEntryMachineId);
2561 if (deviceWatchSet->isEmpty()) {
2562 // This rejected message has been seen by all the clients so
2565 // We need to watch this rejected message
2566 entry->setWatchSet(deviceWatchSet);
2571 * Check if this abort is live, if not then save it so we can kill it
2572 * later-> update the last transaction number that was arbitrated on->
2574 void Table::processEntry(Abort *entry) {
2575 if (entry->getTransactionSequenceNumber() != -1) {
2576 // update the transaction status if it was sent to the server
2577 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2578 if (status != NULL) {
2579 status->setStatus(TransactionStatus_StatusAborted);
2583 // Abort has not been seen by the client it is for yet so we need to
2586 Abort *previouslySeenAbort = liveAbortTable->put(new Pair<int64_t, int64_t>(entry->getAbortId()), entry);
2587 if (previouslySeenAbort != NULL) {
2588 previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version
2591 if (entry->getTransactionArbitrator() == localMachineId) {
2592 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
2595 if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) {
2596 // The machine already saw this so it is dead
2598 Pair<int64_t, int64_t> abortid = entry->getAbortId();
2599 liveAbortTable->remove(&abortid);
2601 if (entry->getTransactionArbitrator() == localMachineId) {
2602 liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2607 // Update the last arbitration data that we have seen so far
2608 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(entry->getTransactionArbitrator())) {
2609 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2610 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2612 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2615 // Never seen any data from this arbitrator so record the first one
2616 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2619 // Set dead a transaction if we can
2620 Pair<int64_t, int64_t> deadPair = Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber());
2622 Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(&deadPair);
2623 if (transactionToSetDead != NULL) {
2624 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2627 // Update the last transaction sequence number that the arbitrator
2629 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getTransactionArbitrator()) ||
2630 (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator()) < entry->getTransactionSequenceNumber())) {
2632 if (entry->getTransactionSequenceNumber() != -1) {
2633 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2639 * Set dead the transaction part if that transaction is dead and keep
2640 * track of all new parts
2642 void Table::processEntry(TransactionPart *entry) {
2643 // Check if we have already seen this transaction and set it dead OR
2644 // if it is not alive
2645 if (lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getArbitratorId()) && (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId()) >= entry->getSequenceNumber())) {
2646 // This transaction is dead, it was already committed or aborted
2651 // This part is still alive
2652 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId());
2654 if (transactionPart == NULL) {
2655 // Dont have a table for this machine Id yet so make one
2656 transactionPart = new Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2657 newTransactionParts->put(entry->getMachineId(), transactionPart);
2660 // Update the part and set dead ones we have already seen (got a
2662 TransactionPart *previouslySeenPart = transactionPart->put(new Pair<int64_t, int32_t>(entry->getPartId()), entry);
2663 if (previouslySeenPart != NULL) {
2664 previouslySeenPart->setDead();
2669 * Process new commit entries and save them for future use-> Delete duplicates
2671 void Table::processEntry(CommitPart *entry) {
2672 // Update the last transaction that was updated if we can
2673 if (entry->getTransactionSequenceNumber() != -1) {
2674 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId() || lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber())) {
2675 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2679 Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *commitPart = newCommitParts->get(entry->getMachineId());
2680 if (commitPart == NULL) {
2681 // Don't have a table for this machine Id yet so make one
2682 commitPart = new Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2683 newCommitParts->put(entry->getMachineId(), commitPart);
2685 // Update the part and set dead ones we have already seen (got a
2687 CommitPart *previouslySeenPart = commitPart->put(new Pair<int64_t, int32_t>(entry->getPartId()), entry);
2688 if (previouslySeenPart != NULL) {
2689 previouslySeenPart->setDead();
2694 * Update the last message seen table-> Update and set dead the
2695 * appropriate RejectedMessages as clients see them-> Updates the live
2696 * aborts, removes those that are dead and sets them dead-> Check that
2697 * the last message seen is correct and that there is no mismatch of
2698 * our own last message or that other clients have not had a rollback
2699 * on the last message->
2701 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2702 // We have seen this machine ID
2703 machineSet->remove(machineId);
2705 // Get the set of rejected messages that this machine Id is has not seen yet
2706 Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2707 // If there is a rejected message that this machine Id has not seen yet
2708 if (watchset != NULL) {
2709 // Go through each rejected message that this machine Id has not
2712 SetIterator<RejectedMessage *, RejectedMessage *> *rmit = watchset->iterator();
2713 while (rmit->hasNext()) {
2714 RejectedMessage *rm = rmit->next();
2715 // If this machine Id has seen this rejected message->->->
2716 if (rm->getSequenceNumber() <= seqNum) {
2717 // Remove it from our watchlist
2719 // Decrement machines that need to see this notification
2720 rm->removeWatcher(machineId);
2726 // Set dead the abort
2727 SetIterator<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals> *abortit = getKeyIterator(liveAbortTable);
2729 while (abortit->hasNext()) {
2730 Pair<int64_t, int64_t> *key = abortit->next();
2731 Abort *abort = liveAbortTable->get(key);
2732 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2735 if (abort->getTransactionArbitrator() == localMachineId) {
2736 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2741 if (machineId == localMachineId) {
2742 // Our own messages are immediately dead->
2743 char livenessType = liveness->getType();
2744 if (livenessType == TypeLastMessage) {
2745 ((LastMessage *)liveness)->setDead();
2746 } else if (livenessType == TypeSlot) {
2747 ((Slot *)liveness)->setDead();
2749 throw new Error("Unrecognized type");
2752 // Get the old last message for this device
2753 Pair<int64_t, Liveness *> *lastMessageEntry = lastMessageTable->put(machineId, new Pair<int64_t, Liveness *>(seqNum, liveness));
2754 if (lastMessageEntry == NULL) {
2755 // If no last message then there is nothing else to process
2759 int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
2760 Liveness *lastEntry = lastMessageEntry->getSecond();
2761 delete lastMessageEntry;
2763 // If it is not our machine Id since we already set ours to dead
2764 if (machineId != localMachineId) {
2765 char lastEntryType = lastEntry->getType();
2767 if (lastEntryType == TypeLastMessage) {
2768 ((LastMessage *)lastEntry)->setDead();
2769 } else if (lastEntryType == TypeSlot) {
2770 ((Slot *)lastEntry)->setDead();
2772 throw new Error("Unrecognized type");
2775 // Make sure the server is not playing any games
2776 if (machineId == localMachineId) {
2777 if (hadPartialSendToServer) {
2778 // We were not making any updates and we had a machine mismatch
2779 if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2780 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: ");
2783 // We were not making any updates and we had a machine mismatch
2784 if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2785 throw new Error("Server Error: Mismatch on local machine sequence number, needed: ");
2789 if (lastMessageSeqNum > seqNum) {
2790 throw new Error("Server Error: Rollback on remote machine sequence number");
2796 * Add a rejected message entry to the watch set to keep track of
2797 * which clients have seen that rejected message entry and which have
2800 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
2801 Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2802 if (entries == NULL) {
2803 // There is no set for this machine ID yet so create one
2804 entries = new Hashset<RejectedMessage *>();
2805 rejectedMessageWatchVectorTable->put(machineId, entries);
2807 entries->add(entry);
2811 * Check if the HMAC chain is not violated
2813 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
2814 for (uint i = 0; i < newSlots->length(); i++) {
2815 Slot *currSlot = newSlots->get(i);
2816 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
2817 if (prevSlot != NULL &&
2818 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2819 throw new Error("Server Error: Invalid HMAC Chain");