3 #include "SlotBuffer.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
13 #include "SecureRandom.h"
14 #include "ByteBuffer.h"
16 #include "CommitPart.h"
17 #include "ArbitrationRound.h"
18 #include "TransactionPart.h"
20 #include "RejectedMessage.h"
21 #include "SlotIndexer.h"
// Comparator with the (const void *, const void *) signature expected by qsort().
// Both arguments are reinterpreted as pointers to int64_t; it is handed to qsort()
// in acceptDataFromLocal to order the abort local sequence numbers.
24 int compareInt64(const void *a, const void *b) {
25 const int64_t *pa = (const int64_t *) a;
26 const int64_t *pb = (const int64_t *) b;
// Constructs a Table that creates its own CloudComm from the given connection settings.
//   baseurl, password, listeningPort - forwarded unchanged to the CloudComm constructor
//   _localMachineId                  - stored as this table's localMachineId
// The initializer list only sets scalars to their starting values and container
// pointers to NULL; the containers themselves are allocated later by the
// "new Hashtable"/"new Vector" assignments in the init code of this file.
35 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
37 cloud(new CloudComm(this, baseurl, password, listeningPort)),
39 liveTableStatus(NULL),
40 pendingTransactionBuilder(NULL),
41 lastPendingTransactionSpeculatedOn(NULL),
42 firstPendingTransaction(NULL),
44 bufferResizeThreshold(0),
// The oldest live slot starts at sequence number 1, which is also the sequence
// number initTable() gives to the first slot it creates.
46 oldestLiveSlotSequenceNumver(1),
47 localMachineId(_localMachineId),
// All local counters start from zero.
49 localSequenceNumber(0),
50 localTransactionSequenceNumber(0),
51 lastTransactionSequenceNumberSpeculatedOn(0),
52 oldestTransactionSequenceNumberSpeculatedOn(0),
53 localArbitrationSequenceNumber(0),
54 hadPartialSendToServer(false),
55 attemptedToSendToServer(false),
57 didFindTableStatus(false),
// Bookkeeping about the most recent send attempt; filled in by sendToServer().
59 lastSlotAttemptedToSend(NULL),
62 lastTransactionPartsSent(NULL),
63 lastPendingSendArbitrationEntriesToDelete(NULL),
// Container members: NULL until the init code allocates them.
65 committedKeyValueTable(NULL),
66 speculatedKeyValueTable(NULL),
67 pendingTransactionSpeculatedKeyValueTable(NULL),
68 liveNewKeyTable(NULL),
69 lastMessageTable(NULL),
70 rejectedMessageWatchVectorTable(NULL),
71 arbitratorTable(NULL),
73 newTransactionParts(NULL),
75 lastArbitratedTransactionNumberByArbitratorTable(NULL),
76 liveTransactionBySequenceNumberTable(NULL),
77 liveTransactionByTransactionIdTable(NULL),
78 liveCommitsTable(NULL),
79 liveCommitsByKeyTable(NULL),
80 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
81 rejectedSlotVector(NULL),
82 pendingTransactionQueue(NULL),
83 pendingSendArbitrationRounds(NULL),
84 pendingSendArbitrationEntriesToDelete(NULL),
85 transactionPartsSent(NULL),
86 outstandingTransactionStatus(NULL),
87 liveAbortsGeneratedByLocal(NULL),
88 offlineTransactionsCommittedAndAtServer(NULL),
89 localCommunicationTable(NULL),
90 lastTransactionSeenFromMachineFromServer(NULL),
91 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
92 lastInsertedNewKey(false),
// Constructs a Table from an already-built CloudComm instead of URL/password/port.
//   _cloud          - communication object supplied by the caller
//                     (NOTE(review): presumably stored in the cloud member; confirm)
//   _localMachineId - stored as this table's localMachineId
// As in the other constructor, scalars get their starting values and container
// pointers are set to NULL; allocation happens in the init code of this file.
98 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
102 liveTableStatus(NULL),
103 pendingTransactionBuilder(NULL),
104 lastPendingTransactionSpeculatedOn(NULL),
105 firstPendingTransaction(NULL),
107 bufferResizeThreshold(0),
// The oldest live slot starts at sequence number 1.
109 oldestLiveSlotSequenceNumver(1),
110 localMachineId(_localMachineId),
// All local counters start from zero.
112 localSequenceNumber(0),
113 localTransactionSequenceNumber(0),
114 lastTransactionSequenceNumberSpeculatedOn(0),
115 oldestTransactionSequenceNumberSpeculatedOn(0),
116 localArbitrationSequenceNumber(0),
117 hadPartialSendToServer(false),
118 attemptedToSendToServer(false),
120 didFindTableStatus(false),
// Bookkeeping about the most recent send attempt; filled in by sendToServer().
122 lastSlotAttemptedToSend(NULL),
125 lastTransactionPartsSent(NULL),
126 lastPendingSendArbitrationEntriesToDelete(NULL),
// Container members: NULL until the init code allocates them.
128 committedKeyValueTable(NULL),
129 speculatedKeyValueTable(NULL),
130 pendingTransactionSpeculatedKeyValueTable(NULL),
131 liveNewKeyTable(NULL),
132 lastMessageTable(NULL),
133 rejectedMessageWatchVectorTable(NULL),
134 arbitratorTable(NULL),
135 liveAbortTable(NULL),
136 newTransactionParts(NULL),
137 newCommitParts(NULL),
138 lastArbitratedTransactionNumberByArbitratorTable(NULL),
139 liveTransactionBySequenceNumberTable(NULL),
140 liveTransactionByTransactionIdTable(NULL),
141 liveCommitsTable(NULL),
142 liveCommitsByKeyTable(NULL),
143 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
144 rejectedSlotVector(NULL),
145 pendingTransactionQueue(NULL),
146 pendingSendArbitrationRounds(NULL),
147 pendingSendArbitrationEntriesToDelete(NULL),
148 transactionPartsSent(NULL),
149 outstandingTransactionStatus(NULL),
150 liveAbortsGeneratedByLocal(NULL),
151 offlineTransactionsCommittedAndAtServer(NULL),
152 localCommunicationTable(NULL),
153 lastTransactionSeenFromMachineFromServer(NULL),
154 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
155 lastInsertedNewKey(false),
// Teardown of the containers allocated by the init code. Flat containers are deleted
// directly; containers whose values are themselves containers are walked key by key
// so the inner objects can be reached before the outer table is deleted.
166 delete committedKeyValueTable;
167 delete speculatedKeyValueTable;
168 delete pendingTransactionSpeculatedKeyValueTable;
169 delete liveNewKeyTable;
170 delete lastMessageTable;
// Walk every machine id and, for each, every RejectedMessage in its watch set.
172 SetIterator<int64_t, Hashset<RejectedMessage *> *> *rmit = getKeyIterator(rejectedMessageWatchVectorTable);
173 while(rmit->hasNext()) {
174 int64_t machineid = rmit->next();
175 Hashset<RejectedMessage *> * rmset = rejectedMessageWatchVectorTable->get(machineid);
176 SetIterator<RejectedMessage *, RejectedMessage *> * mit = rmset->iterator();
177 while (mit->hasNext()) {
178 RejectedMessage * rm = mit->next();
185 delete rejectedMessageWatchVectorTable;
187 delete arbitratorTable;
188 delete liveAbortTable;
// Visit the per-machine table of transaction parts before dropping the outer table.
190 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts);
191 while (partsit->hasNext()) {
192 int64_t machineId = partsit->next();
193 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
197 delete newTransactionParts;
// Same walk for the per-machine table of commit parts.
200 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts);
201 while (partsit->hasNext()) {
202 int64_t machineId = partsit->next();
203 Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId);
207 delete newCommitParts;
209 delete lastArbitratedTransactionNumberByArbitratorTable;
210 delete liveTransactionBySequenceNumberTable;
211 delete liveTransactionByTransactionIdTable;
// liveCommitsTable maps arbitrator id -> (id -> Commit *): delete each Commit,
// then each inner table, then the outer table.
213 SetIterator<int64_t, Hashtable<int64_t, Commit *> *> *liveit = getKeyIterator(liveCommitsTable);
214 while (liveit->hasNext()) {
215 int64_t arbitratorId = liveit->next();
217 // Get all the commits for a specific arbitrator
218 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
220 SetIterator<int64_t, Commit *> *clientit = getKeyIterator(commitForClientTable);
221 while (clientit->hasNext()) {
222 int64_t id = clientit->next();
223 delete commitForClientTable->get(id);
228 delete commitForClientTable;
231 delete liveCommitsTable;
233 delete liveCommitsByKeyTable;
234 delete lastCommitSeenSequenceNumberByArbitratorTable;
235 delete rejectedSlotVector;
236 delete pendingTransactionQueue;
237 delete pendingSendArbitrationEntriesToDelete;
238 delete transactionPartsSent;
239 delete outstandingTransactionStatus;
240 delete liveAbortsGeneratedByLocal;
241 delete offlineTransactionsCommittedAndAtServer;
242 delete localCommunicationTable;
243 delete lastTransactionSeenFromMachineFromServer;
244 delete pendingSendArbitrationRounds;
// lastTransactionPartsSent is only assigned in sendToServer(), so it may still be NULL.
245 if (lastTransactionPartsSent != NULL)
246 delete lastTransactionPartsSent;
247 delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
251 * Init all the stuff needed for table usage
// Allocates every helper object and container that the constructors left as NULL,
// then derives the slot count and the resize threshold from the new SlotBuffer.
254 // Init helper objects
255 random = new SecureRandom();
256 buffer = new SlotBuffer();
// Tables keyed by IoTString * use hashString/StringEquals; tables keyed by
// Pair * use pairHashFunction/pairEquals; int64_t keys use the default template arguments.
259 committedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
260 speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
261 pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
262 liveNewKeyTable = new Hashtable<IoTString *, NewKey *, uintptr_t, 0, hashString, StringEquals >();
263 lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> * >();
264 rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
265 arbitratorTable = new Hashtable<IoTString *, int64_t, uintptr_t, 0, hashString, StringEquals>();
266 liveAbortTable = new Hashtable<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>();
267 newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
268 newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
269 lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
270 liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
271 liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t> *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>();
272 liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> * >();
273 liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *, uintptr_t, 0, hashString, StringEquals>();
274 lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
275 rejectedSlotVector = new Vector<int64_t>();
276 pendingTransactionQueue = new Vector<Transaction *>();
277 pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
278 transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
279 outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
280 liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
281 offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> *, uintptr_t, 0, pairHashFunction, pairEquals>();
282 localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> *>();
283 lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
284 pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
285 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
// The slot count mirrors the buffer capacity; setResizeThreshold() reads
// numberOfSlots and random, so it has to run after both are set.
288 numberOfSlots = buffer->capacity();
289 setResizeThreshold();
293 * Initialize the table by inserting a table status as the first entry
294 * into the table; also initialize the crypto stuff.
// Initializes security on the CloudComm, then builds slot number 1 containing a
// TableStatus and pushes it with putSlot(). The slots to apply are then run through
// validateAndUpdate(); throws a heap-allocated Error ("Error on initialization")
// when the outcome matches neither handled case.
296 void Table::initTable() {
297 cloud->initSecurity();
299 // Create the first insertion into the block chain which is the table status
300 Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
301 localSequenceNumber++;
302 TableStatus *status = new TableStatus(s, numberOfSlots);
// NOTE(review): presumably putSlot() returns NULL on success and the slots already
// on the server otherwise - confirm against CloudComm.
304 Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
307 array = new Array<Slot *>(1);
309 // update local block chain
310 validateAndUpdate(array, true);
312 } else if (array->length() == 1) {
313 // in case we did push the slot BUT we failed to init it
314 validateAndUpdate(array, true);
318 throw new Error("Error on initialization");
323 * Rebuild the table from scratch by pulling the latest block chain
// Fetches every slot after the current sequenceNumber from the cloud, applies the
// slots through validateAndUpdate(), then refreshes live transactions and status.
326 void Table::rebuild() {
327 // Just pull the latest slots from the server
328 Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
329 validateAndUpdate(newslots, true);
332 updateLiveTransactionsAndStatus();
// Registers the host name and port used to reach the given arbitrator directly.
// The pair is heap-allocated and stored in localCommunicationTable under the
// arbitrator id; the hostName pointer is stored as-is, not copied.
335 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
336 localCommunicationTable->put(arbitrator, new Pair<IoTString *, int32_t>(hostName, portNumber));
// Returns the arbitrator machine id recorded for key in arbitratorTable.
// NOTE(review): the result for a key that is not in the table depends on
// Hashtable::get - confirm before relying on it.
339 int64_t Table::getArbitrator(IoTString *key) {
340 return arbitratorTable->get(key);
// Closes the table. NOTE(review): confirm which resources are released here.
343 void Table::close() {
// Looks key up in the committed key-value table and returns a newly allocated
// IoTString built from the stored value.
347 IoTString *Table::getCommitted(IoTString *key) {
348 KeyValue *kv = committedKeyValueTable->get(key);
351 return new IoTString(kv->getValue());
// Looks key up in three tables, in this order: pending-transaction speculative,
// speculative, committed. Returns a newly allocated IoTString built from the value
// of the entry that was found.
357 IoTString *Table::getSpeculative(IoTString *key) {
358 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
361 kv = speculatedKeyValueTable->get(key);
365 kv = committedKeyValueTable->get(key);
369 return new IoTString(kv->getValue());
// Reads the committed value for key as part of the transaction being built: the
// value that was read is also recorded as a guard on pendingTransactionBuilder.
// Throws a heap-allocated Error when the key has no arbitrator ("Key not Found.")
// or when checkArbitrator() rejects the key's arbitrator
// ("Not all Key Values Match Arbitrator.").
375 IoTString *Table::getCommittedAtomic(IoTString *key) {
376 KeyValue *kv = committedKeyValueTable->get(key);
378 if (!arbitratorTable->contains(key)) {
379 throw new Error("Key not Found.");
382 // Make sure new key value pair matches the current arbitrator
383 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
384 // TODO: Maybe not throw an error
385 throw new Error("Not all Key Values Match Arbitrator.");
// Guard on the value that was read and hand a copy back to the caller.
389 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
390 return new IoTString(kv->getValue());
// Alternative path: the guard is recorded with a NULL value.
392 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
// Speculative counterpart of getCommittedAtomic(): validates the key and its
// arbitrator, looks the key up in the pending-transaction speculative, speculative
// and committed tables (in that order), and records the value read as a guard on
// pendingTransactionBuilder. Throws a heap-allocated Error on an unknown key or an
// arbitrator mismatch.
397 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
398 if (!arbitratorTable->contains(key)) {
399 throw new Error("Key not Found.");
402 // Make sure new key value pair matches the current arbitrator
403 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
404 // TODO: Maybe not throw an error
405 throw new Error("Not all Key Values Match Arbitrator.");
408 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
411 kv = speculatedKeyValueTable->get(key);
415 kv = committedKeyValueTable->get(key);
// Guard on the value that was read and hand a copy back to the caller.
419 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
420 return new IoTString(kv->getValue());
// Alternative path: the guard is recorded with a NULL value.
422 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
// Pulls the slots after sequenceNumber from the cloud, applies them through
// validateAndUpdate() and refreshes live transactions and status. If an Exception
// is caught, the handler instead iterates over every machine id registered in
// localCommunicationTable.
// NOTE(review): presumably each of those machines is then contacted directly - confirm.
427 bool Table::update() {
429 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
430 validateAndUpdate(newSlots, false);
433 updateLiveTransactionsAndStatus();
435 } catch (Exception *e) {
436 SetIterator<int64_t, Pair<IoTString *, int32_t> *> *kit = getKeyIterator(localCommunicationTable);
437 while (kit->hasNext()) {
438 int64_t m = kit->next();
// Attempts to create keyName with machineId as its arbitrator. A key that already
// has an arbitrator is handled separately; otherwise a NewKey entry is built and
// handed to sendToServer(), whose boolean result reports whether it was inserted.
447 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
449 if (arbitratorTable->contains(keyName)) {
450 // There is already an arbitrator
453 NewKey *newKey = new NewKey(NULL, keyName, machineId);
455 if (sendToServer(newKey)) {
456 // If successfully inserted
// Begins a new transaction by replacing pendingTransactionBuilder with a fresh
// PendingTransaction owned by the local machine.
// NOTE(review): the builder being replaced is not deleted in the visible code -
// confirm who owns it.
462 void Table::startTransaction() {
463 // Create a new transaction, invalidates any old pending transactions.
464 pendingTransactionBuilder = new PendingTransaction(localMachineId);
// Adds a key/value update to the transaction being built.
// Throws a heap-allocated Error when the key has no arbitrator ("Key not Found.")
// or when checkArbitrator() rejects the key's arbitrator
// ("Not all Key Values Match Arbitrator.").
// Both key and value are copied into new IoTString objects before being stored.
467 void Table::put(IoTString *key, IoTString *value) {
468 // Make sure it is a valid key
469 if (!arbitratorTable->contains(key)) {
470 throw new Error("Key not Found.");
473 // Make sure new key value pair matches the current arbitrator
474 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
475 // TODO: Maybe not throw an error
476 throw new Error("Not all Key Values Match Arbitrator.");
479 // Add the key value to this transaction
480 KeyValue *kv = new KeyValue(new IoTString(key), new IoTString(value));
481 pendingTransactionBuilder->addKV(kv);
// Finalizes the transaction held by pendingTransactionBuilder and returns its
// TransactionStatus.
//  - A transaction with no key-value updates returns a StatusNoEffect status at once.
//  - Otherwise the transaction is stamped with the next local transaction sequence
//    number, given a StatusPending status and created from the builder.
//  - If the arbitrator is another machine it is appended to pendingTransactionQueue;
//    the alternative path arbitrates locally and updates live state.
//  - The builder is then replaced with a fresh PendingTransaction.
// When a ServerException is caught, the queued transactions are offered to their
// arbitrators through sendTransactionToLocal().
484 TransactionStatus *Table::commitTransaction() {
485 if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
486 // transaction with no updates will have no effect on the system
487 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
490 // Set the local transaction sequence number and increment
491 pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
492 localTransactionSequenceNumber++;
494 // Create the transaction status
495 TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
497 // Create the new transaction
498 Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
499 newTransaction->setTransactionStatus(transactionStatus);
501 if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
502 // Add it to the queue and invalidate the builder for safety
503 pendingTransactionQueue->add(newTransaction);
505 arbitrateOnLocalTransaction(newTransaction);
506 updateLiveStateFromLocal();
509 pendingTransactionBuilder = new PendingTransaction(localMachineId);
513 } catch (ServerException *e) {
// Arbitrators that could not be reached are remembered so that later transactions
// for the same arbitrator are not attempted out of order.
515 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
516 uint size = pendingTransactionQueue->size();
// In-place compaction: entries are written back at oldindex and the queue is
// truncated to oldindex after the loop.
518 for (uint iter = 0; iter < size; iter++) {
519 Transaction *transaction = pendingTransactionQueue->get(iter);
520 pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter));
522 if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
523 // Already contacted this client so ignore all attempts to contact this client
524 // to preserve ordering for arbitrator
528 Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
530 if (sendReturn.getFirst()) {
531 // Failed to contact over local
532 arbitratorTriedAndFailed->add(transaction->getArbitrator());
534 // Successful contact or should not contact
536 if (sendReturn.getSecond()) {
542 pendingTransactionQueue->setSize(oldindex);
545 updateLiveStateFromLocal();
547 return transactionStatus;
551 * Recalculate the new resize threshold
// Recomputes bufferResizeThreshold from numberOfSlots. The lower bound is
// Table_RESIZE_THRESHOLD * numberOfSlots truncated to int; the threshold is that
// bound minus one plus a random offset from random->nextInt(numberOfSlots - resizeLower).
// NOTE(review): the exact range depends on SecureRandom::nextInt - confirm.
553 void Table::setResizeThreshold() {
554 int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
555 bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
// Returns the current value of localSequenceNumber.
558 int64_t Table::getLocalSequenceNumber() {
559 return localSequenceNumber;
// Recovers from a send whose acknowledgement never arrived. It uses the state saved
// by sendToServer(): lastSlotAttemptedToSend, lastNewSize, lastIsNewKey,
// lastInsertedNewKey and lastTransactionPartsSent.
//  - Fetches the slots after sequenceNumber. If there are none, the old slot is sent
//    again; otherwise checkSend() is asked whether the old slot is among the new ones.
//  - Where the slot did reach the server, every transaction in lastTransactionPartsSent
//    has its sent parts removed, is added to outstandingTransactionStatus and is marked
//    StatusSentPartial, or StatusSentFully (and removed from the pending queue) once
//    all its parts are sent.
//  - Where it did not, the server-failure flag is reset and transactions that never
//    got a part to the server get their sequence number set back to -1.
//  - New slots are applied to the local block chain with validateAndUpdate().
// newKey may be NULL; it is only compared with lastNewKey when it is non-NULL.
// NOTE(review): "lastNewKey->getKey() == newKey->getKey()" compares the returned
// values with ==. If getKey() returns IoTString * (it is passed to
// arbitratorTable->contains() in sendToServer), this is pointer identity, not string
// equality - confirm that is intended.
562 NewKey * Table::handlePartialSend(NewKey * newKey) {
563 //Didn't receive acknowledgement for last send
564 //See if the server has received a newer slot
566 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
567 if (newSlots->length() == 0) {
568 //Retry sending old slot
569 bool wasInserted = false;
570 bool sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey, &wasInserted, &newSlots);
572 if (sendSlotsReturn) {
573 if (newKey != NULL) {
574 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
579 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
580 while (trit->hasNext()) {
581 Transaction *transaction = trit->next();
582 transaction->resetServerFailure();
583 // Update which transactions parts still need to be sent
584 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
585 // Add the transaction status to the outstanding list
586 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
588 // Update the transaction status
589 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
591 // Check if all the transaction parts were successfully
592 // sent and if so then remove it from pending
593 if (transaction->didSendAllParts()) {
594 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
595 pendingTransactionQueue->remove(transaction);
// The retry returned slots: check whether the old slot is among them.
600 if (checkSend(newSlots, lastSlotAttemptedToSend)) {
601 if (newKey != NULL) {
602 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
607 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
608 while (trit->hasNext()) {
609 Transaction *transaction = trit->next();
610 transaction->resetServerFailure();
612 // Update which transactions parts still need to be sent
613 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
615 // Add the transaction status to the outstanding list
616 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
618 // Update the transaction status
619 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
621 // Check if all the transaction parts were successfully sent and if so then remove it from pending
622 if (transaction->didSendAllParts()) {
623 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
624 pendingTransactionQueue->remove(transaction);
626 transaction->resetServerFailure();
627 // Set the transaction sequence number back to nothing
628 if (!transaction->didSendAPartToServer()) {
629 transaction->setSequenceNumber(-1);
637 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
638 while (trit->hasNext()) {
639 Transaction *transaction = trit->next();
640 transaction->resetServerFailure();
641 // Set the transaction sequence number back to nothing
642 if (!transaction->didSendAPartToServer()) {
643 transaction->setSequenceNumber(-1);
648 if (newSlots->length() != 0) {
649 // insert into the local block chain
650 validateAndUpdate(newSlots, true);
// The server already had newer slots: check whether the old slot is among them.
653 if (checkSend(newSlots, lastSlotAttemptedToSend)) {
654 if (newKey != NULL) {
655 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
660 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
661 while (trit->hasNext()) {
662 Transaction *transaction = trit->next();
663 transaction->resetServerFailure();
665 // Update which transactions parts still need to be sent
666 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
668 // Add the transaction status to the outstanding list
669 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
671 // Update the transaction status
672 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
674 // Check if all the transaction parts were successfully sent and if so then remove it from pending
675 if (transaction->didSendAllParts()) {
676 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
677 pendingTransactionQueue->remove(transaction);
679 transaction->resetServerFailure();
680 // Set the transaction sequence number back to nothing
681 if (!transaction->didSendAPartToServer()) {
682 transaction->setSequenceNumber(-1);
688 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
689 while (trit->hasNext()) {
690 Transaction *transaction = trit->next();
691 transaction->resetServerFailure();
692 // Set the transaction sequence number back to nothing
693 if (!transaction->didSendAPartToServer()) {
694 transaction->setSequenceNumber(-1);
700 // insert into the local block chain
701 validateAndUpdate(newSlots, true);
// Pushes pending work to the server: queued transactions, pending arbitration
// rounds and, if newKey is non-NULL, a new key entry.
//  - A partial send left over from an earlier call is resolved first through
//    handlePartialSend().
//  - While work remains, a new slot is built on top of the slot at sequenceNumber,
//    filled by fillSlot() (filled a second time with the resize flag set when a
//    resize is needed) and sent with sendSlotsToServer(). What was attempted is saved
//    in the last* members so handlePartialSend() can recover from it later.
//  - On a successful insert, sent arbitration parts and transaction parts are removed
//    from the pending structures and transaction statuses are updated. Otherwise the
//    transactions are reset so their parts are sent again.
//  - A ServerException is handled according to its type; the visible handler sets
//    hadPartialSendToServer and marks transactions with setServerFailure().
//    NOTE(review): presumably the flag is set only for ServerException_TypeInputTimeout
//    - confirm against the full handler.
// Throws a heap-allocated Error ("Should Be error free") if a partial send is still
// flagged inside the loop.
// Returns newKey == NULL at the visible return statement.
707 bool Table::sendToServer(NewKey *newKey) {
708 if (hadPartialSendToServer) {
709 newKey = handlePartialSend(newKey);
713 // While we have stuff that needs inserting into the block chain
714 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
715 if (hadPartialSendToServer) {
716 throw new Error("Should Be error free");
719 // If there is a new key with same name then end
720 if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
// The new slot takes the next sequence number and is constructed with the HMAC of
// the current latest slot.
725 Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber);
726 localSequenceNumber++;
728 // Try to fill the slot with data
730 bool insertedNewKey = false;
731 bool needsResize = fillSlot(slot, false, newKey, newSize, insertedNewKey);
734 // Reset which transaction to send
735 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
736 while (trit->hasNext()) {
737 Transaction *transaction = trit->next();
738 transaction->resetNextPartToSend();
740 // Set the transaction sequence number back to nothing
741 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
742 transaction->setSequenceNumber(-1);
747 // Clear the sent data since we are trying again
748 pendingSendArbitrationEntriesToDelete->clear();
749 transactionPartsSent->clear();
751 // We needed a resize so try again
752 fillSlot(slot, true, newKey, newSize, insertedNewKey);
// Save what is about to be sent so a lost acknowledgement can be recovered from.
755 lastSlotAttemptedToSend = slot;
756 lastIsNewKey = (newKey != NULL);
757 lastInsertedNewKey = insertedNewKey;
758 lastNewSize = newSize;
760 if (lastTransactionPartsSent != NULL)
761 delete lastTransactionPartsSent;
762 lastTransactionPartsSent = transactionPartsSent->clone();
763 lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
765 Array<Slot *> * newSlots = NULL;
766 bool wasInserted = false;
767 bool sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL, &wasInserted, &newSlots);
769 if (sendSlotsReturn) {
770 // Did insert into the block chain
771 if (insertedNewKey) {
772 // This slot was what was inserted not a previous slot
773 // New Key was successfully inserted into the block chain so dont want to insert it again
777 // Remove the aborts and commit parts that were sent from the pending to send queue
778 uint size = pendingSendArbitrationRounds->size();
// In-place compaction: rounds that still have parts to send are written back at
// oldcount and the vector is truncated to oldcount after the loop.
780 for (uint i = 0; i < size; i++) {
781 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
782 round->removeParts(pendingSendArbitrationEntriesToDelete);
784 if (!round->isDoneSending()) {
786 pendingSendArbitrationRounds->set(oldcount++,
787 pendingSendArbitrationRounds->get(i));
790 pendingSendArbitrationRounds->setSize(oldcount);
792 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
793 while (trit->hasNext()) {
794 Transaction *transaction = trit->next();
795 transaction->resetServerFailure();
797 // Update which transactions parts still need to be sent
798 transaction->removeSentParts(transactionPartsSent->get(transaction));
800 // Add the transaction status to the outstanding list
801 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
803 // Update the transaction status
804 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
806 // Check if all the transaction parts were successfully sent and if so then remove it from pending
807 if (transaction->didSendAllParts()) {
808 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
809 pendingTransactionQueue->remove(transaction);
814 // Reset which transaction to send
815 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
816 while (trit->hasNext()) {
817 Transaction *transaction = trit->next();
818 transaction->resetNextPartToSend();
820 // Set the transaction sequence number back to nothing
821 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
822 transaction->setSequenceNumber(-1);
828 // Clear the sent data in preparation for next send
829 pendingSendArbitrationEntriesToDelete->clear();
830 transactionPartsSent->clear();
832 if (newSlots->length() != 0) {
833 // insert into the local block chain
834 validateAndUpdate(newSlots, true);
838 } catch (ServerException *e) {
839 if (e->getType() != ServerException_TypeInputTimeout) {
840 // Nothing was able to be sent to the server so just clear these data structures
841 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
842 while (trit->hasNext()) {
843 Transaction *transaction = trit->next();
844 transaction->resetNextPartToSend();
846 // Set the transaction sequence number back to nothing
847 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
848 transaction->setSequenceNumber(-1);
853 // There was a partial send to the server
854 hadPartialSendToServer = true;
856 // Nothing was able to be sent to the server so just clear these data structures
857 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
858 while (trit->hasNext()) {
859 Transaction *transaction = trit->next();
860 transaction->resetNextPartToSend();
861 transaction->setServerFailure();
866 pendingSendArbitrationEntriesToDelete->clear();
867 transactionPartsSent->clear();
872 return newKey == NULL;
// Asks the machine with id machineId directly for arbitration data this table has
// not seen yet, using the host/port stored in localCommunicationTable.
// The request buffer is sized for one int64_t and one int32_t; the last arbitration
// local sequence number seen from that machine is written into it (-1 if none).
// The reply starts with an entry count, followed by entries tagged with a type byte:
// TypeAbort entries are decoded with Abort_decode, TypeCommitPart entries with
// CommitPart_decode and then passed to processEntry(). Live state is refreshed afterwards.
// A machine with no entry in localCommunicationTable and a NULL reply from
// sendLocalData() are each handled as a separate case.
875 bool Table::updateFromLocal(int64_t machineId) {
876 if (!localCommunicationTable->contains(machineId))
879 Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(machineId);
881 // Get the size of the send data
882 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
884 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
885 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(machineId)) {
886 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
889 Array<char> *sendData = new Array<char>(sendDataSize);
890 ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
893 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
// Each local exchange uses up one local sequence number.
897 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
898 localSequenceNumber++;
900 if (returnData == NULL) {
901 // Could not contact server
906 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
907 int numberOfEntries = bbDecode->getInt();
909 for (int i = 0; i < numberOfEntries; i++) {
910 char type = bbDecode->get();
911 if (type == TypeAbort) {
912 Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
914 } else if (type == TypeCommitPart) {
915 CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
916 processEntry(commitPart);
920 updateLiveStateFromLocal();
// Sends a transaction straight to its arbitrator over the local connection.
// Returns a pair of flags:
//   (true, false) - no local communication info for the arbitrator, or
//                   sendLocalData() returned NULL
//   (false, true) - the exchange completed and the transaction status was updated
// commitTransaction() treats the first flag as "failed to contact over local".
// Request layout as written here: the last arbitration local sequence number seen
// from the arbitrator (-1 if none), the number of parts, then each encoded part.
// Reply layout as read here: a didCommit byte, a couldArbitrate byte, an entry count,
// then entries tagged with a type byte (TypeAbort or TypeCommitPart).
925 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
927 // Get the devices local communications
928 if (!localCommunicationTable->contains(transaction->getArbitrator()))
929 return Pair<bool, bool>(true, false);
931 Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
933 // Get the size of the send data
934 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
// Add the encoded size of every part to the fixed header size.
936 Vector<TransactionPart *> *tParts = transaction->getParts();
937 uint tPartsSize = tParts->size();
938 for (uint i = 0; i < tPartsSize; i++) {
939 TransactionPart *part = tParts->get(i);
940 sendDataSize += part->getSize();
944 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
945 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(transaction->getArbitrator())) {
946 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
949 // Make the send data size
950 Array<char> *sendData = new Array<char>(sendDataSize);
951 ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
954 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
955 bbEncode->putInt(transaction->getParts()->size());
957 Vector<TransactionPart *> *tParts = transaction->getParts();
958 uint tPartsSize = tParts->size();
959 for (uint i = 0; i < tPartsSize; i++) {
960 TransactionPart *part = tParts->get(i);
961 part->encode(bbEncode);
// Each local exchange uses up one local sequence number.
966 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
967 localSequenceNumber++;
969 if (returnData == NULL) {
970 // Could not contact server
971 return Pair<bool, bool>(true, false);
975 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
976 bool didCommit = bbDecode->get() == 1;
977 bool couldArbitrate = bbDecode->get() == 1;
978 int numberOfEntries = bbDecode->getInt();
979 bool foundAbort = false;
981 for (int i = 0; i < numberOfEntries; i++) {
982 char type = bbDecode->get();
983 if (type == TypeAbort) {
984 Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
// An abort counts for this transaction only if it names this machine and this
// transaction's client-local sequence number.
986 if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
991 } else if (type == TypeCommitPart) {
992 CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
993 processEntry(commitPart);
997 updateLiveStateFromLocal();
// NOTE(review): presumably the first pair of status updates is selected by didCommit
// and the second pair by foundAbort - confirm against the full branch conditions.
999 if (couldArbitrate) {
1000 TransactionStatus *status = transaction->getTransactionStatus();
1002 status->setStatus(TransactionStatus_StatusCommitted);
1004 status->setStatus(TransactionStatus_StatusAborted);
1007 TransactionStatus *status = transaction->getTransactionStatus();
1009 status->setStatus(TransactionStatus_StatusAborted);
1011 status->setStatus(TransactionStatus_StatusCommitted);
1015 return Pair<bool, bool>(false, true);
// Decodes a request buffer, arbitrates on the transaction it carries (if
// any) and assembles the reply buffer.
//
// Request layout as read below: an int64 "last arbitrated sequence number
// seen", an int32 part count, then that many encoded TransactionParts.
// Reply layout as written below: when the request carried parts, flag bytes
// (0/1) for the commit and arbitration outcome, then an int32 entry count
// followed by each encoded entry (aborts first, then commit parts).
//
// @param data  encoded request buffer in the layout described above.
1018 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
1020 ByteBuffer *bbDecode = ByteBuffer_wrap(data);
1021 int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
1022 int numberOfParts = bbDecode->getInt();
1024 // If we did commit a transaction or not
1025 bool didCommit = false;
1026 bool couldArbitrate = false;
1028 if (numberOfParts != 0) {
1030 // decode the transaction
1031 Transaction *transaction = new Transaction();
1032 for (int i = 0; i < numberOfParts; i++) {
1034 TransactionPart *newPart = (TransactionPart *)TransactionPart_decode(NULL, bbDecode);
1035 transaction->addPartDecode(newPart);
1038 // Arbitrate on transaction and pull relevant return data
// The pair is (couldArbitrate, didCommit), in that order.
1039 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
1040 couldArbitrate = localArbitrateReturn.getFirst();
1041 didCommit = localArbitrateReturn.getSecond();
1043 updateLiveStateFromLocal();
1045 // Transaction was sent to the server so keep track of it to prevent double commit
1046 if (transaction->getSequenceNumber() != -1) {
1047 offlineTransactionsCommittedAndAtServer->add(new Pair<int64_t, int64_t>(transaction->getId()));
1051 // The data to send back
1052 int returnDataSize = 0;
1053 Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
1055 // Get the aborts to send back
1056 Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>();
// Collect the keys of liveAbortsGeneratedByLocal so they can be sorted.
1058 SetIterator<int64_t, Abort *> *abortit = getKeyIterator(liveAbortsGeneratedByLocal);
1059 while (abortit->hasNext())
1060 abortLocalSequenceNumbers->add(abortit->next());
// Sort the collected sequence numbers in place using compareInt64.
1064 qsort(abortLocalSequenceNumbers->expose(), abortLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1066 uint asize = abortLocalSequenceNumbers->size();
1067 for (uint i = 0; i < asize; i++) {
// Note: this loop-local shadows the member localSequenceNumber.
1068 int64_t localSequenceNumber = abortLocalSequenceNumbers->get(i);
// Compare against what the requester reports having already seen.
1069 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1073 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
1074 unseenArbitrations->add(abort);
1075 returnDataSize += abort->getSize();
1078 // Get the commits to send back
// Only commits stored under this machine's id are considered.
1079 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
1080 if (commitForClientTable != NULL) {
1081 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>();
1083 SetIterator<int64_t, Commit *> *commitit = getKeyIterator(commitForClientTable);
1084 while (commitit->hasNext())
1085 commitLocalSequenceNumbers->add(commitit->next());
// Sort the collected sequence numbers in place using compareInt64.
1088 qsort(commitLocalSequenceNumbers->expose(), commitLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1090 uint clsSize = commitLocalSequenceNumbers->size();
1091 for (uint clsi = 0; clsi < clsSize; clsi++) {
// Note: this loop-local shadows the member localSequenceNumber.
1092 int64_t localSequenceNumber = commitLocalSequenceNumbers->get(clsi);
1093 Commit *commit = commitForClientTable->get(localSequenceNumber);
1095 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
// Every part of the commit becomes its own reply entry.
1100 Vector<CommitPart *> *parts = commit->getParts();
1101 uint nParts = parts->size();
1102 for (uint i = 0; i < nParts; i++) {
1103 CommitPart *commitPart = parts->get(i);
1104 unseenArbitrations->add(commitPart);
1105 returnDataSize += commitPart->getSize();
1111 // Number of arbitration entries to decode
// NOTE(review): space for two int32 values is reserved although a single
// putInt is visible below; confirm whether the extra bytes are meant to
// cover the flag bytes written before the count.
1112 returnDataSize += 2 * sizeof(int32_t);
1114 // bool of did commit or not
1115 if (numberOfParts != 0) {
1116 returnDataSize += sizeof(char);
1119 // Data to send Back
1120 Array<char> *returnData = new Array<char>(returnDataSize);
1121 ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
// Flag bytes are only emitted when the request carried a transaction.
1123 if (numberOfParts != 0) {
1125 bbEncode->put((char)1);
1127 bbEncode->put((char)0);
1129 if (couldArbitrate) {
1130 bbEncode->put((char)1);
1132 bbEncode->put((char)0);
// Entry count followed by the encoded entries themselves.
1136 bbEncode->putInt(unseenArbitrations->size());
1137 uint size = unseenArbitrations->size();
1138 for (uint i = 0; i < size; i++) {
1139 Entry *entry = unseenArbitrations->get(i);
1140 entry->encode(bbEncode);
// Advances the member localSequenceNumber (the loop-locals above are out of scope here).
1143 localSequenceNumber++;
1147 /** Checks whether a given slot was sent using new slots in
1148 array. Returns true if sent and false otherwise. */
// @param array      slots to search.
// @param checkSlot  slot whose sequence number is being looked for.
1150 bool Table::checkSend(Array<Slot *> * array, Slot *checkSlot) {
1151 uint size = array->length();
// First pass: look for a slot carrying checkSlot's sequence number that was
// written by this machine.
1152 for (uint i = 0; i < size; i++) {
1153 Slot *s = array->get(i);
1154 if ((s->getSequenceNumber() == checkSlot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1159 //Also need to see if other machines acknowledged our message
// Second pass: scan every entry of every slot for a LastMessage that names
// this machine together with checkSlot's sequence number.
1160 for (uint i = 0; i < size; i++) {
1161 Slot *s = array->get(i);
1163 // Process each entry in the slot
1164 Vector<Entry *> *entries = s->getEntries();
1165 uint eSize = entries->size();
1166 for (uint ei = 0; ei < eSize; ei++) {
1167 Entry *entry = entries->get(ei);
// The cast below relies on the type tag checked here.
1169 if (entry->getType() == TypeLastMessage) {
1170 LastMessage *lastMessage = (LastMessage *)entry;
1172 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == checkSlot->getSequenceNumber())) {
1182 /** Method tries to send slot to server. Returns status in tuple.
1183 isInserted returns whether last un-acked send (if any) was
1184 successful. Returns whether send was confirmed.
// @param slot        slot to upload via cloud->putSlot.
// @param newSize     forwarded unchanged to cloud->putSlot.
// @param isInserted  out: set from the outcome of the send (see branches below).
// @param array       out: receives the slots returned by putSlot, or a
//                    one-element array holding `slot` when putSlot returns NULL.
// Throws a heap-allocated Error (thrown by pointer) if the returned array is empty.
1187 bool Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey, bool *isInserted, Array<Slot *> **array) {
// Remember that a send was attempted, regardless of how it turns out.
1188 attemptedToSendToServer = true;
1190 *array = cloud->putSlot(slot, newSize);
// putSlot returned nothing: hand the slot itself back through *array,
// forget previously rejected slots and report it as not inserted.
1191 if (*array == NULL) {
1192 *array = new Array<Slot *>(1);
1193 (*array)->set(0, slot);
1194 rejectedSlotVector->clear();
1195 *isInserted = false;
// A non-NULL but empty reply is treated as a server error.
1198 if ((*array)->length() == 0) {
1199 throw new Error("Server Error: Did not send any slots");
// After an earlier partial send, the returned slots are inspected with
// checkSend to find out whether this slot made it in.
1202 if (hadPartialSendToServer) {
1203 *isInserted = checkSend(*array, slot);
// Not found among the returned slots: record its sequence number as rejected.
1205 if (!(*isInserted)) {
1206 rejectedSlotVector->add(slot->getSequenceNumber());
1211 rejectedSlotVector->add(slot->getSequenceNumber());
1212 *isInserted = false;
1219 * Returns true if a resize was needed but not done.
// Populates `slot` in a fixed order: table status (on resize), rejected
// message entries, mandatory rescue, the optional new-key entry, pending
// arbitration data, parts of the first pending transaction, and finally
// optional rescue data.
//
// @param slot         slot being filled.
// @param resize       whether to resize; forced to true when liveSlotCount
//                     exceeds bufferResizeThreshold.
// @param newKeyEntry  optional NewKey entry to add (may be NULL).
// @param newSize      out: 0 means no resize; otherwise set from
//                     numberOfSlots * Table_RESIZE_MULTIPLE.
// @param insertedKey  out: reset to false before the new-key entry is attempted.
1221 bool Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry, int & newSize, bool & insertedKey) {
1222 newSize = 0;//special value to indicate no resize
1223 if (liveSlotCount > bufferResizeThreshold) {
1224 resize = true;//Resize is forced
// Announce the new size to readers through a TableStatus entry.
1228 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1229 TableStatus *status = new TableStatus(slot, newSize);
1230 slot->addEntry(status);
1233 // Fill with rejected slots first before doing anything else
1234 doRejectedMessages(slot);
1236 // Do mandatory rescue of entries
1237 ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1239 // Extract working variables
1240 bool needsResize = mandatoryRescueReturn.getFirst();
1241 bool seenLiveSlot = mandatoryRescueReturn.getSecond();
1242 int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1244 if (needsResize && !resize) {
1245 // We need to resize but we are not resizing so return true to force on retry
1249 insertedKey = false;
1250 if (newKeyEntry != NULL) {
// The entry is bound to this slot before the space check.
1251 newKeyEntry->setSlot(slot);
1252 if (slot->hasSpace(newKeyEntry)) {
1253 slot->addEntry(newKeyEntry);
1258 // Clear the transactions, aborts and commits that were sent previously
1259 transactionPartsSent->clear();
1260 pendingSendArbitrationEntriesToDelete->clear();
1261 uint size = pendingSendArbitrationRounds->size();
1262 for (uint i = 0; i < size; i++) {
1263 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
1264 bool isFull = false;
// Parts are (re)generated on every fill before being read.
1265 round->generateParts();
1266 Vector<Entry *> *parts = round->getParts();
1268 // Insert pending arbitration data
1269 uint vsize = parts->size();
1270 for (uint vi = 0; vi < vsize; vi++) {
1271 Entry *arbitrationData = parts->get(vi);
1273 // If it is an abort then we need to set some information
1274 if (arbitrationData->getType() == TypeAbort) {
1275 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1278 if (!slot->hasSpace(arbitrationData)) {
1279 // No space so can't do anything else with these data entries
1284 // Add to this current slot and add it to entries to delete
1285 slot->addEntry(arbitrationData);
1286 pendingSendArbitrationEntriesToDelete->add(arbitrationData);
// Only the transaction at the head of the pending queue is considered.
1294 if (pendingTransactionQueue->size() > 0) {
1295 Transaction *transaction = pendingTransactionQueue->get(0);
1296 // Set the transaction sequence number if it has yet to be inserted into the block chain
1297 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1298 transaction->setSequenceNumber(slot->getSequenceNumber());
1302 TransactionPart *part = transaction->getNextPartToSend();
1304 // Ran out of parts to send for this transaction so move on
1308 if (slot->hasSpace(part)) {
1309 slot->addEntry(part);
// Track which part numbers of this transaction went into the slot.
1310 Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1311 if (partsSent == NULL) {
1312 partsSent = new Vector<int32_t>();
1313 transactionPartsSent->put(transaction, partsSent);
1315 partsSent->add(part->getPartNumber());
// NOTE(review): this stores the same vector pointer that is already
// mapped for `transaction`; presumably redundant - confirm.
1316 transactionPartsSent->put(transaction, partsSent);
1323 // Fill the remainder of the slot with rescue data
1324 doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
// Builds RejectedMessage entries for slot `s` from the sequence numbers
// recorded in rejectedSlotVector. Does nothing when that vector is empty.
//
// @param s  slot the RejectedMessage entries are created for.
1329 void Table::doRejectedMessages(Slot *s) {
1330 if (!rejectedSlotVector->isEmpty()) {
1331 /* TODO: We should avoid generating a rejected message entry if
1332 * there is already a sufficient entry in the queue (e.g.,
1333 * equalsto value of true and same sequence number). */
1335 int64_t old_seqn = rejectedSlotVector->get(0);
// Above the threshold a single entry spans the first through the last
// rejected sequence number.
1336 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1337 int64_t new_seqn = rejectedSlotVector->lastElement();
1338 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
// -1 marks "no missing message seen yet".
1341 int64_t prev_seqn = -1;
1343 /* Go through list of missing messages */
1344 for (; i < rejectedSlotVector->size(); i++) {
1345 int64_t curr_seqn = rejectedSlotVector->get(i);
1346 Slot *s_msg = buffer->getSlot(curr_seqn);
1349 prev_seqn = curr_seqn;
1351 /* Generate rejected message entry for missing messages */
1352 if (prev_seqn != -1) {
1353 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1356 /* Generate rejected message entries for present messages */
// Continues from the index where the previous loop stopped; each entry
// covers exactly one sequence number and uses that slot's machine id.
1357 for (; i < rejectedSlotVector->size(); i++) {
1358 int64_t curr_seqn = rejectedSlotVector->get(i);
1359 Slot *s_msg = buffer->getSlot(curr_seqn);
1360 int64_t machineid = s_msg->getMachineID();
1361 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
// Walks slots from oldestLiveSlotSequenceNumver up to (but excluding) a
// threshold derived from the newest sequence number, numberOfSlots and
// Table_FREE_SLOTS, copying their live entries into `slot`.
//
// @param slot    destination slot for rescued entries.
// @param resize  forwarded to Slot::getLiveEntries.
// @return (needsResize, seenLiveSlot, sequence number at which the walk
//         stopped); the first element is true when an entry of the slot
//         numbered firstIfFull does not fit into `slot`.
1368 ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize) {
1369 int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1370 int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
// Never start below the oldest sequence number the buffer still holds.
1371 if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1372 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1375 int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1376 bool seenLiveSlot = false;
1377 int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots; // smallest seq number in the buffer if it is full
1378 int64_t threshold = firstIfFull + Table_FREE_SLOTS; // we want the buffer to be clear of live entries up to this point
1382 for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1383 Slot *previousSlot = buffer->getSlot(currentSequenceNumber);
1384 // Push slot number forward
1385 if (!seenLiveSlot) {
1386 oldestLiveSlotSequenceNumver = currentSequenceNumber;
1389 if (!previousSlot->isLive()) {
1393 // We have seen a live slot
1394 seenLiveSlot = true;
1396 // Get all the live entries for a slot
1397 Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1399 // Iterate over all the live entries and try to rescue them
1400 uint lESize = liveEntries->size();
1401 for (uint i = 0; i < lESize; i++) {
1402 Entry *liveEntry = liveEntries->get(i);
1403 if (slot->hasSpace(liveEntry)) {
1404 // Enough space to rescue the entry
1405 slot->addEntry(liveEntry);
1406 } else if (currentSequenceNumber == firstIfFull) {
1407 //if there's no space but the entry is about to fall off the queue
1408 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
// Walk finished without hitting the "about to fall off" case.
1414 return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
// Copies live entries of slots numbered seqn..newest into `s` while they fit.
//
// @param s             destination slot.
// @param seenliveslot  whether a live slot was already encountered by the caller.
// @param seqn          sequence number to start scanning from.
// @param resize        forwarded to Slot::getLiveEntries.
1417 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1418 /* now go through live entries from least to greatest sequence number until
1419 * either all live slots added, or the slot doesn't have enough room
1420 * for SKIP_THRESHOLD consecutive entries*/
1422 int64_t newestseqnum = buffer->getNewestSeqNum();
1423 for (; seqn <= newestseqnum; seqn++) {
1424 Slot *prevslot = buffer->getSlot(seqn);
1425 //Push slot number forward
1427 oldestLiveSlotSequenceNumver = seqn;
1429 if (!prevslot->isLive())
1431 seenliveslot = true;
1432 Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
1433 uint lESize = liveentries->size();
1434 for (uint i = 0; i < lESize; i++) {
1435 Entry *liveentry = liveentries->get(i);
1436 if (s->hasSpace(liveentry))
1437 s->addEntry(liveentry);
// skipcount is compared against Table_SKIP_THRESHOLD to bound the scan.
1440 if (skipcount > Table_SKIP_THRESHOLD)
1450 * Checks for malicious activity and updates the local copy of the block chain.
// @param newSlots              slots received from the server; an Error is thrown
//                              (by pointer) if the first one is not newer than
//                              sequenceNumber.
// @param acceptUpdatesToLocal  forwarded to processSlot for every slot.
1452 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1453 // The cloud communication layer has checked slot HMACs already
1455 if (newSlots->length() == 0) {
1459 // Make sure all slots are newer than the last largest slot this
1461 int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
1462 if (firstSeqNum <= sequenceNumber) {
1463 throw new Error("Server Error: Sent older slots!");
1466 // Create an object that can access both new slots and slots in our
1467 // local chain without committing slots to our local chain
1468 SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1470 // Check that the HMAC chain is not broken
1471 checkHMACChain(indexer, newSlots);
1473 // Set to keep track of messages from clients
// Seeded with every machine id currently present in lastMessageTable.
1474 Hashset<int64_t> *machineSet = new Hashset<int64_t>();
1476 SetIterator<int64_t, Pair<int64_t, Liveness *> *> *lmit = getKeyIterator(lastMessageTable);
1477 while (lmit->hasNext())
1478 machineSet->add(lmit->next());
1482 // Process each slots data
1484 uint numSlots = newSlots->length();
1485 for (uint i = 0; i < numSlots; i++) {
1486 Slot *slot = newSlots->get(i);
1487 processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1488 updateExpectedSize();
1493 // If there is a gap, check to see if the server sent us
1495 if (firstSeqNum != (sequenceNumber + 1)) {
1497 // Check the size of the slots that were sent down by the server.
1498 // Can only check the size if there was a gap
1499 checkNumSlots(newSlots->length());
1501 // Since there was a gap every machine must have pushed a slot or
1502 // must have a last message message. If not then the server is
1504 if (!machineSet->isEmpty()) {
1505 throw new Error("Missing record for machines: ");
1509 // Update the size of our local block chain.
1512 // Commit new slots to the local block chain.
1514 uint numSlots = newSlots->length();
1515 for (uint i = 0; i < numSlots; i++) {
1516 Slot *slot = newSlots->get(i);
1518 // Insert this slot into our local block chain copy.
1519 buffer->putSlot(slot);
1521 // Keep track of how many slots are currently live (have live data
1526 // Get the sequence number of the latest slot in the system
1527 sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
1528 updateLiveStateFromServer();
1530 // No Need to remember after we pulled from the server
1531 offlineTransactionsCommittedAndAtServer->clear();
1533 // This is invalidated now
1534 hadPartialSendToServer = false;
// Refreshes the derived live state after new slots arrived from the server:
// new transaction parts, arbitration, committed table, live transactions
// and the two speculative tables, in that order.
1537 void Table::updateLiveStateFromServer() {
1538 // Process the new transaction parts
1539 processNewTransactionParts();
1541 // Do arbitration on new transactions that were received
1542 arbitrateFromServer();
1544 // Update all the committed keys
1545 bool didCommitOrSpeculate = updateCommittedTable();
1547 // Delete the transactions that are now dead
1548 updateLiveTransactionsAndStatus();
// The flag accumulates: once set by either update it stays set for the
// pending-transaction speculative table.
1551 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1552 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
// Refreshes the derived live state after a local change: same steps as
// updateLiveStateFromServer minus processNewTransactionParts and
// arbitrateFromServer.
1555 void Table::updateLiveStateFromLocal() {
1556 // Update all the committed keys
1557 bool didCommitOrSpeculate = updateCommittedTable();
1559 // Delete the transactions that are now dead
1560 updateLiveTransactionsAndStatus();
// The flag accumulates across both speculative-table updates.
1563 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1564 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
// Records a table status: stores the slot count as currMaxSize, sets
// didFindTableStatus and derives expectedsize.
//
// @param firstSequenceNumber  sequence number used as the count of earlier slots.
// @param numberOfSlots        slot count to record; note that this parameter
//                             shadows the member of the same name.
1567 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1568 int64_t prevslots = firstSequenceNumber;
1570 if (didFindTableStatus) {
// The smaller of prevslots and numberOfSlots.
1572 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1575 didFindTableStatus = true;
1576 currMaxSize = numberOfSlots;
// Updates expectedsize, keeping it capped at currMaxSize.
1579 void Table::updateExpectedSize() {
// Clamp so that expectedsize never exceeds currMaxSize.
1582 if (expectedsize > currMaxSize) {
1583 expectedsize = currMaxSize;
1589 * Check the size of the block chain to make sure there are enough
1590 * slots sent back by the server. This is only called when we have a
1591 * gap between the slots that we have locally and the slots sent by
1592 * the server therefore in the slots sent by the server there will be
1593 * at least 1 Table status message
// @param numberOfSlots  number of slots received; must equal expectedsize,
//                       otherwise an Error is thrown (by pointer).
1595 void Table::checkNumSlots(int numberOfSlots) {
1596 if (numberOfSlots != expectedsize) {
// NOTE(review): the "->" inside the message looks like a conversion artifact
// for "."; the message ends after "Expected: " without appending a value.
// Left unchanged here because it is runtime text.
1597 throw new Error("Server Error: Server did not send all slots-> Expected: ");
1602 * Update the size of the local buffer if it is needed.
// Applies currMaxSize as the new local slot count and clears
// didFindTableStatus.
1604 void Table::commitNewMaxSize() {
1605 didFindTableStatus = false;
1607 // Resize the local slot buffer
// The buffer is only touched when the size actually changes.
1608 if (numberOfSlots != currMaxSize) {
1609 buffer->resize((int32_t)currMaxSize);
1612 // Change the number of local slots to the new size
1613 numberOfSlots = (int32_t)currMaxSize;
1615 // Recalculate the resize threshold since the size of the local
1616 // buffer has changed
1617 setResizeThreshold();
1621 * Process the new transaction parts from this latest round of slots
1622 * received from the server
// Folds the parts collected in newTransactionParts into Transaction objects
// held in the live-transaction tables, then empties newTransactionParts.
1624 void Table::processNewTransactionParts() {
1626 if (newTransactionParts->size() == 0) {
1627 // Nothing new to process
1631 // Iterate through all the machine Ids that we received new parts
1633 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *tpit = getKeyIterator(newTransactionParts);
1634 while (tpit->hasNext()) {
1635 int64_t machineId = tpit->next();
1636 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
1638 SetIterator<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *ptit = getKeyIterator(parts);
1639 // Iterate through all the parts for that machine Id
1640 while (ptit->hasNext()) {
1641 Pair<int64_t, int32_t> *partId = ptit->next();
1642 TransactionPart *part = parts->get(partId);
// Parts whose sequence number is at or below the last transaction number
// recorded for their arbitrator are handled separately (see below).
1644 if (lastArbitratedTransactionNumberByArbitratorTable->contains(part->getArbitratorId())) {
1645 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1646 if (lastTransactionNumber >= part->getSequenceNumber()) {
1647 // Set dead the transaction part
1653 // Get the transaction object for that sequence number
1654 Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1656 if (transaction == NULL) {
1657 // This is a new transaction that we don't have so make a new one
1658 transaction = new Transaction();
1660 // Insert this new transaction into the live tables
// The same Transaction object is indexed by sequence number and by id.
1661 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1662 liveTransactionByTransactionIdTable->put(new Pair<int64_t, int64_t>(part->getTransactionId()), transaction);
1665 // Add that part to the transaction
1666 transaction->addPartDecode(part);
1671 // Clear all the new transaction parts in preparation for the next
1672 // time the server sends slots
1674 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts);
1675 while (partsit->hasNext()) {
1676 int64_t machineId = partsit->next();
1677 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
1681 newTransactionParts->clear();
// Arbitrates on the live transactions for which this machine is the
// arbitrator, visiting them in sorted sequence-number order. Accepted
// updates are batched into one Commit, rejected transactions produce Aborts,
// and the result is queued as an ArbitrationRound in
// pendingSendArbitrationRounds.
1685 void Table::arbitrateFromServer() {
1686 if (liveTransactionBySequenceNumberTable->size() == 0) {
1687 // Nothing to arbitrate on so move on
1691 // Get the transaction sequence numbers and sort from oldest to newest
1692 Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>();
1694 SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
1695 while (trit->hasNext())
1696 transactionSequenceNumbers->add(trit->next());
1699 qsort(transactionSequenceNumbers->expose(), transactionSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1701 // Collection of key value pairs that are
// Accumulates the key/value updates of every transaction accepted in this round.
1702 Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *speculativeTableTmp = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
1704 // The last transaction arbitrated on
1705 int64_t lastTransactionCommitted = -1;
1706 Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1707 uint tsnSize = transactionSequenceNumbers->size();
1708 for (uint i = 0; i < tsnSize; i++) {
1709 int64_t transactionSequenceNumber = transactionSequenceNumbers->get(i);
1710 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1712 // Check if this machine arbitrates for this transaction if not
1713 // then we can't arbitrate this transaction
1714 if (transaction->getArbitrator() != localMachineId) {
// Sequence numbers below lastSeqNumArbOn were already arbitrated on.
1718 if (transactionSequenceNumber < lastSeqNumArbOn) {
1722 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1723 // We have seen this already locally so don't commit again
1728 if (!transaction->isComplete()) {
1729 // Will arbitrate in incorrect order if we continue so just break
1735 // update the largest transaction seen by arbitrator from server
1736 if (!lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1737 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1739 int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1740 if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1741 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
// The guard sees the committed table plus the updates accepted so far in this round.
1745 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1746 // Guard evaluated as true
1748 // Update the local changes so we can make the commit
1749 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1750 while (kvit->hasNext()) {
1751 KeyValue *kv = kvit->next();
1752 speculativeTableTmp->put(kv->getKey(), kv);
1756 // Update what the last transaction committed was for use in batch commit
1757 lastTransactionCommitted = transactionSequenceNumber;
1759 // Guard evaluated was false so create abort
1761 Abort *newAbort = new Abort(NULL,
1762 transaction->getClientLocalSequenceNumber(),
1763 transaction->getSequenceNumber(),
1764 transaction->getMachineId(),
1765 transaction->getArbitrator(),
1766 localArbitrationSequenceNumber);
// Each abort consumes one local arbitration sequence number.
1767 localArbitrationSequenceNumber++;
1768 generatedAborts->add(newAbort);
1770 // Insert the abort so we can process
1771 processEntry(newAbort);
1774 lastSeqNumArbOn = transactionSequenceNumber;
1777 Commit *newCommit = NULL;
1779 // If there is something to commit
1780 if (speculativeTableTmp->size() != 0) {
1781 // Create the commit and increment the commit sequence number
1782 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1783 localArbitrationSequenceNumber++;
1785 // Add all the new keys to the commit
1786 SetIterator<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *spit = getKeyIterator(speculativeTableTmp);
1787 while (spit->hasNext()) {
1788 IoTString *string = spit->next();
1789 KeyValue *kv = speculativeTableTmp->get(string);
1790 newCommit->addKV(kv);
1794 // create the commit parts
1795 newCommit->createCommitParts();
1797 // Append all the commit parts to the end of the pending queue
1798 // waiting for sending to the server
1799 // Insert the commit so we can process it
1800 Vector<CommitPart *> *parts = newCommit->getParts();
1801 uint partsSize = parts->size();
1802 for (uint i = 0; i < partsSize; i++) {
1803 CommitPart *commitPart = parts->get(i);
1804 processEntry(commitPart);
// The temporary table itself is released here.
1807 delete speculativeTableTmp;
// Queue a round only when this pass produced a commit or at least one abort.
1809 if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1810 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1811 pendingSendArbitrationRounds->add(arbitrationRound);
// After a successful compaction the last pending round is re-read and its
// commit parts (if it has a commit) are processed again.
1813 if (compactArbitrationData()) {
1814 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1815 if (newArbitrationRound->getCommit() != NULL) {
1816 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1817 uint partsSize = parts->size();
1818 for (uint i = 0; i < partsSize; i++) {
1819 CommitPart *commitPart = parts->get(i);
1820 processEntry(commitPart);
// Arbitrates on a single transaction, producing either a Commit or an Abort
// that is queued in pendingSendArbitrationRounds.
//
// @param transaction  transaction to arbitrate on.
// @return (couldArbitrate, didCommit):
//         (false, false) when this machine is not the arbitrator, the
//         transaction is incomplete, or it was already seen from the server;
//         (true, true) when the guard held and a commit was created;
//         (true, false) when the guard failed and an abort was created.
1827 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1829 // Check if this machine arbitrates for this transaction if not then
1830 // we can't arbitrate this transaction
1831 if (transaction->getArbitrator() != localMachineId) {
1832 return Pair<bool, bool>(false, false);
1835 if (!transaction->isComplete()) {
1836 // Will arbitrate in incorrect order if we continue so just break
1838 return Pair<bool, bool>(false, false);
1841 if (transaction->getMachineId() != localMachineId) {
1842 // don't do this check for local transactions
1843 if (lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1844 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1845 // We have already seen this from the server
1846 return Pair<bool, bool>(false, false);
// Unlike arbitrateFromServer, the guard is evaluated against the committed
// table only (no speculative table is passed).
1851 if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1852 // Guard evaluated as true Create the commit and increment the
1853 // commit sequence number
1854 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1855 localArbitrationSequenceNumber++;
1857 // Update the local changes so we can make the commit
1858 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1859 while (kvit->hasNext()) {
1860 KeyValue *kv = kvit->next();
1861 newCommit->addKV(kv);
1865 // create the commit parts
1866 newCommit->createCommitParts();
1868 // Append all the commit parts to the end of the pending queue
1869 // waiting for sending to the server
1870 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1871 pendingSendArbitrationRounds->add(arbitrationRound);
// If compaction merged rounds, process the parts of the merged commit;
// the parts of the freshly created commit are processed further below.
1873 if (compactArbitrationData()) {
1874 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1875 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1876 uint partsSize = parts->size();
1877 for (uint i = 0; i < partsSize; i++) {
1878 CommitPart *commitPart = parts->get(i);
1879 processEntry(commitPart);
1882 // Insert the commit so we can process it
1883 Vector<CommitPart *> *parts = newCommit->getParts();
1884 uint partsSize = parts->size();
1885 for (uint i = 0; i < partsSize; i++) {
1886 CommitPart *commitPart = parts->get(i);
1887 processEntry(commitPart);
// Only transactions created on this machine carry a status to update.
1891 if (transaction->getMachineId() == localMachineId) {
1892 TransactionStatus *status = transaction->getTransactionStatus();
1893 if (status != NULL) {
1894 status->setStatus(TransactionStatus_StatusCommitted);
1898 updateLiveStateFromLocal();
1899 return Pair<bool, bool>(true, true);
1901 if (transaction->getMachineId() == localMachineId) {
1902 // For locally created messages update the status
1903 // Guard evaluated was false so create abort
1904 TransactionStatus *status = transaction->getTransactionStatus();
1905 if (status != NULL) {
1906 status->setStatus(TransactionStatus_StatusAborted);
1909 Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
1912 Abort *newAbort = new Abort(NULL,
1913 transaction->getClientLocalSequenceNumber(),
1915 transaction->getMachineId(),
1916 transaction->getArbitrator(),
1917 localArbitrationSequenceNumber);
1918 localArbitrationSequenceNumber++;
1919 addAbortSet->add(newAbort);
1921 // Append the abort round to the end of the pending queue
1922 // waiting for sending to the server
1923 ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1924 pendingSendArbitrationRounds->add(arbitrationRound);
1926 if (compactArbitrationData()) {
1927 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
// NOTE(review): the round queued above was built with a NULL commit; verify
// that getCommit() is guaranteed non-NULL before this dereference.
1929 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1930 uint partsSize = parts->size();
1931 for (uint i = 0; i < partsSize; i++) {
1932 CommitPart *commitPart = parts->get(i);
1933 processEntry(commitPart);
1938 updateLiveStateFromLocal();
1939 return Pair<bool, bool>(true, false);
1944 * Compacts the arbitration data by merging commits and aggregating
1945 * aborts so that a single large push of commits can be done instead
1946 * of many small updates
// Tries to fold earlier pending arbitration rounds into the most recent one,
// walking backwards from the end of pendingSendArbitrationRounds and
// stopping at the first round that is full, was partly sent, or would push
// the merged round past ArbitrationRound_MAX_PARTS.
1948 bool Table::compactArbitrationData() {
1949 if (pendingSendArbitrationRounds->size() < 2) {
1950 // Nothing to compact so do nothing
1954 ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1955 if (lastRound->getDidSendPart()) {
// NOTE(review): hadCommit is true when lastRound has NO commit (== NULL);
// the name suggests the opposite - confirm the intended condition.
1959 bool hadCommit = (lastRound->getCommit() == NULL);
1960 bool gotNewCommit = false;
// Counts rounds from the end that are covered by the compaction; starts at
// 1 for lastRound itself.
1962 uint numberToDelete = 1;
1963 while (numberToDelete < pendingSendArbitrationRounds->size()) {
1964 ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1966 if (round->isFull() || round->getDidSendPart()) {
1967 // Stop since there is a part that cannot be compacted and we
1968 // need to compact in order
1972 if (round->getCommit() == NULL) {
1973 // Try compacting aborts only
1974 int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
1975 if (newSize > ArbitrationRound_MAX_PARTS) {
1976 // Can't compact since it would be too large
1979 lastRound->addAborts(round->getAborts());
1981 // Create a new larger commit
1982 Commit *newCommit = Commit_merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
// The sequence number is consumed even if the merged commit is rejected
// by the size check below.
1983 localArbitrationSequenceNumber++;
1985 // Create the commit parts so that we can count them
1986 newCommit->createCommitParts();
1988 // Calculate the new size of the parts
1989 int newSize = newCommit->getNumberOfParts();
1990 newSize += lastRound->getAbortsCount();
1991 newSize += round->getAbortsCount();
1993 if (newSize > ArbitrationRound_MAX_PARTS) {
1994 // Can't compact since it would be too large
1998 // Set the new compacted part
1999 lastRound->setCommit(newCommit);
2000 lastRound->addAborts(round->getAborts());
2001 gotNewCommit = true;
// numberToDelete > 1 means at least one earlier round was merged.
2007 if (numberToDelete != 1) {
2008 // If there is a compaction
2009 // Delete the previous pieces that are now in the new compacted piece
2010 if (numberToDelete == pendingSendArbitrationRounds->size()) {
2011 pendingSendArbitrationRounds->clear();
2013 for (uint i = 0; i < numberToDelete; i++) {
2014 pendingSendArbitrationRounds->removeIndex(pendingSendArbitrationRounds->size() - 1);
2018 // Add the new compacted into the pending to send list
2019 pendingSendArbitrationRounds->add(lastRound);
2021 // Should reinsert into the commit processor
2022 if (hadCommit && gotNewCommit) {
2031 * Update all the commits and the committed tables, sets dead the dead
2034 bool Table::updateCommittedTable() {
// Two phases are visible:
//  1. Every CommitPart collected in newCommitParts is attached to its
//     Commit in liveCommitsTable (creating the per-machine table and the
//     Commit on first sight), then newCommitParts is cleared.
//  2. For each arbitrator in liveCommitsTable, its commit sequence numbers
//     are sorted with compareInt64 and processed in that order, updating
//     the "last seen" tables, invalidating keys in earlier commits and
//     writing the new key/values into committedKeyValueTable.
// Returns didProcessANewCommit, which is set once a commit reaches the
// full-processing path.
// NOTE(review): this listing has gaps in its embedded line numbers; early
// returns, `continue`/`break` statements, closing braces and any iterator
// clean-up are not visible here.
2035 if (newCommitParts->size() == 0) {
2036 // Nothing new to process
2040 // Iterate through all the machine Ids that we received new parts for
2041 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts);
2042 while (partsit->hasNext()) {
2043 int64_t machineId = partsit->next();
2044 Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId);
2046 // Iterate through all the parts for that machine Id
2047 SetIterator<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pairit = getKeyIterator(parts);
2048 while (pairit->hasNext()) {
2049 Pair<int64_t, int32_t> *partId = pairit->next();
2050 CommitPart *part = parts->get(partId);
2052 // Get the transaction object for that sequence number
2053 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
2055 if (commitForClientTable == NULL) {
2056 // This is the first commit from this device
2057 commitForClientTable = new Hashtable<int64_t, Commit *>();
2058 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
2061 Commit *commit = commitForClientTable->get(part->getSequenceNumber());
2063 if (commit == NULL) {
2064 // This is a new commit that we dont have so make a new one
2065 commit = new Commit();
2067 // Insert this new commit into the live tables
2068 commitForClientTable->put(part->getSequenceNumber(), commit);
2071 // Add that part to the commit
2072 commit->addPartDecode(part);
2079 // Clear all the new commits parts in preparation for the next time
2080 // the server sends slots
2081 newCommitParts->clear();
2083 // If we process a new commit keep track of it for future use
2084 bool didProcessANewCommit = false;
2086 // Process the commits one by one
2087 SetIterator<int64_t, Hashtable<int64_t, Commit *> *> *liveit = getKeyIterator(liveCommitsTable);
2088 while (liveit->hasNext()) {
2089 int64_t arbitratorId = liveit->next();
2091 // Get all the commits for a specific arbitrator
2092 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
2094 // Sort the commits in order
2095 Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>();
2097 SetIterator<int64_t, Commit *> *clientit = getKeyIterator(commitForClientTable);
2098 while (clientit->hasNext())
2099 commitSequenceNumbers->add(clientit->next());
// Sort the raw backing array of the vector in place.
2103 qsort(commitSequenceNumbers->expose(), commitSequenceNumbers->size(), sizeof(int64_t), compareInt64);
2105 // Get the last commit seen from this arbitrator
// -1 is used when no commit from this arbitrator has been recorded yet.
2106 int64_t lastCommitSeenSequenceNumber = -1;
2107 if (lastCommitSeenSequenceNumberByArbitratorTable->contains(arbitratorId)) {
2108 lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
2111 // Go through each new commit one by one
2112 for (uint i = 0; i < commitSequenceNumbers->size(); i++) {
2113 int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
2114 Commit *commit = commitForClientTable->get(commitSequenceNumber);
2116 // Special processing if a commit is not complete
2117 if (!commit->isComplete()) {
2118 if (i == (commitSequenceNumbers->size() - 1)) {
2119 // If there is an incomplete commit and this commit is the
2120 // latest one seen then this commit cannot be processed and
2121 // there are no other commits
2124 // This is a commit that was already dead but parts of it
2125 // are still in the block chain (not flushed out yet).
2126 // Delete it and move on
2128 commitForClientTable->remove(commit->getSequenceNumber());
2134 // Update the last transaction that was updated if we can
2135 if (commit->getTransactionSequenceNumber() != -1) {
2136 // Update the last transaction sequence number that the arbitrator arbitrated on
2137 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2138 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2142 // Update the last arbitration data that we have seen so far
2143 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(commit->getMachineId())) {
2144 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
2145 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
2147 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2150 // Never seen any data from this arbitrator so record the first one
2151 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
// NOTE(review): the next comment reads "so need to"; given the later
// "If we got here then this is a brand new commit" comment it presumably
// means "so NO need to" -- confirm.
2154 // We have already seen this commit before so need to do the
2155 // full processing on this commit
2156 if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2158 // Update the last transaction that was updated if we can
2159 if (commit->getTransactionSequenceNumber() != -1) {
// NOTE(review): lastTransactionNumber is not referenced by any visible
// line, and this get() runs before the contains() check below -- looks
// like a leftover local; confirm and remove.
2160 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
2161 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) ||
2162 lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2163 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2170 // If we got here then this is a brand new commit and needs full
2172 // Get what commits should be edited, these are the commits that
2173 // have live values for their keys
2174 Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
2176 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2177 while (kvit->hasNext()) {
2178 KeyValue *kv = kvit->next();
// NOTE(review): this local `commit` shadows the loop's `commit`; it holds
// the commit currently recorded in liveCommitsByKeyTable for kv's key.
2179 Commit *commit = liveCommitsByKeyTable->get(kv->getKey());
2181 commitsToEdit->add(commit);
2186 // Update each previous commit that needs to be updated
2187 SetIterator<Commit *, Commit *> *commitit = commitsToEdit->iterator();
2188 while (commitit->hasNext()) {
2189 Commit *previousCommit = commitit->next();
2191 // Only bother with live commits (TODO: Maybe remove this check)
2192 if (previousCommit->isLive()) {
2194 // Update which keys in the old commits are still live
2196 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2197 while (kvit->hasNext()) {
2198 KeyValue *kv = kvit->next();
2199 previousCommit->invalidateKey(kv->getKey());
2204 // if the commit is now dead then remove it
2205 if (!previousCommit->isLive()) {
// The removal goes through the current arbitrator's table
// (commitForClientTable), keyed by previousCommit's sequence number.
2206 commitForClientTable->remove(previousCommit->getSequenceNumber());
2207 delete previousCommit;
2213 // Update the last seen sequence number from this arbitrator
2214 if (lastCommitSeenSequenceNumberByArbitratorTable->contains(commit->getMachineId())) {
2215 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2216 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2219 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2222 // We processed a new commit that we havent seen before
2223 didProcessANewCommit = true;
2225 // Update the committed table of keys and which commit is using which key
2227 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2228 while (kvit->hasNext()) {
2229 KeyValue *kv = kvit->next();
2230 committedKeyValueTable->put(kv->getKey(), kv);
2231 liveCommitsByKeyTable->put(kv->getKey(), commit);
2239 return didProcessANewCommit;
2243 * Create the speculative table from transactions that are still live
2244 * and have come from the cloud
2246 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
// Applies the key/value updates of live transactions to
// speculatedKeyValueTable, visiting transactions in the order obtained by
// sorting their sequence numbers with compareInt64.
// didProcessNewCommits: when true (or when the oldest live sequence number
// differs from oldestTransactionSequenceNumberSpeculatedOn) the
// speculative table is cleared and both speculation markers are reset
// to -1 before continuing.
// The only return visible in this listing is `return false` when no start
// index is found.
// NOTE(review): this listing has gaps in its embedded line numbers; other
// returns, `break`/`continue` statements and closing braces are not
// visible here.
2247 if (liveTransactionBySequenceNumberTable->size() == 0) {
2248 // There is nothing to speculate on
2252 // Create a list of the transaction sequence numbers and sort them
2253 // from oldest to newest
2254 Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>();
2256 SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
2257 while (trit->hasNext())
2258 transactionSequenceNumbersSorted->add(trit->next());
2262 qsort(transactionSequenceNumbersSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64);
// A "gap" here means the first sorted sequence number is not the one
// remembered from the previous speculation pass.
2264 bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2267 if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2268 // If there is a gap in the transaction sequence numbers then
2269 // there was a commit or an abort of a transaction OR there was a
2270 // new commit (Could be from offline commit) so a redo the
2271 // speculation from scratch
2273 // Start from scratch
2274 speculatedKeyValueTable->clear();
2275 lastTransactionSequenceNumberSpeculatedOn = -1;
2276 oldestTransactionSequenceNumberSpeculatedOn = -1;
2279 // Remember the front of the transaction list
2280 oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2282 // Find where to start arbitration from
2283 uint startIndex = 0;
// Scan for the entry equal to lastTransactionSequenceNumberSpeculatedOn;
// the statement executed on a match is not visible in this listing.
2285 for (; startIndex < transactionSequenceNumbersSorted->size(); startIndex++)
2286 if (transactionSequenceNumbersSorted->get(startIndex) == lastTransactionSequenceNumberSpeculatedOn)
2290 if (startIndex >= transactionSequenceNumbersSorted->size()) {
2291 // Make sure we are not out of bounds
2292 return false; // did not speculate
// Arbitrators that own at least one incomplete transaction seen so far.
2295 Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
// NOTE(review): didSkip is initialised to true and no later assignment is
// visible in this listing; confirm the intended initial value, since the
// reset at the end is described as happening only "since there was a skip".
2296 bool didSkip = true;
2298 for (uint i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2299 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2300 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2302 if (!transaction->isComplete()) {
2303 // If there is an incomplete transaction then there is nothing
2304 // we can do add this transactions arbitrator to the list of
2305 // arbitrators we should ignore
2306 incompleteTransactionArbitrator->add(transaction->getArbitrator());
// Body not visible; presumably transactions from an arbitrator with an
// incomplete transaction are skipped -- confirm.
2311 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2315 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
// The third argument (pending speculative table) is NULL in this pass.
2317 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2318 // Guard evaluated to true so update the speculative table
2320 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2321 while (kvit->hasNext()) {
2322 KeyValue *kv = kvit->next();
2323 speculatedKeyValueTable->put(kv->getKey(), kv);
2331 // Since there was a skip we need to redo the speculation next time around
2332 lastTransactionSequenceNumberSpeculatedOn = -1;
2333 oldestTransactionSequenceNumberSpeculatedOn = -1;
2336 // We did some speculation
2341 * Create the pending transaction speculative table from transactions
2342 * that are still in the pending transaction buffer
2344 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
// Applies the key/value updates of queued (pending) transactions whose
// guard evaluates to true to pendingTransactionSpeculatedKeyValueTable.
// didProcessNewCommitsOrSpeculate: when true, or when the head of
// pendingTransactionQueue changed, the pending speculation state is reset
// and the table is cleared first.
// NOTE(review): this listing has gaps in its embedded line numbers; early
// returns, `break` statements and closing braces are not visible here.
2345 if (pendingTransactionQueue->size() == 0) {
2346 // There is nothing to speculate on
2350 if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2351 // need to reset on the pending speculation
2352 lastPendingTransactionSpeculatedOn = NULL;
2353 firstPendingTransaction = pendingTransactionQueue->get(0);
2354 pendingTransactionSpeculatedKeyValueTable->clear();
2357 // Find where to start arbitration from
2358 uint startIndex = 0;
// NOTE(review): the search target is firstPendingTransaction. After the
// check above it always equals pendingTransactionQueue->get(0), so the
// match is at index 0. lastPendingTransactionSpeculatedOn may have been
// the intended target -- confirm.
2360 for (; startIndex < pendingTransactionQueue->size(); startIndex++)
2361 if (pendingTransactionQueue->get(startIndex) == firstPendingTransaction)
2364 if (startIndex >= pendingTransactionQueue->size()) {
2365 // Make sure we are not out of bounds
2369 for (uint i = startIndex; i < pendingTransactionQueue->size(); i++) {
2370 Transaction *transaction = pendingTransactionQueue->get(i);
2372 lastPendingTransactionSpeculatedOn = transaction;
// The guard is evaluated against all three tables: committed, speculated
// and the pending-speculated table being built here.
2374 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2375 // Guard evaluated to true so update the speculative table
2376 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2377 while (kvit->hasNext()) {
2378 KeyValue *kv = kvit->next();
2379 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2387 * Set dead and remove from the live transaction tables the
2388 * transactions that are dead
2390 void Table::updateLiveTransactionsAndStatus() {
// Two passes are visible:
//  1. over liveTransactionBySequenceNumberTable: a transaction whose
//     sequence number is <= the last number arbitrated by its arbitrator
//     is marked dead and removed from liveTransactionByTransactionIdTable;
//  2. over outstandingTransactionStatus: a status meeting the same
//     condition is set to TransactionStatus_StatusCommitted.
// NOTE(review): this listing has gaps in its embedded line numbers. `iter`
// is declared twice, presumably in separate scopes whose braces are not
// shown; the removal from the table being iterated and any iterator
// clean-up are also not visible.
2391 // Go through each of the transactions
2393 SetIterator<int64_t, Transaction *> *iter = getKeyIterator(liveTransactionBySequenceNumberTable);
2394 while (iter->hasNext()) {
2395 int64_t key = iter->next();
2396 Transaction *transaction = liveTransactionBySequenceNumberTable->get(key);
2398 // Check if the transaction is dead
2399 if (lastArbitratedTransactionNumberByArbitratorTable->contains(transaction->getArbitrator())
2400 && lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator()) >= transaction->getSequenceNumber()) {
2401 // Set dead the transaction
2402 transaction->setDead();
2404 // Remove the transaction from the live table
2406 liveTransactionByTransactionIdTable->remove(transaction->getId());
2413 // Go through each of the transactions
2415 SetIterator<int64_t, TransactionStatus *> *iter = getKeyIterator(outstandingTransactionStatus);
2416 while (iter->hasNext()) {
2417 int64_t key = iter->next();
2418 TransactionStatus *status = outstandingTransactionStatus->get(key);
2420 // Check if the transaction is dead
2421 if (lastArbitratedTransactionNumberByArbitratorTable->contains(status->getTransactionArbitrator())
2422 && (lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()) >= status->getTransactionSequenceNumber())) {
2424 status->setStatus(TransactionStatus_StatusCommitted);
2435 * Process this slot, entry by entry. Also update the latest message sent by slot
2437 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
// Records the slot as the last message from its machine, then dispatches
// every entry in the slot to the processEntry overload matching
// entry->getType().
// indexer:              forwarded to the RejectedMessage handler.
// slot:                 the slot whose entries are processed.
// acceptUpdatesToLocal: forwarded to updateLastMessage.
// machineSet:           forwarded to updateLastMessage and to the
//                       LastMessage handler.
// An unrecognised entry type results in `throw new Error(...)`.
// NOTE(review): this listing has gaps in its embedded line numbers; the
// case labels preceding the Abort and NewKey calls, the `break`
// statements and the label guarding the throw are not visible here.
2439 // Update the last message seen
2440 updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2442 // Process each entry in the slot
2443 Vector<Entry *> *entries = slot->getEntries();
// The size is read once up front and reused as the loop bound.
2444 uint eSize = entries->size();
2445 for (uint ei = 0; ei < eSize; ei++) {
2446 Entry *entry = entries->get(ei);
2447 switch (entry->getType()) {
2448 case TypeCommitPart:
2449 processEntry((CommitPart *)entry);
2452 processEntry((Abort *)entry);
2454 case TypeTransactionPart:
2455 processEntry((TransactionPart *)entry);
2458 processEntry((NewKey *)entry);
2460 case TypeLastMessage:
2461 processEntry((LastMessage *)entry, machineSet);
2463 case TypeRejectedMessage:
2464 processEntry((RejectedMessage *)entry, indexer);
2466 case TypeTableStatus:
// The table-status handler also receives the enclosing slot's sequence number.
2467 processEntry((TableStatus *)entry, slot->getSequenceNumber());
2470 throw new Error("Unrecognized type: ");
2476 * Update the last message that was sent for a machine Id
2478 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
// Thin wrapper: forwards the entry's machine id and sequence number to
// updateLastMessage, passing the entry itself as the liveness object and
// false for acceptUpdatesToLocal.
// machineSet: passed through unchanged to updateLastMessage.
2479 // Update what the last message received by a machine was
2480 updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2484 * Add the new key to the arbitrators table and update the set of live
2485 * new keys (in case of a rescued new key message)
2487 void Table::processEntry(NewKey *entry) {
// Registers entry's machine id as the arbitrator for entry's key and makes
// entry the live NewKey for that key. The NewKey previously stored for the
// key, if any, is marked dead.
2488 // Update the arbitrator table with the new key information
2489 arbitratorTable->put(entry->getKey(), entry->getMachineID());
2491 // Update what the latest live new key is
// put() hands back the value it replaced; it is compared against NULL below.
2492 NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2493 if (oldNewKey != NULL) {
2494 // Delete the old new key messages
2495 oldNewKey->setDead();
2500 * Process new table status entries and set dead the old ones as new
2501 * ones come in. Keeps track of the largest and smallest table status
2502 * seen in this current round of updating the local copy of the block
2505 void Table::processEntry(TableStatus *entry, int64_t seq) {
// Feeds the entry's max-slot count into updateCurrMaxSize and
// initExpectedSize, marks the previously live TableStatus (if any) dead,
// and makes entry the live TableStatus.
// entry: the table status read from a slot.
// seq:   passed to initExpectedSize together with the slot count; the
//        caller in this file passes the enclosing slot's sequence number.
2506 int newNumSlots = entry->getMaxSlots();
2507 updateCurrMaxSize(newNumSlots);
2508 initExpectedSize(seq, newNumSlots);
2510 if (liveTableStatus != NULL) {
2511 // We have a larger table status so the old table status is no
2513 liveTableStatus->setDead();
2516 // Make this new table status the latest alive table status
2517 liveTableStatus = entry;
2521 * Check old messages to see if there is a block chain violation.
2524 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
// Two steps are visible:
//  1. For every sequence number in [oldSeqNum, newSeqNum] the local slot
//     is looked up through indexer; a slot whose machine id contradicts
//     the entry's "equal" flag results in `throw new Error(...)`.
//  2. Every other machine in lastMessageTable whose last seen sequence
//     number is below this entry's sequence number is added to a watch
//     set (and to the per-machine watch vector via addWatchVector).
// NOTE(review): this listing has gaps in its embedded line numbers; the
// NULL check on `slot`, the `continue` for the local machine, the branch
// taken when the watch set is empty and iterator clean-up are not
// visible here.
2525 int64_t oldSeqNum = entry->getOldSeqNum();
2526 int64_t newSeqNum = entry->getNewSeqNum();
2527 bool isequal = entry->getEqual();
2528 int64_t machineId = entry->getMachineID();
2529 int64_t seq = entry->getSequenceNumber();
2531 // Check if we have messages that were supposed to be rejected in
2532 // our local block chain
2533 for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2535 Slot *slot = indexer->getSlot(seqNum);
2538 // If we have this slot make sure that it was not supposed to be
2540 int64_t slotMachineId = slot->getMachineID();
// Throws when the "equal" flag and the actual machine-id match disagree.
2541 if (isequal != (slotMachineId == machineId)) {
2542 throw new Error("Server Error: Trying to insert rejected message for slot ");
2547 // Create a list of clients to watch until they see this rejected
2549 Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2550 SetIterator<int64_t, Pair<int64_t, Liveness *> *> *iter = getKeyIterator(lastMessageTable);
2551 while (iter->hasNext()) {
2552 // Machine ID for the last message entry
2553 int64_t lastMessageEntryMachineId = iter->next();
2555 // We've seen it, don't need to continue to watch. Our next
2556 // message will implicitly acknowledge it.
2557 if (lastMessageEntryMachineId == localMachineId) {
2561 Pair<int64_t, Liveness *> *lastMessageValue = lastMessageTable->get(lastMessageEntryMachineId);
2562 int64_t entrySequenceNumber = lastMessageValue->getFirst();
2564 if (entrySequenceNumber < seq) {
2565 // Add this rejected message to the set of messages that this
2566 // machine ID did not see yet
2567 addWatchVector(lastMessageEntryMachineId, entry);
2568 // This client did not see this rejected message yet so add it
2569 // to the watch set to monitor
2570 deviceWatchSet->add(lastMessageEntryMachineId);
2575 if (deviceWatchSet->isEmpty()) {
2576 // This rejected message has been seen by all the clients so
2579 // We need to watch this rejected message
2580 entry->setWatchSet(deviceWatchSet);
2585 * Check if this abort is live, if not then save it so we can kill it
2586 * later. Update the last transaction number that was arbitrated on.
2588 void Table::processEntry(Abort *entry) {
// Visible steps, in order:
//  - mark the matching outstanding TransactionStatus as aborted;
//  - store the abort in liveAbortTable (and liveAbortsGeneratedByLocal
//    when this machine is the arbitrator), marking a replaced abort dead;
//  - drop it again when the target machine's last message already covers
//    the abort's sequence number;
//  - advance lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
//  - remove the aborted transaction from both live-transaction tables;
//  - advance lastArbitratedTransactionNumberByArbitratorTable.
// NOTE(review): this listing has gaps in its embedded line numbers;
// returns, else branches, setDead() calls and closing braces between the
// steps are not visible here.
2589 if (entry->getTransactionSequenceNumber() != -1) {
2590 // update the transaction status if it was sent to the server
2591 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2592 if (status != NULL) {
2593 status->setStatus(TransactionStatus_StatusAborted);
2597 // Abort has not been seen by the client it is for yet so we need to
// A heap-allocated copy of the abort id is used as the table key.
2600 Abort *previouslySeenAbort = liveAbortTable->put(new Pair<int64_t, int64_t>(entry->getAbortId()), entry);
2601 if (previouslySeenAbort != NULL) {
2602 previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version
2605 if (entry->getTransactionArbitrator() == localMachineId) {
2606 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
// NOTE(review): the result of lastMessageTable->get() is dereferenced
// without a NULL check; confirm an entry always exists for the
// transaction's machine id at this point.
2609 if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) {
2610 // The machine already saw this so it is dead
// A stack copy of the abort id is enough for the lookup/removal.
2612 Pair<int64_t, int64_t> abortid = entry->getAbortId();
2613 liveAbortTable->remove(&abortid);
2615 if (entry->getTransactionArbitrator() == localMachineId) {
2616 liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2621 // Update the last arbitration data that we have seen so far
2622 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(entry->getTransactionArbitrator())) {
2623 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2624 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2626 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2629 // Never seen any data from this arbitrator so record the first one
2630 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2633 // Set dead a transaction if we can
2634 Pair<int64_t, int64_t> deadPair = Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber());
2636 Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(&deadPair);
2637 if (transactionToSetDead != NULL) {
2638 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2641 // Update the last transaction sequence number that the arbitrator
2643 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getTransactionArbitrator()) ||
2644 (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator()) < entry->getTransactionSequenceNumber())) {
// -1 is never recorded as a transaction sequence number.
2646 if (entry->getTransactionSequenceNumber() != -1) {
2647 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2653 * Set dead the transaction part if that transaction is dead and keep
2654 * track of all new parts
2656 void Table::processEntry(TransactionPart *entry) {
// Stores a transaction part in newTransactionParts, keyed first by the
// part's machine id and then by a heap-allocated copy of its part id.
// A part that was previously stored under the same part id is marked dead.
// Parts whose sequence number is <= the last number arbitrated by their
// arbitrator take the "dead" branch instead.
// NOTE(review): this listing has gaps in its embedded line numbers; the
// body of the "dead" branch (and whether it returns) is not visible here.
2657 // Check if we have already seen this transaction and set it dead OR
2658 // if it is not alive
2659 if (lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getArbitratorId()) && (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId()) >= entry->getSequenceNumber())) {
2660 // This transaction is dead, it was already committed or aborted
2665 // This part is still alive
2666 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId());
2668 if (transactionPart == NULL) {
2669 // Dont have a table for this machine Id yet so make one
2670 transactionPart = new Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2671 newTransactionParts->put(entry->getMachineId(), transactionPart);
2674 // Update the part and set dead ones we have already seen (got a
2676 TransactionPart *previouslySeenPart = transactionPart->put(new Pair<int64_t, int32_t>(entry->getPartId()), entry);
2677 if (previouslySeenPart != NULL) {
2678 previouslySeenPart->setDead();
2683 * Process new commit entries and save them for future use. Delete duplicates
2685 void Table::processEntry(CommitPart *entry) {
// Records a commit part for later assembly:
//  - advances lastArbitratedTransactionNumberByArbitratorTable for the
//    part's machine id when the part carries a newer transaction
//    sequence number (-1 means "none" and is skipped);
//  - stores the part in newCommitParts, keyed first by machine id and
//    then by a heap-allocated copy of its part id, marking a part that
//    was previously stored under the same part id dead.
// This listing has gaps in its embedded line numbers; closing braces are
// not shown.
2686 // Update the last transaction that was updated if we can
2687 if (entry->getTransactionSequenceNumber() != -1) {
// Fixed: the closing parenthesis of contains() was misplaced, so the whole
// `id || get(id) < seq` expression was passed to contains() as its
// argument. The condition is now `!contains(id) || get(id) < seq`, the
// same form used by the other updates of this table in this file.
2688 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId()) || lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber()) {
2689 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2693 Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *commitPart = newCommitParts->get(entry->getMachineId());
2694 if (commitPart == NULL) {
2695 // Don't have a table for this machine Id yet so make one
2696 commitPart = new Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2697 newCommitParts->put(entry->getMachineId(), commitPart);
2699 // Update the part and set dead ones we have already seen (got a
// put() hands back the part it replaced; it is compared against NULL below.
2701 CommitPart *previouslySeenPart = commitPart->put(new Pair<int64_t, int32_t>(entry->getPartId()), entry);
2702 if (previouslySeenPart != NULL) {
2703 previouslySeenPart->setDead();
2708 * Update the last message seen table. Update and set dead the
2709 * appropriate RejectedMessages as clients see them. Updates the live
2710 * aborts, removes those that are dead and sets them dead. Check that
2711 * the last message seen is correct and that there is no mismatch of
2712 * our own last message or that other clients have not had a rollback
2713 * on the last message.
2715 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
// Records (seqNum, liveness) as the last message seen from machineId and
// performs the bookkeeping that depends on it.
// machineId:            machine the message came from.
// seqNum:               sequence number of that message.
// liveness:             the Slot or LastMessage carrying the message; any
//                       other type results in `throw new Error(...)` on the
//                       local-machine path.
// acceptUpdatesToLocal: when true, the sequence-number mismatch checks
//                       for the local machine do not throw.
// machineSet:           machineId is removed from this set.
// NOTE(review): this listing has gaps in its embedded line numbers;
// iterator removals, setDead() calls on aborts, else branches, the early
// return and closing braces are not visible here.
2716 // We have seen this machine ID
2717 machineSet->remove(machineId);
2719 // Get the set of rejected messages that this machine Id is has not seen yet
2720 Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2721 // If there is a rejected message that this machine Id has not seen yet
2722 if (watchset != NULL) {
2723 // Go through each rejected message that this machine Id has not
2726 SetIterator<RejectedMessage *, RejectedMessage *> *rmit = watchset->iterator();
2727 while (rmit->hasNext()) {
2728 RejectedMessage *rm = rmit->next();
2729 // If this machine Id has seen this rejected message...
2730 if (rm->getSequenceNumber() <= seqNum) {
2731 // Remove it from our watchlist
2733 // Decrement machines that need to see this notification
2734 rm->removeWatcher(machineId);
2741 // Set dead the abort
2742 SetIterator<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals> *abortit = getKeyIterator(liveAbortTable);
2744 while (abortit->hasNext()) {
2745 Pair<int64_t, int64_t> *key = abortit->next();
2746 Abort *abort = liveAbortTable->get(key);
// An abort addressed to machineId is covered once that machine's message
// sequence number reaches the abort's sequence number.
2747 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2750 if (abort->getTransactionArbitrator() == localMachineId) {
2751 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2756 if (machineId == localMachineId) {
2757 // Our own messages are immediately dead.
2758 char livenessType = liveness->getType();
2759 if (livenessType == TypeLastMessage) {
2760 ((LastMessage *)liveness)->setDead();
2761 } else if (livenessType == TypeSlot) {
2762 ((Slot *)liveness)->setDead();
2764 throw new Error("Unrecognized type");
2767 // Get the old last message for this device
// put() hands back the pair it replaced (NULL when there was none); the
// replaced pair is deleted below after its fields are copied out.
2768 Pair<int64_t, Liveness *> *lastMessageEntry = lastMessageTable->put(machineId, new Pair<int64_t, Liveness *>(seqNum, liveness));
2769 if (lastMessageEntry == NULL) {
2770 // If no last message then there is nothing else to process
2774 int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
2775 Liveness *lastEntry = lastMessageEntry->getSecond();
2776 delete lastMessageEntry;
2778 // If it is not our machine Id since we already set ours to dead
2779 if (machineId != localMachineId) {
2780 char lastEntryType = lastEntry->getType();
2782 if (lastEntryType == TypeLastMessage) {
2783 ((LastMessage *)lastEntry)->setDead();
2784 } else if (lastEntryType == TypeSlot) {
2785 ((Slot *)lastEntry)->setDead();
2787 throw new Error("Unrecognized type");
2790 // Make sure the server is not playing any games
2791 if (machineId == localMachineId) {
// After a partial send only a rollback (previous > new) is an error;
// otherwise, in the second check below, any difference is.
2792 if (hadPartialSendToServer) {
2793 // We were not making any updates and we had a machine mismatch
2794 if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2795 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: ");
2798 // We were not making any updates and we had a machine mismatch
2799 if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2800 throw new Error("Server Error: Mismatch on local machine sequence number, needed: ");
// The check below has no acceptUpdatesToLocal escape: a lower sequence
// number than previously recorded is always an error here.
2804 if (lastMessageSeqNum > seqNum) {
2805 throw new Error("Server Error: Rollback on remote machine sequence number");
2811 * Add a rejected message entry to the watch set to keep track of
2812 * which clients have seen that rejected message entry and which have
2815 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
// Adds entry to the set of rejected messages stored for machineId in
// rejectedMessageWatchVectorTable, creating and registering that set the
// first time the machine id is seen.
2816 Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2817 if (entries == NULL) {
2818 // There is no set for this machine ID yet so create one
2819 entries = new Hashset<RejectedMessage *>();
2820 rejectedMessageWatchVectorTable->put(machineId, entries);
2822 entries->add(entry);
2826 * Check if the HMAC chain is not violated
2828 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
// For every slot in newSlots, looks up the slot with the immediately
// preceding sequence number through indexer and, when one is found,
// requires its HMAC to equal the new slot's "previous HMAC".
// A mismatch results in `throw new Error(...)`.
// indexer:  used to look up the predecessor slot by sequence number.
// newSlots: the slots to verify.
2829 for (uint i = 0; i < newSlots->length(); i++) {
2830 Slot *currSlot = newSlots->get(i);
2831 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
// A missing predecessor (NULL) is not treated as a violation.
2832 if (prevSlot != NULL &&
2833 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2834 throw new Error("Server Error: Invalid HMAC Chain");