3 #include "SlotBuffer.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
14 #include "ByteBuffer.h"
16 #include "CommitPart.h"
17 #include "ArbitrationRound.h"
18 #include "TransactionPart.h"
20 #include "RejectedMessage.h"
21 #include "SlotIndexer.h"
24 int compareInt64(const void * a, const void *b) {
25 const int64_t * pa = (const int64_t *) a;
26 const int64_t * pb = (const int64_t *) b;
35 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
37 cloud(new CloudComm(this, baseurl, password, listeningPort)),
39 liveTableStatus(NULL),
40 pendingTransactionBuilder(NULL),
41 lastPendingTransactionSpeculatedOn(NULL),
42 firstPendingTransaction(NULL),
44 bufferResizeThreshold(0),
46 oldestLiveSlotSequenceNumver(1),
47 localMachineId(_localMachineId),
49 localTransactionSequenceNumber(0),
50 lastTransactionSequenceNumberSpeculatedOn(0),
51 oldestTransactionSequenceNumberSpeculatedOn(0),
52 localArbitrationSequenceNumber(0),
53 hadPartialSendToServer(false),
54 attemptedToSendToServer(false),
56 didFindTableStatus(false),
58 lastSlotAttemptedToSend(NULL),
61 lastTransactionPartsSent(NULL),
62 lastPendingSendArbitrationEntriesToDelete(NULL),
64 committedKeyValueTable(NULL),
65 speculatedKeyValueTable(NULL),
66 pendingTransactionSpeculatedKeyValueTable(NULL),
67 liveNewKeyTable(NULL),
68 lastMessageTable(NULL),
69 rejectedMessageWatchVectorTable(NULL),
70 arbitratorTable(NULL),
72 newTransactionParts(NULL),
74 lastArbitratedTransactionNumberByArbitratorTable(NULL),
75 liveTransactionBySequenceNumberTable(NULL),
76 liveTransactionByTransactionIdTable(NULL),
77 liveCommitsTable(NULL),
78 liveCommitsByKeyTable(NULL),
79 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
80 rejectedSlotVector(NULL),
81 pendingTransactionQueue(NULL),
82 pendingSendArbitrationRounds(NULL),
83 pendingSendArbitrationEntriesToDelete(NULL),
84 transactionPartsSent(NULL),
85 outstandingTransactionStatus(NULL),
86 liveAbortsGeneratedByLocal(NULL),
87 offlineTransactionsCommittedAndAtServer(NULL),
88 localCommunicationTable(NULL),
89 lastTransactionSeenFromMachineFromServer(NULL),
90 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
91 lastInsertedNewKey(false),
97 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
101 liveTableStatus(NULL),
102 pendingTransactionBuilder(NULL),
103 lastPendingTransactionSpeculatedOn(NULL),
104 firstPendingTransaction(NULL),
106 bufferResizeThreshold(0),
108 oldestLiveSlotSequenceNumver(1),
109 localMachineId(_localMachineId),
111 localTransactionSequenceNumber(0),
112 lastTransactionSequenceNumberSpeculatedOn(0),
113 oldestTransactionSequenceNumberSpeculatedOn(0),
114 localArbitrationSequenceNumber(0),
115 hadPartialSendToServer(false),
116 attemptedToSendToServer(false),
118 didFindTableStatus(false),
120 lastSlotAttemptedToSend(NULL),
123 lastTransactionPartsSent(NULL),
124 lastPendingSendArbitrationEntriesToDelete(NULL),
126 committedKeyValueTable(NULL),
127 speculatedKeyValueTable(NULL),
128 pendingTransactionSpeculatedKeyValueTable(NULL),
129 liveNewKeyTable(NULL),
130 lastMessageTable(NULL),
131 rejectedMessageWatchVectorTable(NULL),
132 arbitratorTable(NULL),
133 liveAbortTable(NULL),
134 newTransactionParts(NULL),
135 newCommitParts(NULL),
136 lastArbitratedTransactionNumberByArbitratorTable(NULL),
137 liveTransactionBySequenceNumberTable(NULL),
138 liveTransactionByTransactionIdTable(NULL),
139 liveCommitsTable(NULL),
140 liveCommitsByKeyTable(NULL),
141 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
142 rejectedSlotVector(NULL),
143 pendingTransactionQueue(NULL),
144 pendingSendArbitrationRounds(NULL),
145 pendingSendArbitrationEntriesToDelete(NULL),
146 transactionPartsSent(NULL),
147 outstandingTransactionStatus(NULL),
148 liveAbortsGeneratedByLocal(NULL),
149 offlineTransactionsCommittedAndAtServer(NULL),
150 localCommunicationTable(NULL),
151 lastTransactionSeenFromMachineFromServer(NULL),
152 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
153 lastInsertedNewKey(false),
160 * Init all the stuff needed for for table usage
163 // Init helper objects
164 random = new Random();
165 buffer = new SlotBuffer();
168 committedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
169 speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
170 pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
171 liveNewKeyTable = new Hashtable<IoTString *, NewKey *>();
172 lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> * >();
173 rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
174 arbitratorTable = new Hashtable<IoTString *, int64_t>();
175 liveAbortTable = new Hashtable<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>();
176 newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
177 newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
178 lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
179 liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
180 liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t> *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>();
181 liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> * >();
182 liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *>();
183 lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
184 rejectedSlotVector = new Vector<int64_t>();
185 pendingTransactionQueue = new Vector<Transaction *>();
186 pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
187 transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
188 outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
189 liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
190 offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> *, uintptr_t, 0, pairHashFunction, pairEquals>();
191 localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> *>();
192 lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
193 pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
194 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
197 numberOfSlots = buffer->capacity();
198 setResizeThreshold();
202 * Initialize the table by inserting a table status as the first entry
203 * into the table status also initialize the crypto stuff.
205 void Table::initTable() {
206 cloud->initSecurity();
208 // Create the first insertion into the block chain which is the table status
209 Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
210 localSequenceNumber++;
211 TableStatus *status = new TableStatus(s, numberOfSlots);
213 Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
216 array = new Array<Slot *>(1);
218 // update local block chain
219 validateAndUpdate(array, true);
220 } else if (array->length() == 1) {
221 // in case we did push the slot BUT we failed to init it
222 validateAndUpdate(array, true);
224 throw new Error("Error on initialization");
229 * Rebuild the table from scratch by pulling the latest block chain
232 void Table::rebuild() {
233 // Just pull the latest slots from the server
234 Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
235 validateAndUpdate(newslots, true);
237 updateLiveTransactionsAndStatus();
240 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
241 localCommunicationTable->put(arbitrator, new Pair<IoTString *, int32_t>(hostName, portNumber));
244 int64_t Table::getArbitrator(IoTString *key) {
245 return arbitratorTable->get(key);
248 void Table::close() {
252 IoTString *Table::getCommitted(IoTString *key) {
253 KeyValue *kv = committedKeyValueTable->get(key);
256 return kv->getValue();
262 IoTString *Table::getSpeculative(IoTString *key) {
263 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
266 kv = speculatedKeyValueTable->get(key);
270 kv = committedKeyValueTable->get(key);
274 return kv->getValue();
280 IoTString *Table::getCommittedAtomic(IoTString *key) {
281 KeyValue *kv = committedKeyValueTable->get(key);
283 if (!arbitratorTable->contains(key)) {
284 throw new Error("Key not Found.");
287 // Make sure new key value pair matches the current arbitrator
288 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
289 // TODO: Maybe not throw en error
290 throw new Error("Not all Key Values Match Arbitrator.");
294 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
295 return kv->getValue();
297 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
302 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
303 if (!arbitratorTable->contains(key)) {
304 throw new Error("Key not Found.");
307 // Make sure new key value pair matches the current arbitrator
308 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
309 // TODO: Maybe not throw en error
310 throw new Error("Not all Key Values Match Arbitrator.");
313 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
316 kv = speculatedKeyValueTable->get(key);
320 kv = committedKeyValueTable->get(key);
324 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
325 return kv->getValue();
327 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
332 bool Table::update() {
334 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
335 validateAndUpdate(newSlots, false);
337 updateLiveTransactionsAndStatus();
339 } catch (Exception *e) {
340 SetIterator<int64_t, Pair<IoTString *, int32_t> *> *kit = getKeyIterator(localCommunicationTable);
341 while (kit->hasNext()) {
342 int64_t m = kit->next();
351 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
353 if (!arbitratorTable->contains(keyName)) {
354 // There is already an arbitrator
357 NewKey *newKey = new NewKey(NULL, keyName, machineId);
359 if (sendToServer(newKey)) {
360 // If successfully inserted
366 void Table::startTransaction() {
367 // Create a new transaction, invalidates any old pending transactions.
368 pendingTransactionBuilder = new PendingTransaction(localMachineId);
371 void Table::addKV(IoTString *key, IoTString *value) {
373 // Make sure it is a valid key
374 if (!arbitratorTable->contains(key)) {
375 throw new Error("Key not Found.");
378 // Make sure new key value pair matches the current arbitrator
379 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
380 // TODO: Maybe not throw en error
381 throw new Error("Not all Key Values Match Arbitrator.");
384 // Add the key value to this transaction
385 KeyValue *kv = new KeyValue(key, value);
386 pendingTransactionBuilder->addKV(kv);
389 TransactionStatus *Table::commitTransaction() {
390 if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
391 // transaction with no updates will have no effect on the system
392 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
395 // Set the local transaction sequence number and increment
396 pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
397 localTransactionSequenceNumber++;
399 // Create the transaction status
400 TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
402 // Create the new transaction
403 Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
404 newTransaction->setTransactionStatus(transactionStatus);
406 if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
407 // Add it to the queue and invalidate the builder for safety
408 pendingTransactionQueue->add(newTransaction);
410 arbitrateOnLocalTransaction(newTransaction);
411 updateLiveStateFromLocal();
414 pendingTransactionBuilder = new PendingTransaction(localMachineId);
418 } catch (ServerException *e) {
420 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
421 uint size = pendingTransactionQueue->size();
423 for (int iter = 0; iter < size; iter++) {
424 Transaction *transaction = pendingTransactionQueue->get(iter);
425 pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter));
427 if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
428 // Already contacted this client so ignore all attempts to contact this client
429 // to preserve ordering for arbitrator
433 Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
435 if (sendReturn.getFirst()) {
436 // Failed to contact over local
437 arbitratorTriedAndFailed->add(transaction->getArbitrator());
439 // Successful contact or should not contact
441 if (sendReturn.getSecond()) {
447 pendingTransactionQueue->setSize(oldindex);
450 updateLiveStateFromLocal();
452 return transactionStatus;
456 * Recalculate the new resize threshold
458 void Table::setResizeThreshold() {
459 int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
460 bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
463 int64_t Table::getLocalSequenceNumber() {
464 return localSequenceNumber;
467 bool Table::sendToServer(NewKey *newKey) {
468 bool fromRetry = false;
470 if (hadPartialSendToServer) {
471 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
472 if (newSlots->length() == 0) {
474 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
476 if (sendSlotsReturn.getFirst()) {
477 if (newKey != NULL) {
478 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
483 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
484 while (trit->hasNext()) {
485 Transaction *transaction = trit->next();
486 transaction->resetServerFailure();
487 // Update which transactions parts still need to be sent
488 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
489 // Add the transaction status to the outstanding list
490 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
492 // Update the transaction status
493 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
495 // Check if all the transaction parts were successfully
496 // sent and if so then remove it from pending
497 if (transaction->didSendAllParts()) {
498 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
499 pendingTransactionQueue->remove(transaction);
504 newSlots = sendSlotsReturn.getThird();
505 bool isInserted = false;
506 for (uint si = 0; si < newSlots->length(); si++) {
507 Slot *s = newSlots->get(si);
508 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
514 for (uint si = 0; si < newSlots->length(); si++) {
515 Slot *s = newSlots->get(si);
520 // Process each entry in the slot
521 Vector<Entry *> *ventries = s->getEntries();
522 uint vesize = ventries->size();
523 for (uint vei = 0; vei < vesize; vei++) {
524 Entry *entry = ventries->get(vei);
525 if (entry->getType() == TypeLastMessage) {
526 LastMessage *lastMessage = (LastMessage *)entry;
527 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
536 if (newKey != NULL) {
537 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
542 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
543 while (trit->hasNext()) {
544 Transaction *transaction = trit->next();
545 transaction->resetServerFailure();
547 // Update which transactions parts still need to be sent
548 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
550 // Add the transaction status to the outstanding list
551 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
553 // Update the transaction status
554 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
556 // Check if all the transaction parts were successfully sent and if so then remove it from pending
557 if (transaction->didSendAllParts()) {
558 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
559 pendingTransactionQueue->remove(transaction);
561 transaction->resetServerFailure();
562 // Set the transaction sequence number back to nothing
563 if (!transaction->didSendAPartToServer()) {
564 transaction->setSequenceNumber(-1);
572 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
573 while (trit->hasNext()) {
574 Transaction *transaction = trit->next();
575 transaction->resetServerFailure();
576 // Set the transaction sequence number back to nothing
577 if (!transaction->didSendAPartToServer()) {
578 transaction->setSequenceNumber(-1);
583 if (sendSlotsReturn.getThird()->length() != 0) {
584 // insert into the local block chain
585 validateAndUpdate(sendSlotsReturn.getThird(), true);
589 bool isInserted = false;
590 for (uint si = 0; si < newSlots->length(); si++) {
591 Slot *s = newSlots->get(si);
592 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
598 for (uint si = 0; si < newSlots->length(); si++) {
599 Slot *s = newSlots->get(si);
604 // Process each entry in the slot
605 Vector<Entry *> *entries = s->getEntries();
606 uint eSize = entries->size();
607 for(uint ei=0; ei < eSize; ei++) {
608 Entry * entry = entries->get(ei);
610 if (entry->getType() == TypeLastMessage) {
611 LastMessage *lastMessage = (LastMessage *)entry;
612 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
621 if (newKey != NULL) {
622 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
627 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
628 while (trit->hasNext()) {
629 Transaction *transaction = trit->next();
630 transaction->resetServerFailure();
632 // Update which transactions parts still need to be sent
633 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
635 // Add the transaction status to the outstanding list
636 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
638 // Update the transaction status
639 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
641 // Check if all the transaction parts were successfully sent and if so then remove it from pending
642 if (transaction->didSendAllParts()) {
643 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
644 pendingTransactionQueue->remove(transaction);
646 transaction->resetServerFailure();
647 // Set the transaction sequence number back to nothing
648 if (!transaction->didSendAPartToServer()) {
649 transaction->setSequenceNumber(-1);
655 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
656 while (trit->hasNext()) {
657 Transaction *transaction = trit->next();
658 transaction->resetServerFailure();
659 // Set the transaction sequence number back to nothing
660 if (!transaction->didSendAPartToServer()) {
661 transaction->setSequenceNumber(-1);
667 // insert into the local block chain
668 validateAndUpdate(newSlots, true);
671 } catch (ServerException *e) {
678 // While we have stuff that needs inserting into the block chain
679 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
683 if (hadPartialSendToServer) {
684 throw new Error("Should Be error free");
689 // If there is a new key with same name then end
690 if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
695 Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber);
696 localSequenceNumber++;
698 // Try to fill the slot with data
699 ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
700 bool needsResize = fillSlotsReturn.getFirst();
701 int newSize = fillSlotsReturn.getSecond();
702 bool insertedNewKey = fillSlotsReturn.getThird();
705 // Reset which transaction to send
706 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
707 while (trit->hasNext()) {
708 Transaction *transaction = trit->next();
709 transaction->resetNextPartToSend();
711 // Set the transaction sequence number back to nothing
712 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
713 transaction->setSequenceNumber(-1);
718 // Clear the sent data since we are trying again
719 pendingSendArbitrationEntriesToDelete->clear();
720 transactionPartsSent->clear();
722 // We needed a resize so try again
723 fillSlot(slot, true, newKey);
726 lastSlotAttemptedToSend = slot;
727 lastIsNewKey = (newKey != NULL);
728 lastInsertedNewKey = insertedNewKey;
729 lastNewSize = newSize;
731 lastTransactionPartsSent = transactionPartsSent->clone();
732 lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
734 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
736 if (sendSlotsReturn.getFirst()) {
738 // Did insert into the block chain
740 if (insertedNewKey) {
741 // This slot was what was inserted not a previous slot
743 // New Key was successfully inserted into the block chain so dont want to insert it again
747 // Remove the aborts and commit parts that were sent from the pending to send queue
748 uint size = pendingSendArbitrationRounds->size();
750 for (uint i = 0; i < size; i++) {
751 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
752 round->removeParts(pendingSendArbitrationEntriesToDelete);
754 if (!round->isDoneSending()) {
755 // Sent all the parts
756 pendingSendArbitrationRounds->set(oldcount++,
757 pendingSendArbitrationRounds->get(i));
760 pendingSendArbitrationRounds->setSize(oldcount);
762 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
763 while (trit->hasNext()) {
764 Transaction *transaction = trit->next();
765 transaction->resetServerFailure();
767 // Update which transactions parts still need to be sent
768 transaction->removeSentParts(transactionPartsSent->get(transaction));
770 // Add the transaction status to the outstanding list
771 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
773 // Update the transaction status
774 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
776 // Check if all the transaction parts were successfully sent and if so then remove it from pending
777 if (transaction->didSendAllParts()) {
778 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
779 pendingTransactionQueue->remove(transaction);
784 // Reset which transaction to send
785 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
786 while (trit->hasNext()) {
787 Transaction *transaction = trit->next();
788 transaction->resetNextPartToSend();
790 // Set the transaction sequence number back to nothing
791 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
792 transaction->setSequenceNumber(-1);
798 // Clear the sent data in preparation for next send
799 pendingSendArbitrationEntriesToDelete->clear();
800 transactionPartsSent->clear();
802 if (sendSlotsReturn.getThird()->length() != 0) {
803 // insert into the local block chain
804 validateAndUpdate(sendSlotsReturn.getThird(), true);
808 } catch (ServerException *e) {
809 if (e->getType() != ServerException_TypeInputTimeout) {
810 // Nothing was able to be sent to the server so just clear these data structures
811 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
812 while (trit->hasNext()) {
813 Transaction *transaction = trit->next();
814 transaction->resetNextPartToSend();
816 // Set the transaction sequence number back to nothing
817 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
818 transaction->setSequenceNumber(-1);
823 // There was a partial send to the server
824 hadPartialSendToServer = true;
826 // Nothing was able to be sent to the server so just clear these data structures
827 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
828 while (trit->hasNext()) {
829 Transaction *transaction = trit->next();
830 transaction->resetNextPartToSend();
831 transaction->setServerFailure();
836 pendingSendArbitrationEntriesToDelete->clear();
837 transactionPartsSent->clear();
842 return newKey == NULL;
845 bool Table::updateFromLocal(int64_t machineId) {
846 if (!localCommunicationTable->contains(machineId))
849 Pair<IoTString *, int32_t> * localCommunicationInformation = localCommunicationTable->get(machineId);
851 // Get the size of the send data
852 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
854 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
855 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(machineId)) {
856 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
859 Array<char> *sendData = new Array<char>(sendDataSize);
860 ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
863 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
867 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
868 localSequenceNumber++;
870 if (returnData == NULL) {
871 // Could not contact server
876 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
877 int numberOfEntries = bbDecode->getInt();
879 for (int i = 0; i < numberOfEntries; i++) {
880 char type = bbDecode->get();
881 if (type == TypeAbort) {
882 Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
884 } else if (type == TypeCommitPart) {
885 CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
886 processEntry(commitPart);
890 updateLiveStateFromLocal();
895 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
897 // Get the devices local communications
898 if (!localCommunicationTable->contains(transaction->getArbitrator()))
899 return Pair<bool, bool>(true, false);
901 Pair<IoTString *, int32_t> * localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
903 // Get the size of the send data
904 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
906 Vector<TransactionPart *> * tParts = transaction->getParts();
907 uint tPartsSize = tParts->size();
908 for (uint i = 0; i < tPartsSize; i++) {
909 TransactionPart * part = tParts->get(i);
910 sendDataSize += part->getSize();
914 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
915 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(transaction->getArbitrator())) {
916 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
919 // Make the send data size
920 Array<char> *sendData = new Array<char>(sendDataSize);
921 ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
924 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
925 bbEncode->putInt(transaction->getParts()->size());
927 Vector<TransactionPart *> * tParts = transaction->getParts();
928 uint tPartsSize = tParts->size();
929 for (uint i = 0; i < tPartsSize; i++) {
930 TransactionPart * part = tParts->get(i);
931 part->encode(bbEncode);
936 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
937 localSequenceNumber++;
939 if (returnData == NULL) {
940 // Could not contact server
941 return Pair<bool, bool>(true, false);
945 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
946 bool didCommit = bbDecode->get() == 1;
947 bool couldArbitrate = bbDecode->get() == 1;
948 int numberOfEntries = bbDecode->getInt();
949 bool foundAbort = false;
951 for (int i = 0; i < numberOfEntries; i++) {
952 char type = bbDecode->get();
953 if (type == TypeAbort) {
954 Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
956 if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
961 } else if (type == TypeCommitPart) {
962 CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
963 processEntry(commitPart);
967 updateLiveStateFromLocal();
969 if (couldArbitrate) {
970 TransactionStatus * status = transaction->getTransactionStatus();
972 status->setStatus(TransactionStatus_StatusCommitted);
974 status->setStatus(TransactionStatus_StatusAborted);
977 TransactionStatus * status = transaction->getTransactionStatus();
979 status->setStatus(TransactionStatus_StatusAborted);
981 status->setStatus(TransactionStatus_StatusCommitted);
985 return Pair<bool, bool>(false, true);
988 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
990 ByteBuffer *bbDecode = ByteBuffer_wrap(data);
991 int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
992 int numberOfParts = bbDecode->getInt();
994 // If we did commit a transaction or not
995 bool didCommit = false;
996 bool couldArbitrate = false;
998 if (numberOfParts != 0) {
1000 // decode the transaction
1001 Transaction *transaction = new Transaction();
1002 for (int i = 0; i < numberOfParts; i++) {
1004 TransactionPart *newPart = (TransactionPart *)TransactionPart_decode(NULL, bbDecode);
1005 transaction->addPartDecode(newPart);
1008 // Arbitrate on transaction and pull relevant return data
1009 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
1010 couldArbitrate = localArbitrateReturn.getFirst();
1011 didCommit = localArbitrateReturn.getSecond();
1013 updateLiveStateFromLocal();
1015 // Transaction was sent to the server so keep track of it to prevent double commit
1016 if (transaction->getSequenceNumber() != -1) {
1017 offlineTransactionsCommittedAndAtServer->add(new Pair<int64_t, int64_t>(transaction->getId()));
1021 // The data to send back
1022 int returnDataSize = 0;
1023 Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
1025 // Get the aborts to send back
1026 Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>();
1028 SetIterator<int64_t, Abort *> *abortit = getKeyIterator(liveAbortsGeneratedByLocal);
1029 while(abortit->hasNext())
1030 abortLocalSequenceNumbers->add(abortit->next());
1034 qsort(abortLocalSequenceNumbers->expose(), abortLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1036 uint asize = abortLocalSequenceNumbers->size();
1037 for(uint i=0; i<asize; i++) {
1038 int64_t localSequenceNumber = abortLocalSequenceNumbers->get(i);
1039 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1043 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
1044 unseenArbitrations->add(abort);
1045 returnDataSize += abort->getSize();
1048 // Get the commits to send back
1049 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
1050 if (commitForClientTable != NULL) {
1051 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>();
1053 SetIterator<int64_t, Commit *> *commitit = getKeyIterator(commitForClientTable);
1054 while(commitit->hasNext())
1055 commitLocalSequenceNumbers->add(commitit->next());
1058 qsort(commitLocalSequenceNumbers->expose(), commitLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1060 uint clsSize = commitLocalSequenceNumbers->size();
1061 for(uint clsi = 0; clsi < clsSize; clsi++) {
1062 int64_t localSequenceNumber = commitLocalSequenceNumbers->get(clsi);
1063 Commit *commit = commitForClientTable->get(localSequenceNumber);
1065 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1070 Vector<CommitPart *> * parts = commit->getParts();
1071 uint nParts = parts->size();
1072 for(uint i=0; i<nParts; i++) {
1073 CommitPart * commitPart = parts->get(i);
1074 unseenArbitrations->add(commitPart);
1075 returnDataSize += commitPart->getSize();
1081 // Number of arbitration entries to decode
1082 returnDataSize += 2 * sizeof(int32_t);
1084 // bool of did commit or not
1085 if (numberOfParts != 0) {
1086 returnDataSize += sizeof(char);
1089 // Data to send Back
1090 Array<char> *returnData = new Array<char>(returnDataSize);
1091 ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
1093 if (numberOfParts != 0) {
1095 bbEncode->put((char)1);
1097 bbEncode->put((char)0);
1099 if (couldArbitrate) {
1100 bbEncode->put((char)1);
1102 bbEncode->put((char)0);
1106 bbEncode->putInt(unseenArbitrations->size());
1107 uint size = unseenArbitrations->size();
1108 for (uint i = 0; i < size; i++) {
1109 Entry *entry = unseenArbitrations->get(i);
1110 entry->encode(bbEncode);
1113 localSequenceNumber++;
1117 ThreeTuple<bool, bool, Array<Slot *> *> Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey) {
1118 bool attemptedToSendToServerTmp = attemptedToSendToServer;
1119 attemptedToSendToServer = true;
1121 bool inserted = false;
1122 bool lastTryInserted = false;
1124 Array<Slot *> *array = cloud->putSlot(slot, newSize);
1125 if (array == NULL) {
1126 array = new Array<Slot *>();
1127 array->set(0, slot);
1128 rejectedSlotVector->clear();
1131 if (array->length() == 0) {
1132 throw new Error("Server Error: Did not send any slots");
1135 // if (attemptedToSendToServerTmp) {
1136 if (hadPartialSendToServer) {
1138 bool isInserted = false;
1139 uint size = array->length();
1140 for (uint i = 0; i < size; i++) {
1141 Slot *s = array->get(i);
1142 if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1148 for (uint i = 0; i < size; i++) {
1149 Slot *s = array->get(i);
1154 // Process each entry in the slot
1155 Vector<Entry *> *entries = s->getEntries();
1156 uint eSize = entries->size();
1157 for(uint ei=0; ei < eSize; ei++) {
1158 Entry * entry = entries->get(ei);
1160 if (entry->getType() == TypeLastMessage) {
1161 LastMessage *lastMessage = (LastMessage *)entry;
1163 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) {
1172 rejectedSlotVector->add(slot->getSequenceNumber());
1173 lastTryInserted = false;
1175 lastTryInserted = true;
1178 rejectedSlotVector->add(slot->getSequenceNumber());
1179 lastTryInserted = false;
1183 return ThreeTuple<bool, bool, Array<Slot *> *>(inserted, lastTryInserted, array);
1187 * Returns false if a resize was needed
1189 ThreeTuple<bool, int32_t, bool> Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry) {
1191 if (liveSlotCount > bufferResizeThreshold) {
1192 resize = true;//Resize is forced
1196 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1197 TableStatus *status = new TableStatus(slot, newSize);
1198 slot->addEntry(status);
1201 // Fill with rejected slots first before doing anything else
1202 doRejectedMessages(slot);
1204 // Do mandatory rescue of entries
1205 ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1207 // Extract working variables
1208 bool needsResize = mandatoryRescueReturn.getFirst();
1209 bool seenLiveSlot = mandatoryRescueReturn.getSecond();
1210 int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1212 if (needsResize && !resize) {
1213 // We need to resize but we are not resizing so return false
1214 return ThreeTuple<bool, int32_t, bool>(true, NULL, NULL);
1217 bool inserted = false;
1218 if (newKeyEntry != NULL) {
1219 newKeyEntry->setSlot(slot);
1220 if (slot->hasSpace(newKeyEntry)) {
1221 slot->addEntry(newKeyEntry);
1226 // Clear the transactions, aborts and commits that were sent previously
1227 transactionPartsSent->clear();
1228 pendingSendArbitrationEntriesToDelete->clear();
1229 uint size = pendingSendArbitrationRounds->size();
1230 for (uint i = 0; i < size; i++) {
1231 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
1232 bool isFull = false;
1233 round->generateParts();
1234 Vector<Entry *> *parts = round->getParts();
1236 // Insert pending arbitration data
1237 uint vsize = parts->size();
1238 for (uint vi = 0; vi < vsize; vi++) {
1239 Entry *arbitrationData = parts->get(vi);
1241 // If it is an abort then we need to set some information
1242 if (arbitrationData->getType() == TypeAbort) {
1243 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1246 if (!slot->hasSpace(arbitrationData)) {
1247 // No space so cant do anything else with these data entries
1252 // Add to this current slot and add it to entries to delete
1253 slot->addEntry(arbitrationData);
1254 pendingSendArbitrationEntriesToDelete->add(arbitrationData);
1262 if (pendingTransactionQueue->size() > 0) {
1263 Transaction *transaction = pendingTransactionQueue->get(0);
1264 // Set the transaction sequence number if it has yet to be inserted into the block chain
1265 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1266 transaction->setSequenceNumber(slot->getSequenceNumber());
1270 TransactionPart *part = transaction->getNextPartToSend();
1272 // Ran out of parts to send for this transaction so move on
1276 if (slot->hasSpace(part)) {
1277 slot->addEntry(part);
1278 Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1279 if (partsSent == NULL) {
1280 partsSent = new Vector<int32_t>();
1281 transactionPartsSent->put(transaction, partsSent);
1283 partsSent->add(part->getPartNumber());
1284 transactionPartsSent->put(transaction, partsSent);
1291 // Fill the remainder of the slot with rescue data
1292 doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1294 return ThreeTuple<bool, int32_t, bool>(false, newSize, inserted);
1297 void Table::doRejectedMessages(Slot *s) {
1298 if (!rejectedSlotVector->isEmpty()) {
1299 /* TODO: We should avoid generating a rejected message entry if
1300 * there is already a sufficient entry in the queue (e->g->,
1301 * equalsto value of true and same sequence number)-> */
1303 int64_t old_seqn = rejectedSlotVector->get(0);
1304 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1305 int64_t new_seqn = rejectedSlotVector->lastElement();
1306 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1309 int64_t prev_seqn = -1;
1311 /* Go through list of missing messages */
1312 for (; i < rejectedSlotVector->size(); i++) {
1313 int64_t curr_seqn = rejectedSlotVector->get(i);
1314 Slot *s_msg = buffer->getSlot(curr_seqn);
1317 prev_seqn = curr_seqn;
1319 /* Generate rejected message entry for missing messages */
1320 if (prev_seqn != -1) {
1321 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1324 /* Generate rejected message entries for present messages */
1325 for (; i < rejectedSlotVector->size(); i++) {
1326 int64_t curr_seqn = rejectedSlotVector->get(i);
1327 Slot *s_msg = buffer->getSlot(curr_seqn);
1328 int64_t machineid = s_msg->getMachineID();
1329 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1336 ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize) {
1337 int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1338 int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
1339 if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1340 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1343 int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1344 bool seenLiveSlot = false;
1345 int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots; // smallest seq number in the buffer if it is full
1346 int64_t threshold = firstIfFull + Table_FREE_SLOTS; // we want the buffer to be clear of live entries up to this point
1350 for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1351 Slot * previousSlot = buffer->getSlot(currentSequenceNumber);
1352 // Push slot number forward
1353 if (!seenLiveSlot) {
1354 oldestLiveSlotSequenceNumver = currentSequenceNumber;
1357 if (!previousSlot->isLive()) {
1361 // We have seen a live slot
1362 seenLiveSlot = true;
1364 // Get all the live entries for a slot
1365 Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1367 // Iterate over all the live entries and try to rescue them
1368 uint lESize = liveEntries->size();
1369 for (uint i=0; i< lESize; i++) {
1370 Entry * liveEntry = liveEntries->get(i);
1371 if (slot->hasSpace(liveEntry)) {
1372 // Enough space to rescue the entry
1373 slot->addEntry(liveEntry);
1374 } else if (currentSequenceNumber == firstIfFull) {
1375 //if there's no space but the entry is about to fall off the queue
1376 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
1382 return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
1385 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1386 /* now go through live entries from least to greatest sequence number until
1387 * either all live slots added, or the slot doesn't have enough room
1388 * for SKIP_THRESHOLD consecutive entries*/
1390 int64_t newestseqnum = buffer->getNewestSeqNum();
1392 for (; seqn <= newestseqnum; seqn++) {
1393 Slot *prevslot = buffer->getSlot(seqn);
1394 //Push slot number forward
1396 oldestLiveSlotSequenceNumver = seqn;
1398 if (!prevslot->isLive())
1400 seenliveslot = true;
1401 Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
1402 uint lESize = liveentries->size();
1403 for (uint i=0; i< lESize; i++) {
1404 Entry * liveentry = liveentries->get(i);
1405 if (s->hasSpace(liveentry))
1406 s->addEntry(liveentry);
1409 if (skipcount > Table_SKIP_THRESHOLD)
1419 * Checks for malicious activity and updates the local copy of the block chain->
1421 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1422 // The cloud communication layer has checked slot HMACs already
1424 if (newSlots->length() == 0) {
1428 // Make sure all slots are newer than the last largest slot this
1430 int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
1431 if (firstSeqNum <= sequenceNumber) {
1432 throw new Error("Server Error: Sent older slots!");
1435 // Create an object that can access both new slots and slots in our
1436 // local chain without committing slots to our local chain
1437 SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1439 // Check that the HMAC chain is not broken
1440 checkHMACChain(indexer, newSlots);
1442 // Set to keep track of messages from clients
1443 Hashset<int64_t> *machineSet = new Hashset<int64_t>();
1445 SetIterator<int64_t, Pair<int64_t, Liveness *> *> * lmit=getKeyIterator(lastMessageTable);
1446 while(lmit->hasNext())
1447 machineSet->add(lmit->next());
1451 // Process each slots data
1453 uint numSlots = newSlots->length();
1454 for(uint i=0; i<numSlots; i++) {
1455 Slot *slot = newSlots->get(i);
1456 processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1457 updateExpectedSize();
1461 // If there is a gap, check to see if the server sent us
1463 if (firstSeqNum != (sequenceNumber + 1)) {
1465 // Check the size of the slots that were sent down by the server->
1466 // Can only check the size if there was a gap
1467 checkNumSlots(newSlots->length());
1469 // Since there was a gap every machine must have pushed a slot or
1470 // must have a last message message-> If not then the server is
1472 if (!machineSet->isEmpty()) {
1473 throw new Error("Missing record for machines: ");
1477 // Update the size of our local block chain->
1480 // Commit new to slots to the local block chain->
1482 uint numSlots = newSlots->length();
1483 for(uint i=0; i<numSlots; i++) {
1484 Slot *slot = newSlots->get(i);
1486 // Insert this slot into our local block chain copy->
1487 buffer->putSlot(slot);
1489 // Keep track of how many slots are currently live (have live data
1494 // Get the sequence number of the latest slot in the system
1495 sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
1496 updateLiveStateFromServer();
1498 // No Need to remember after we pulled from the server
1499 offlineTransactionsCommittedAndAtServer->clear();
1501 // This is invalidated now
1502 hadPartialSendToServer = false;
1505 void Table::updateLiveStateFromServer() {
1506 // Process the new transaction parts
1507 processNewTransactionParts();
1509 // Do arbitration on new transactions that were received
1510 arbitrateFromServer();
1512 // Update all the committed keys
1513 bool didCommitOrSpeculate = updateCommittedTable();
1515 // Delete the transactions that are now dead
1516 updateLiveTransactionsAndStatus();
1519 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1520 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1523 void Table::updateLiveStateFromLocal() {
1524 // Update all the committed keys
1525 bool didCommitOrSpeculate = updateCommittedTable();
1527 // Delete the transactions that are now dead
1528 updateLiveTransactionsAndStatus();
1531 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1532 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1535 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1536 int64_t prevslots = firstSequenceNumber;
1538 if (didFindTableStatus) {
1540 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1543 didFindTableStatus = true;
1544 currMaxSize = numberOfSlots;
1547 void Table::updateExpectedSize() {
1550 if (expectedsize > currMaxSize) {
1551 expectedsize = currMaxSize;
1557 * Check the size of the block chain to make sure there are enough
1558 * slots sent back by the server-> This is only called when we have a
1559 * gap between the slots that we have locally and the slots sent by
1560 * the server therefore in the slots sent by the server there will be
1561 * at least 1 Table status message
1563 void Table::checkNumSlots(int numberOfSlots) {
1564 if (numberOfSlots != expectedsize) {
1565 throw new Error("Server Error: Server did not send all slots-> Expected: ");
1570 * Update the size of of the local buffer if it is needed->
1572 void Table::commitNewMaxSize() {
1573 didFindTableStatus = false;
1575 // Resize the local slot buffer
1576 if (numberOfSlots != currMaxSize) {
1577 buffer->resize((int32_t)currMaxSize);
1580 // Change the number of local slots to the new size
1581 numberOfSlots = (int32_t)currMaxSize;
1583 // Recalculate the resize threshold since the size of the local
1584 // buffer has changed
1585 setResizeThreshold();
1589 * Process the new transaction parts from this latest round of slots
1590 * received from the server
1592 void Table::processNewTransactionParts() {
1594 if (newTransactionParts->size() == 0) {
1595 // Nothing new to process
1599 // Iterate through all the machine Ids that we received new parts
1601 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> * tpit= getKeyIterator(newTransactionParts);
1602 while(tpit->hasNext()) {
1603 int64_t machineId = tpit->next();
1604 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
1606 SetIterator<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *ptit = getKeyIterator(parts);
1607 // Iterate through all the parts for that machine Id
1608 while(ptit->hasNext()) {
1609 Pair<int64_t, int32_t> * partId = ptit->next();
1610 TransactionPart *part = parts->get(partId);
1612 if (lastArbitratedTransactionNumberByArbitratorTable->contains(part->getArbitratorId())) {
1613 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1614 if (lastTransactionNumber >= part->getSequenceNumber()) {
1615 // Set dead the transaction part
1621 // Get the transaction object for that sequence number
1622 Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1624 if (transaction == NULL) {
1625 // This is a new transaction that we dont have so make a new one
1626 transaction = new Transaction();
1628 // Insert this new transaction into the live tables
1629 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1630 liveTransactionByTransactionIdTable->put(new Pair<int64_t, int64_t>(part->getTransactionId()), transaction);
1633 // Add that part to the transaction
1634 transaction->addPartDecode(part);
1639 // Clear all the new transaction parts in preparation for the next
1640 // time the server sends slots
1641 newTransactionParts->clear();
1644 void Table::arbitrateFromServer() {
1646 if (liveTransactionBySequenceNumberTable->size() == 0) {
1647 // Nothing to arbitrate on so move on
1651 // Get the transaction sequence numbers and sort from oldest to newest
1652 Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
1653 qsort(transactionSequenceNumbers->expose(), transactionSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1655 // Collection of key value pairs that are
1656 Hashtable<IoTString *, KeyValue *> * speculativeTableTmp = new Hashtable<IoTString *, KeyValue *>();
1658 // The last transaction arbitrated on
1659 int64_t lastTransactionCommitted = -1;
1660 Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1662 for (int64_t transactionSequenceNumber : transactionSequenceNumbers) {
1663 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1667 // Check if this machine arbitrates for this transaction if not
1668 // then we cant arbitrate this transaction
1669 if (transaction->getArbitrator() != localMachineId) {
1673 if (transactionSequenceNumber < lastSeqNumArbOn) {
1677 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1678 // We have seen this already locally so dont commit again
1683 if (!transaction->isComplete()) {
1684 // Will arbitrate in incorrect order if we continue so just break
1690 // update the largest transaction seen by arbitrator from server
1691 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) == NULL) {
1692 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1694 int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1695 if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1696 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1700 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1701 // Guard evaluated as true
1703 // Update the local changes so we can make the commit
1704 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1705 while (kvit->hasNext()) {
1706 KeyValue *kv = kvit->next();
1707 speculativeTableTmp->put(kv->getKey(), kv);
1711 // Update what the last transaction committed was for use in batch commit
1712 lastTransactionCommitted = transactionSequenceNumber;
1714 // Guard evaluated was false so create abort
1716 Abort *newAbort = new Abort(NULL,
1717 transaction->getClientLocalSequenceNumber(),
1718 transaction->getSequenceNumber(),
1719 transaction->getMachineId(),
1720 transaction->getArbitrator(),
1721 localArbitrationSequenceNumber);
1722 localArbitrationSequenceNumber++;
1723 generatedAborts->add(newAbort);
1725 // Insert the abort so we can process
1726 processEntry(newAbort);
1729 lastSeqNumArbOn = transactionSequenceNumber;
1732 Commit *newCommit = NULL;
1734 // If there is something to commit
1735 if (speculativeTableTmp->size() != 0) {
1736 // Create the commit and increment the commit sequence number
1737 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1738 localArbitrationSequenceNumber++;
1740 // Add all the new keys to the commit
1741 for (KeyValue *kv : speculativeTableTmp->values()) {
1742 newCommit->addKV(kv);
1745 // create the commit parts
1746 newCommit->createCommitParts();
1748 // Append all the commit parts to the end of the pending queue
1749 // waiting for sending to the server
1750 // Insert the commit so we can process it
1751 for (CommitPart *commitPart : newCommit->getParts()->values()) {
1752 processEntry(commitPart);
1756 if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1757 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1758 pendingSendArbitrationRounds->add(arbitrationRound);
1760 if (compactArbitrationData()) {
1761 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1762 if (newArbitrationRound->getCommit() != NULL) {
1763 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1764 processEntry(commitPart);
1771 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1773 // Check if this machine arbitrates for this transaction if not then
1774 // we cant arbitrate this transaction
1775 if (transaction->getArbitrator() != localMachineId) {
1776 return Pair<bool, bool>(false, false);
1779 if (!transaction->isComplete()) {
1780 // Will arbitrate in incorrect order if we continue so just break
1782 return Pair<bool, bool>(false, false);
1785 if (transaction->getMachineId() != localMachineId) {
1786 // dont do this check for local transactions
1787 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) != NULL) {
1788 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1789 // We've have already seen this from the server
1790 return Pair<bool, bool>(false, false);
1795 if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1796 // Guard evaluated as true Create the commit and increment the
1797 // commit sequence number
1798 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1799 localArbitrationSequenceNumber++;
1801 // Update the local changes so we can make the commit
1802 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1803 while (kvit->hasNext()) {
1804 KeyValue *kv = kvit->next();
1805 newCommit->addKV(kv);
1809 // create the commit parts
1810 newCommit->createCommitParts();
1812 // Append all the commit parts to the end of the pending queue
1813 // waiting for sending to the server
1814 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1815 pendingSendArbitrationRounds->add(arbitrationRound);
1817 if (compactArbitrationData()) {
1818 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1819 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1820 processEntry(commitPart);
1823 // Insert the commit so we can process it
1824 for (CommitPart *commitPart : newCommit->getParts()->values()) {
1825 processEntry(commitPart);
1829 if (transaction->getMachineId() == localMachineId) {
1830 TransactionStatus *status = transaction->getTransactionStatus();
1831 if (status != NULL) {
1832 status->setStatus(TransactionStatus_StatusCommitted);
1836 updateLiveStateFromLocal();
1837 return Pair<bool, bool>(true, true);
1839 if (transaction->getMachineId() == localMachineId) {
1840 // For locally created messages update the status
1841 // Guard evaluated was false so create abort
1842 TransactionStatus * status = transaction->getTransactionStatus();
1843 if (status != NULL) {
1844 status->setStatus(TransactionStatus_StatusAborted);
1847 Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
1850 Abort *newAbort = new Abort(NULL,
1851 transaction->getClientLocalSequenceNumber(),
1853 transaction->getMachineId(),
1854 transaction->getArbitrator(),
1855 localArbitrationSequenceNumber);
1856 localArbitrationSequenceNumber++;
1857 addAbortSet->add(newAbort);
1859 // Append all the commit parts to the end of the pending queue
1860 // waiting for sending to the server
1861 ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1862 pendingSendArbitrationRounds->add(arbitrationRound);
1864 if (compactArbitrationData()) {
1865 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1866 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1867 processEntry(commitPart);
1872 updateLiveStateFromLocal();
1873 return Pair<bool, bool>(true, false);
1878 * Compacts the arbitration data my merging commits and aggregating
1879 * aborts so that a single large push of commits can be done instead
1880 * of many small updates
1882 bool Table::compactArbitrationData() {
1883 if (pendingSendArbitrationRounds->size() < 2) {
1884 // Nothing to compact so do nothing
1888 ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1889 if (lastRound->getDidSendPart()) {
1893 bool hadCommit = (lastRound->getCommit() == NULL);
1894 bool gotNewCommit = false;
1896 int numberToDelete = 1;
1897 while (numberToDelete < pendingSendArbitrationRounds->size()) {
1898 ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1900 if (round->isFull() || round->getDidSendPart()) {
1901 // Stop since there is a part that cannot be compacted and we
1902 // need to compact in order
1906 if (round->getCommit() == NULL) {
1907 // Try compacting aborts only
1908 int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
1909 if (newSize > ArbitrationRound_MAX_PARTS) {
1910 // Cant compact since it would be too large
1913 lastRound->addAborts(round->getAborts());
1915 // Create a new larger commit
1916 Commit * newCommit = Commit_merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
1917 localArbitrationSequenceNumber++;
1919 // Create the commit parts so that we can count them
1920 newCommit->createCommitParts();
1922 // Calculate the new size of the parts
1923 int newSize = newCommit->getNumberOfParts();
1924 newSize += lastRound->getAbortsCount();
1925 newSize += round->getAbortsCount();
1927 if (newSize > ArbitrationRound_MAX_PARTS) {
1928 // Cant compact since it would be too large
1932 // Set the new compacted part
1933 lastRound->setCommit(newCommit);
1934 lastRound->addAborts(round->getAborts());
1935 gotNewCommit = true;
1941 if (numberToDelete != 1) {
1942 // If there is a compaction
1943 // Delete the previous pieces that are now in the new compacted piece
1944 if (numberToDelete == pendingSendArbitrationRounds->size()) {
1945 pendingSendArbitrationRounds->clear();
1947 for (int i = 0; i < numberToDelete; i++) {
1948 pendingSendArbitrationRounds->removeIndex(pendingSendArbitrationRounds->size() - 1);
1952 // Add the new compacted into the pending to send list
1953 pendingSendArbitrationRounds->add(lastRound);
1955 // Should reinsert into the commit processor
1956 if (hadCommit && gotNewCommit) {
1965 * Update all the commits and the committed tables, sets dead the dead
1968 bool Table::updateCommittedTable() {
1970 if (newCommitParts->size() == 0) {
1971 // Nothing new to process
1975 // Iterate through all the machine Ids that we received new parts for
1976 for (int64_t machineId : newCommitParts->keySet()) {
1977 Hashtable<Pair<int64_t, int32_t> *, CommitPart *> *parts = newCommitParts->get(machineId);
1979 // Iterate through all the parts for that machine Id
1980 for (Pair<int64_t, int32_t> partId : parts->keySet()) {
1981 CommitPart *part = parts->get(partId);
1983 // Get the transaction object for that sequence number
1984 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
1986 if (commitForClientTable == NULL) {
1987 // This is the first commit from this device
1988 commitForClientTable = new Hashtable<int64_t, Commit *>();
1989 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
1992 Commit *commit = commitForClientTable->get(part->getSequenceNumber());
1994 if (commit == NULL) {
1995 // This is a new commit that we dont have so make a new one
1996 commit = new Commit();
1998 // Insert this new commit into the live tables
1999 commitForClientTable->put(part->getSequenceNumber(), commit);
2002 // Add that part to the commit
2003 commit->addPartDecode(part);
2007 // Clear all the new commits parts in preparation for the next time
2008 // the server sends slots
2009 newCommitParts->clear();
2011 // If we process a new commit keep track of it for future use
2012 bool didProcessANewCommit = false;
2014 // Process the commits one by one
2015 for (int64_T arbitratorId : liveCommitsTable->keySet()) {
2017 // Get all the commits for a specific arbitrator
2018 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
2020 // Sort the commits in order
2021 Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
2022 qsort(commitSequenceNumbers->expose(), commitSequenceNumbers->size(), sizeof(int64_t), compareInt64);
2024 // Get the last commit seen from this arbitrator
2025 int64_t lastCommitSeenSequenceNumber = -1;
2026 if (lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId) != NULL) {
2027 lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
2030 // Go through each new commit one by one
2031 for (int i = 0; i < commitSequenceNumbers->size(); i++) {
2032 int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
2033 Commit *commit = commitForClientTable->get(commitSequenceNumber);
2035 // Special processing if a commit is not complete
2036 if (!commit->isComplete()) {
2037 if (i == (commitSequenceNumbers->size() - 1)) {
2038 // If there is an incomplete commit and this commit is the
2039 // latest one seen then this commit cannot be processed and
2040 // there are no other commits
2043 // This is a commit that was already dead but parts of it
2044 // are still in the block chain (not flushed out yet)->
2045 // Delete it and move on
2047 commitForClientTable->remove(commit->getSequenceNumber());
2052 // Update the last transaction that was updated if we can
2053 if (commit->getTransactionSequenceNumber() != -1) {
2054 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
2056 // Update the last transaction sequence number that the arbitrator arbitrated on
2057 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
2058 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2062 // Update the last arbitration data that we have seen so far
2063 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(commit->getMachineId())) {
2064 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
2065 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
2067 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2070 // Never seen any data from this arbitrator so record the first one
2071 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2074 // We have already seen this commit before so need to do the
2075 // full processing on this commit
2076 if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2078 // Update the last transaction that was updated if we can
2079 if (commit->getTransactionSequenceNumber() != -1) {
2080 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
2082 // Update the last transaction sequence number that the arbitrator arbitrated on
2083 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
2084 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2091 // If we got here then this is a brand new commit and needs full
2093 // Get what commits should be edited, these are the commits that
2094 // have live values for their keys
2095 Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
2097 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = commit->getKeyValueUpdateSet()->iterator();
2098 while (kvit->hasNext()) {
2099 KeyValue *kv = kvit->next();
2100 commitsToEdit->add(liveCommitsByKeyTable->get(kv->getKey()));
2104 commitsToEdit->remove(NULL); // remove NULL since it could be in this set
2106 // Update each previous commit that needs to be updated
2107 for (Commit *previousCommit : commitsToEdit) {
2109 // Only bother with live commits (TODO: Maybe remove this check)
2110 if (previousCommit->isLive()) {
2112 // Update which keys in the old commits are still live
2114 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = commit->getKeyValueUpdateSet()->iterator();
2115 while (kvit->hasNext()) {
2116 KeyValue *kv = kvit->next();
2117 previousCommit->invalidateKey(kv->getKey());
2122 // if the commit is now dead then remove it
2123 if (!previousCommit->isLive()) {
2124 commitForClientTable->remove(previousCommit);
2129 // Update the last seen sequence number from this arbitrator
2130 if (lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId()) != NULL) {
2131 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2132 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2135 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2138 // We processed a new commit that we havent seen before
2139 didProcessANewCommit = true;
2141 // Update the committed table of keys and which commit is using which key
2143 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = commit->getKeyValueUpdateSet()->iterator();
2144 while (kvit->hasNext()) {
2145 KeyValue *kv = kvit->next();
2146 committedKeyValueTable->put(kv->getKey(), kv);
2147 liveCommitsByKeyTable->put(kv->getKey(), commit);
2154 return didProcessANewCommit;
2158 * Create the speculative table from transactions that are still live
2159 * and have come from the cloud
2161 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2162 if (liveTransactionBySequenceNumberTable->keySet()->size() == 0) {
2163 // There is nothing to speculate on
2167 // Create a list of the transaction sequence numbers and sort them
2168 // from oldest to newest
2169 Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
2170 qsort(transactionSequenceNumberSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64);
2172 bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2175 if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2176 // If there is a gap in the transaction sequence numbers then
2177 // there was a commit or an abort of a transaction OR there was a
2178 // new commit (Could be from offline commit) so a redo the
2179 // speculation from scratch
2181 // Start from scratch
2182 speculatedKeyValueTable->clear();
2183 lastTransactionSequenceNumberSpeculatedOn = -1;
2184 oldestTransactionSequenceNumberSpeculatedOn = -1;
2188 // Remember the front of the transaction list
2189 oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2191 // Find where to start arbitration from
2192 int startIndex = transactionSequenceNumbersSorted->indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1;
2194 if (startIndex >= transactionSequenceNumbersSorted->size()) {
2195 // Make sure we are not out of bounds
2196 return false; // did not speculate
2199 Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
2200 bool didSkip = true;
2202 for (int i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2203 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2204 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2206 if (!transaction->isComplete()) {
2207 // If there is an incomplete transaction then there is nothing
2208 // we can do add this transactions arbitrator to the list of
2209 // arbitrators we should ignore
2210 incompleteTransactionArbitrator->add(transaction->getArbitrator());
2215 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2219 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2221 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2222 // Guard evaluated to true so update the speculative table
2224 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2225 while (kvit->hasNext()) {
2226 KeyValue *kv = kvit->next();
2227 speculatedKeyValueTable->put(kv->getKey(), kv);
2235 // Since there was a skip we need to redo the speculation next time around
2236 lastTransactionSequenceNumberSpeculatedOn = -1;
2237 oldestTransactionSequenceNumberSpeculatedOn = -1;
2240 // We did some speculation
2245 * Create the pending transaction speculative table from transactions
2246 * that are still in the pending transaction buffer
2248 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2249 if (pendingTransactionQueue->size() == 0) {
2250 // There is nothing to speculate on
2254 if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2255 // need to reset on the pending speculation
2256 lastPendingTransactionSpeculatedOn = NULL;
2257 firstPendingTransaction = pendingTransactionQueue->get(0);
2258 pendingTransactionSpeculatedKeyValueTable->clear();
2261 // Find where to start arbitration from
2262 int startIndex = pendingTransactionQueue->indexOf(firstPendingTransaction) + 1;
2264 if (startIndex >= pendingTransactionQueue->size()) {
2265 // Make sure we are not out of bounds
2269 for (int i = startIndex; i < pendingTransactionQueue->size(); i++) {
2270 Transaction *transaction = pendingTransactionQueue->get(i);
2272 lastPendingTransactionSpeculatedOn = transaction;
2274 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2275 // Guard evaluated to true so update the speculative table
2276 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2277 while (kvit->hasNext()) {
2278 KeyValue *kv = kvit->next();
2279 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2287 * Set dead and remove from the live transaction tables the
2288 * transactions that are dead
2290 void Table::updateLiveTransactionsAndStatus() {
2292 // Go through each of the transactions
2293 for (Iterator<Map->Entry<int64_t, Transaction> > *iter = liveTransactionBySequenceNumberTable->entrySet()->iterator(); iter->hasNext();) {
2294 Transaction *transaction = iter->next()->getValue();
2296 // Check if the transaction is dead
2297 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator());
2298 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= transaction->getSequenceNumber())) {
2300 // Set dead the transaction
2301 transaction->setDead();
2303 // Remove the transaction from the live table
2305 liveTransactionByTransactionIdTable->remove(transaction->getId());
2309 // Go through each of the transactions
2310 for (Iterator<Map->Entry<int64_t, TransactionStatus *> > *iter = outstandingTransactionStatus->entrySet()->iterator(); iter->hasNext();) {
2311 TransactionStatus *status = iter->next()->getValue();
2313 // Check if the transaction is dead
2314 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator());
2315 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= status->getTransactionSequenceNumber())) {
2318 status->setStatus(TransactionStatus_StatusCommitted);
2327 * Process this slot, entry by entry-> Also update the latest message sent by slot
2329 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2331 // Update the last message seen
2332 updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2334 // Process each entry in the slot
2335 Vector<Entry *> *entries = slot->getEntries();
2336 uint eSize = entries->size();
2337 for(uint ei=0; ei < eSize; ei++) {
2338 Entry * entry = entries->get(ei);
2339 switch (entry->getType()) {
2340 case TypeCommitPart:
2341 processEntry((CommitPart *)entry);
2344 processEntry((Abort *)entry);
2346 case TypeTransactionPart:
2347 processEntry((TransactionPart *)entry);
2350 processEntry((NewKey *)entry);
2352 case TypeLastMessage:
2353 processEntry((LastMessage *)entry, machineSet);
2355 case TypeRejectedMessage:
2356 processEntry((RejectedMessage *)entry, indexer);
2358 case TypeTableStatus:
2359 processEntry((TableStatus *)entry, slot->getSequenceNumber());
2362 throw new Error("Unrecognized type: ");
2368 * Update the last message that was sent for a machine Id
2370 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2371 // Update what the last message received by a machine was
2372 updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2376 * Add the new key to the arbitrators table and update the set of live
2377 * new keys (in case of a rescued new key message)
2379 void Table::processEntry(NewKey *entry) {
2380 // Update the arbitrator table with the new key information
2381 arbitratorTable->put(entry->getKey(), entry->getMachineID());
2383 // Update what the latest live new key is
2384 NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2385 if (oldNewKey != NULL) {
2386 // Delete the old new key messages
2387 oldNewKey->setDead();
2392 * Process new table status entries and set dead the old ones as new
2393 * ones come in-> keeps track of the largest and smallest table status
2394 * seen in this current round of updating the local copy of the block
2397 void Table::processEntry(TableStatus * entry, int64_t seq) {
2398 int newNumSlots = entry->getMaxSlots();
2399 updateCurrMaxSize(newNumSlots);
2400 initExpectedSize(seq, newNumSlots);
2402 if (liveTableStatus != NULL) {
2403 // We have a larger table status so the old table status is no
2405 liveTableStatus->setDead();
2408 // Make this new table status the latest alive table status
2409 liveTableStatus = entry;
2413 * Check old messages to see if there is a block chain violation->
2416 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
2417 int64_t oldSeqNum = entry->getOldSeqNum();
2418 int64_t newSeqNum = entry->getNewSeqNum();
2419 bool isequal = entry->getEqual();
2420 int64_t machineId = entry->getMachineID();
2421 int64_t seq = entry->getSequenceNumber();
2423 // Check if we have messages that were supposed to be rejected in
2424 // our local block chain
2425 for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2427 Slot *slot = indexer->getSlot(seqNum);
2430 // If we have this slot make sure that it was not supposed to be
2432 int64_t slotMachineId = slot->getMachineID();
2433 if (isequal != (slotMachineId == machineId)) {
2434 throw new Error("Server Error: Trying to insert rejected message for slot ");
2439 // Create a list of clients to watch until they see this rejected
2441 Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2442 for (Map->Entry<int64_t, Pair<int64_t, Liveness *> > *lastMessageEntry : lastMessageTable->entrySet()) {
2443 // Machine ID for the last message entry
2444 int64_t lastMessageEntryMachineId = lastMessageEntry->getKey();
2446 // We've seen it, don't need to continue to watch-> Our next
2447 // message will implicitly acknowledge it->
2448 if (lastMessageEntryMachineId == localMachineId) {
2452 Pair<int64_t, Liveness *> lastMessageValue = lastMessageEntry->getValue();
2453 int64_t entrySequenceNumber = lastMessageValue.getFirst();
2455 if (entrySequenceNumber < seq) {
2456 // Add this rejected message to the set of messages that this
2457 // machine ID did not see yet
2458 addWatchVector(lastMessageEntryMachineId, entry);
2459 // This client did not see this rejected message yet so add it
2460 // to the watch set to monitor
2461 deviceWatchSet->add(lastMessageEntryMachineId);
2464 if (deviceWatchSet->isEmpty()) {
2465 // This rejected message has been seen by all the clients so
2468 // We need to watch this rejected message
2469 entry->setWatchSet(deviceWatchSet);
2474 * Check if this abort is live, if not then save it so we can kill it
2475 * later-> update the last transaction number that was arbitrated on->
2477 void Table::processEntry(Abort *entry) {
2478 if (entry->getTransactionSequenceNumber() != -1) {
2479 // update the transaction status if it was sent to the server
2480 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2481 if (status != NULL) {
2482 status->setStatus(TransactionStatus_StatusAborted);
2486 // Abort has not been seen by the client it is for yet so we need to
2488 Abort *previouslySeenAbort = liveAbortTable->put(entry->getAbortId(), entry);
2489 if (previouslySeenAbort != NULL) {
2490 previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version
2493 if (entry->getTransactionArbitrator() == localMachineId) {
2494 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
2497 if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId()).getFirst() >= entry->getSequenceNumber())) {
2498 // The machine already saw this so it is dead
2500 liveAbortTable->remove(&entry->getAbortId());
2502 if (entry->getTransactionArbitrator() == localMachineId) {
2503 liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2508 // Update the last arbitration data that we have seen so far
2509 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(entry->getTransactionArbitrator())) {
2510 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2511 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2513 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2516 // Never seen any data from this arbitrator so record the first one
2517 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2520 // Set dead a transaction if we can
2521 Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber()));
2522 if (transactionToSetDead != NULL) {
2523 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2526 // Update the last transaction sequence number that the arbitrator
2528 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator());
2529 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2531 if (entry->getTransactionSequenceNumber() != -1) {
2532 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2538 * Set dead the transaction part if that transaction is dead and keep
2539 * track of all new parts
2541 void Table::processEntry(TransactionPart *entry) {
2542 // Check if we have already seen this transaction and set it dead OR
2543 // if it is not alive
2544 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId());
2545 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= entry->getSequenceNumber())) {
2546 // This transaction is dead, it was already committed or aborted
2551 // This part is still alive
2552 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId());
2554 if (transactionPart == NULL) {
2555 // Dont have a table for this machine Id yet so make one
2556 transactionPart = new Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2557 newTransactionParts->put(entry->getMachineId(), transactionPart);
2560 // Update the part and set dead ones we have already seen (got a
2562 TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry);
2563 if (previouslySeenPart != NULL) {
2564 previouslySeenPart->setDead();
2569 * Process new commit entries and save them for future use-> Delete duplicates
2571 void Table::processEntry(CommitPart *entry) {
2572 // Update the last transaction that was updated if we can
2573 if (entry->getTransactionSequenceNumber() != -1) {
2574 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId());
2575 // Update the last transaction sequence number that the arbitrator
2577 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2578 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2582 Hashtable<Pair<int64_t, int32_t> *, CommitPart *> *commitPart = newCommitParts->get(entry->getMachineId());
2583 if (commitPart == NULL) {
2584 // Don't have a table for this machine Id yet so make one
2585 commitPart = new Hashtable<Pair<int64_t, int32_t> *, CommitPart *>();
2586 newCommitParts->put(entry->getMachineId(), commitPart);
2588 // Update the part and set dead ones we have already seen (got a
2590 CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry);
2591 if (previouslySeenPart != NULL) {
2592 previouslySeenPart->setDead();
2597 * Update the last message seen table-> Update and set dead the
2598 * appropriate RejectedMessages as clients see them-> Updates the live
2599 * aborts, removes those that are dead and sets them dead-> Check that
2600 * the last message seen is correct and that there is no mismatch of
2601 * our own last message or that other clients have not had a rollback
2602 * on the last message->
2604 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2605 // We have seen this machine ID
2606 machineSet->remove(machineId);
2608 // Get the set of rejected messages that this machine Id is has not seen yet
2609 Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2610 // If there is a rejected message that this machine Id has not seen yet
2611 if (watchset != NULL) {
2612 // Go through each rejected message that this machine Id has not
2615 SetIterator<RejectedMessage *, RejectedMessage *> *rmit = watchset->iterator();
2616 while(rmit->hasNext()) {
2617 RejectedMessage *rm = rmit->next();
2618 // If this machine Id has seen this rejected message->->->
2619 if (rm->getSequenceNumber() <= seqNum) {
2620 // Remove it from our watchlist
2622 // Decrement machines that need to see this notification
2623 rm->removeWatcher(machineId);
2629 // Set dead the abort
2630 for (Iterator<Map->Entry<Pair<int64_t, int64_t>, Abort *> > i = liveAbortTable->entrySet()->iterator(); i->hasNext();) {
2631 Abort *abort = i->next()->getValue();
2632 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2635 if (abort->getTransactionArbitrator() == localMachineId) {
2636 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2640 if (machineId == localMachineId) {
2641 // Our own messages are immediately dead->
2642 char livenessType = liveness->getType();
2643 if (livenessType==TypeLastMessage) {
2644 ((LastMessage *)liveness)->setDead();
2645 } else if (livenessType == TypeSlot) {
2646 ((Slot *)liveness)->setDead();
2648 throw new Error("Unrecognized type");
2651 // Get the old last message for this device
2652 Pair<int64_t, Liveness *> * lastMessageEntry = lastMessageTable->put(machineId, new Pair<int64_t, Liveness *>(seqNum, liveness));
2653 if (lastMessageEntry == NULL) {
2654 // If no last message then there is nothing else to process
2658 int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
2659 Liveness *lastEntry = lastMessageEntry->getSecond();
2660 delete lastMessageEntry;
2662 // If it is not our machine Id since we already set ours to dead
2663 if (machineId != localMachineId) {
2664 char lastEntryType = lastEntry->getType();
2666 if (lastEntryType == TypeLastMessage) {
2667 ((LastMessage *)lastEntry)->setDead();
2668 } else if (lastEntryType == TypeSlot) {
2669 ((Slot *)lastEntry)->setDead();
2671 throw new Error("Unrecognized type");
2674 // Make sure the server is not playing any games
2675 if (machineId == localMachineId) {
2676 if (hadPartialSendToServer) {
2677 // We were not making any updates and we had a machine mismatch
2678 if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2679 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: ");
2682 // We were not making any updates and we had a machine mismatch
2683 if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2684 throw new Error("Server Error: Mismatch on local machine sequence number, needed: ");
2688 if (lastMessageSeqNum > seqNum) {
2689 throw new Error("Server Error: Rollback on remote machine sequence number");
2695 * Add a rejected message entry to the watch set to keep track of
2696 * which clients have seen that rejected message entry and which have
2699 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
2700 Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2701 if (entries == NULL) {
2702 // There is no set for this machine ID yet so create one
2703 entries = new Hashset<RejectedMessage *>();
2704 rejectedMessageWatchVectorTable->put(machineId, entries);
2706 entries->add(entry);
2710 * Check if the HMAC chain is not violated
2712 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
2713 for (int i = 0; i < newSlots->length(); i++) {
2714 Slot *currSlot = newSlots->get(i);
2715 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
2716 if (prevSlot != NULL &&
2717 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2718 throw new Error("Server Error: Invalid HMAC Chain");