3 #include "SlotBuffer.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
14 #include "ByteBuffer.h"
16 #include "CommitPart.h"
17 #include "ArbitrationRound.h"
18 #include "TransactionPart.h"
20 #include "RejectedMessage.h"
21 #include "SlotIndexer.h"
24 int compareInt64(const void * a, const void *b) {
25 const int64_t * pa = (const int64_t *) a;
26 const int64_t * pb = (const int64_t *) b;
35 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
37 cloud(new CloudComm(this, baseurl, password, listeningPort)),
39 liveTableStatus(NULL),
40 pendingTransactionBuilder(NULL),
41 lastPendingTransactionSpeculatedOn(NULL),
42 firstPendingTransaction(NULL),
44 bufferResizeThreshold(0),
46 oldestLiveSlotSequenceNumver(1),
47 localMachineId(_localMachineId),
49 localTransactionSequenceNumber(0),
50 lastTransactionSequenceNumberSpeculatedOn(0),
51 oldestTransactionSequenceNumberSpeculatedOn(0),
52 localArbitrationSequenceNumber(0),
53 hadPartialSendToServer(false),
54 attemptedToSendToServer(false),
56 didFindTableStatus(false),
58 lastSlotAttemptedToSend(NULL),
61 lastTransactionPartsSent(NULL),
62 lastPendingSendArbitrationEntriesToDelete(NULL),
64 committedKeyValueTable(NULL),
65 speculatedKeyValueTable(NULL),
66 pendingTransactionSpeculatedKeyValueTable(NULL),
67 liveNewKeyTable(NULL),
68 lastMessageTable(NULL),
69 rejectedMessageWatchVectorTable(NULL),
70 arbitratorTable(NULL),
72 newTransactionParts(NULL),
74 lastArbitratedTransactionNumberByArbitratorTable(NULL),
75 liveTransactionBySequenceNumberTable(NULL),
76 liveTransactionByTransactionIdTable(NULL),
77 liveCommitsTable(NULL),
78 liveCommitsByKeyTable(NULL),
79 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
80 rejectedSlotVector(NULL),
81 pendingTransactionQueue(NULL),
82 pendingSendArbitrationRounds(NULL),
83 pendingSendArbitrationEntriesToDelete(NULL),
84 transactionPartsSent(NULL),
85 outstandingTransactionStatus(NULL),
86 liveAbortsGeneratedByLocal(NULL),
87 offlineTransactionsCommittedAndAtServer(NULL),
88 localCommunicationTable(NULL),
89 lastTransactionSeenFromMachineFromServer(NULL),
90 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
91 lastInsertedNewKey(false),
97 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
101 liveTableStatus(NULL),
102 pendingTransactionBuilder(NULL),
103 lastPendingTransactionSpeculatedOn(NULL),
104 firstPendingTransaction(NULL),
106 bufferResizeThreshold(0),
108 oldestLiveSlotSequenceNumver(1),
109 localMachineId(_localMachineId),
111 localTransactionSequenceNumber(0),
112 lastTransactionSequenceNumberSpeculatedOn(0),
113 oldestTransactionSequenceNumberSpeculatedOn(0),
114 localArbitrationSequenceNumber(0),
115 hadPartialSendToServer(false),
116 attemptedToSendToServer(false),
118 didFindTableStatus(false),
120 lastSlotAttemptedToSend(NULL),
123 lastTransactionPartsSent(NULL),
124 lastPendingSendArbitrationEntriesToDelete(NULL),
126 committedKeyValueTable(NULL),
127 speculatedKeyValueTable(NULL),
128 pendingTransactionSpeculatedKeyValueTable(NULL),
129 liveNewKeyTable(NULL),
130 lastMessageTable(NULL),
131 rejectedMessageWatchVectorTable(NULL),
132 arbitratorTable(NULL),
133 liveAbortTable(NULL),
134 newTransactionParts(NULL),
135 newCommitParts(NULL),
136 lastArbitratedTransactionNumberByArbitratorTable(NULL),
137 liveTransactionBySequenceNumberTable(NULL),
138 liveTransactionByTransactionIdTable(NULL),
139 liveCommitsTable(NULL),
140 liveCommitsByKeyTable(NULL),
141 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
142 rejectedSlotVector(NULL),
143 pendingTransactionQueue(NULL),
144 pendingSendArbitrationRounds(NULL),
145 pendingSendArbitrationEntriesToDelete(NULL),
146 transactionPartsSent(NULL),
147 outstandingTransactionStatus(NULL),
148 liveAbortsGeneratedByLocal(NULL),
149 offlineTransactionsCommittedAndAtServer(NULL),
150 localCommunicationTable(NULL),
151 lastTransactionSeenFromMachineFromServer(NULL),
152 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
153 lastInsertedNewKey(false),
160 * Init all the stuff needed for for table usage
163 // Init helper objects
164 random = new Random();
165 buffer = new SlotBuffer();
168 committedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
169 speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
170 pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
171 liveNewKeyTable = new Hashtable<IoTString *, NewKey *>();
172 lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> * >();
173 rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
174 arbitratorTable = new Hashtable<IoTString *, int64_t>();
175 liveAbortTable = new Hashtable<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>();
176 newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
177 newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
178 lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
179 liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
180 liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t> *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>();
181 liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> * >();
182 liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *>();
183 lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
184 rejectedSlotVector = new Vector<int64_t>();
185 pendingTransactionQueue = new Vector<Transaction *>();
186 pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
187 transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
188 outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
189 liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
190 offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> *, uintptr_t, 0, pairHashFunction, pairEquals>();
191 localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> *>();
192 lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
193 pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
194 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
197 numberOfSlots = buffer->capacity();
198 setResizeThreshold();
202 * Initialize the table by inserting a table status as the first entry
203 * into the table status also initialize the crypto stuff.
205 void Table::initTable() {
206 cloud->initSecurity();
208 // Create the first insertion into the block chain which is the table status
209 Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
210 localSequenceNumber++;
211 TableStatus *status = new TableStatus(s, numberOfSlots);
213 Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
216 array = new Array<Slot *>(1);
218 // update local block chain
219 validateAndUpdate(array, true);
220 } else if (array->length() == 1) {
221 // in case we did push the slot BUT we failed to init it
222 validateAndUpdate(array, true);
224 throw new Error("Error on initialization");
229 * Rebuild the table from scratch by pulling the latest block chain
232 void Table::rebuild() {
233 // Just pull the latest slots from the server
234 Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
235 validateAndUpdate(newslots, true);
237 updateLiveTransactionsAndStatus();
240 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
241 localCommunicationTable->put(arbitrator, new Pair<IoTString *, int32_t>(hostName, portNumber));
244 int64_t Table::getArbitrator(IoTString *key) {
245 return arbitratorTable->get(key);
248 void Table::close() {
252 IoTString *Table::getCommitted(IoTString *key) {
253 KeyValue *kv = committedKeyValueTable->get(key);
256 return kv->getValue();
262 IoTString *Table::getSpeculative(IoTString *key) {
263 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
266 kv = speculatedKeyValueTable->get(key);
270 kv = committedKeyValueTable->get(key);
274 return kv->getValue();
280 IoTString *Table::getCommittedAtomic(IoTString *key) {
281 KeyValue *kv = committedKeyValueTable->get(key);
283 if (!arbitratorTable->contains(key)) {
284 throw new Error("Key not Found.");
287 // Make sure new key value pair matches the current arbitrator
288 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
289 // TODO: Maybe not throw en error
290 throw new Error("Not all Key Values Match Arbitrator.");
294 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
295 return kv->getValue();
297 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
302 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
303 if (!arbitratorTable->contains(key)) {
304 throw new Error("Key not Found.");
307 // Make sure new key value pair matches the current arbitrator
308 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
309 // TODO: Maybe not throw en error
310 throw new Error("Not all Key Values Match Arbitrator.");
313 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
316 kv = speculatedKeyValueTable->get(key);
320 kv = committedKeyValueTable->get(key);
324 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
325 return kv->getValue();
327 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
332 bool Table::update() {
334 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
335 validateAndUpdate(newSlots, false);
337 updateLiveTransactionsAndStatus();
339 } catch (Exception *e) {
340 SetIterator<int64_t, Pair<IoTString *, int32_t> *> *kit = getKeyIterator(localCommunicationTable);
341 while (kit->hasNext()) {
342 int64_t m = kit->next();
351 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
353 if (!arbitratorTable->contains(keyName)) {
354 // There is already an arbitrator
357 NewKey *newKey = new NewKey(NULL, keyName, machineId);
359 if (sendToServer(newKey)) {
360 // If successfully inserted
366 void Table::startTransaction() {
367 // Create a new transaction, invalidates any old pending transactions.
368 pendingTransactionBuilder = new PendingTransaction(localMachineId);
371 void Table::addKV(IoTString *key, IoTString *value) {
373 // Make sure it is a valid key
374 if (!arbitratorTable->contains(key)) {
375 throw new Error("Key not Found.");
378 // Make sure new key value pair matches the current arbitrator
379 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
380 // TODO: Maybe not throw en error
381 throw new Error("Not all Key Values Match Arbitrator.");
384 // Add the key value to this transaction
385 KeyValue *kv = new KeyValue(key, value);
386 pendingTransactionBuilder->addKV(kv);
389 TransactionStatus *Table::commitTransaction() {
390 if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
391 // transaction with no updates will have no effect on the system
392 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
395 // Set the local transaction sequence number and increment
396 pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
397 localTransactionSequenceNumber++;
399 // Create the transaction status
400 TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
402 // Create the new transaction
403 Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
404 newTransaction->setTransactionStatus(transactionStatus);
406 if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
407 // Add it to the queue and invalidate the builder for safety
408 pendingTransactionQueue->add(newTransaction);
410 arbitrateOnLocalTransaction(newTransaction);
411 updateLiveStateFromLocal();
414 pendingTransactionBuilder = new PendingTransaction(localMachineId);
418 } catch (ServerException *e) {
420 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
421 uint size = pendingTransactionQueue->size();
423 for (int iter = 0; iter < size; iter++) {
424 Transaction *transaction = pendingTransactionQueue->get(iter);
425 pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter));
427 if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
428 // Already contacted this client so ignore all attempts to contact this client
429 // to preserve ordering for arbitrator
433 Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
435 if (sendReturn.getFirst()) {
436 // Failed to contact over local
437 arbitratorTriedAndFailed->add(transaction->getArbitrator());
439 // Successful contact or should not contact
441 if (sendReturn.getSecond()) {
447 pendingTransactionQueue->setSize(oldindex);
450 updateLiveStateFromLocal();
452 return transactionStatus;
456 * Recalculate the new resize threshold
458 void Table::setResizeThreshold() {
459 int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
460 bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
463 int64_t Table::getLocalSequenceNumber() {
464 return localSequenceNumber;
467 bool Table::sendToServer(NewKey *newKey) {
468 bool fromRetry = false;
470 if (hadPartialSendToServer) {
471 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
472 if (newSlots->length() == 0) {
474 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
476 if (sendSlotsReturn.getFirst()) {
477 if (newKey != NULL) {
478 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
483 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
484 while (trit->hasNext()) {
485 Transaction *transaction = trit->next();
486 transaction->resetServerFailure();
487 // Update which transactions parts still need to be sent
488 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
489 // Add the transaction status to the outstanding list
490 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
492 // Update the transaction status
493 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
495 // Check if all the transaction parts were successfully
496 // sent and if so then remove it from pending
497 if (transaction->didSendAllParts()) {
498 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
499 pendingTransactionQueue->remove(transaction);
504 newSlots = sendSlotsReturn.getThird();
505 bool isInserted = false;
506 for (uint si = 0; si < newSlots->length(); si++) {
507 Slot *s = newSlots->get(si);
508 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
514 for (uint si = 0; si < newSlots->length(); si++) {
515 Slot *s = newSlots->get(si);
520 // Process each entry in the slot
521 Vector<Entry *> *ventries = s->getEntries();
522 uint vesize = ventries->size();
523 for (uint vei = 0; vei < vesize; vei++) {
524 Entry *entry = ventries->get(vei);
525 if (entry->getType() == TypeLastMessage) {
526 LastMessage *lastMessage = (LastMessage *)entry;
527 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
536 if (newKey != NULL) {
537 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
542 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
543 while (trit->hasNext()) {
544 Transaction *transaction = trit->next();
545 transaction->resetServerFailure();
547 // Update which transactions parts still need to be sent
548 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
550 // Add the transaction status to the outstanding list
551 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
553 // Update the transaction status
554 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
556 // Check if all the transaction parts were successfully sent and if so then remove it from pending
557 if (transaction->didSendAllParts()) {
558 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
559 pendingTransactionQueue->remove(transaction);
561 transaction->resetServerFailure();
562 // Set the transaction sequence number back to nothing
563 if (!transaction->didSendAPartToServer()) {
564 transaction->setSequenceNumber(-1);
572 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
573 while (trit->hasNext()) {
574 Transaction *transaction = trit->next();
575 transaction->resetServerFailure();
576 // Set the transaction sequence number back to nothing
577 if (!transaction->didSendAPartToServer()) {
578 transaction->setSequenceNumber(-1);
583 if (sendSlotsReturn.getThird()->length() != 0) {
584 // insert into the local block chain
585 validateAndUpdate(sendSlotsReturn.getThird(), true);
589 bool isInserted = false;
590 for (uint si = 0; si < newSlots->length(); si++) {
591 Slot *s = newSlots->get(si);
592 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
598 for (uint si = 0; si < newSlots->length(); si++) {
599 Slot *s = newSlots->get(si);
604 // Process each entry in the slot
605 Vector<Entry *> *entries = s->getEntries();
606 uint eSize = entries->size();
607 for(uint ei=0; ei < eSize; ei++) {
608 Entry * entry = entries->get(ei);
610 if (entry->getType() == TypeLastMessage) {
611 LastMessage *lastMessage = (LastMessage *)entry;
612 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
621 if (newKey != NULL) {
622 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
627 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
628 while (trit->hasNext()) {
629 Transaction *transaction = trit->next();
630 transaction->resetServerFailure();
632 // Update which transactions parts still need to be sent
633 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
635 // Add the transaction status to the outstanding list
636 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
638 // Update the transaction status
639 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
641 // Check if all the transaction parts were successfully sent and if so then remove it from pending
642 if (transaction->didSendAllParts()) {
643 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
644 pendingTransactionQueue->remove(transaction);
646 transaction->resetServerFailure();
647 // Set the transaction sequence number back to nothing
648 if (!transaction->didSendAPartToServer()) {
649 transaction->setSequenceNumber(-1);
655 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
656 while (trit->hasNext()) {
657 Transaction *transaction = trit->next();
658 transaction->resetServerFailure();
659 // Set the transaction sequence number back to nothing
660 if (!transaction->didSendAPartToServer()) {
661 transaction->setSequenceNumber(-1);
667 // insert into the local block chain
668 validateAndUpdate(newSlots, true);
671 } catch (ServerException *e) {
678 // While we have stuff that needs inserting into the block chain
679 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
683 if (hadPartialSendToServer) {
684 throw new Error("Should Be error free");
689 // If there is a new key with same name then end
690 if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
695 Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber);
696 localSequenceNumber++;
698 // Try to fill the slot with data
699 ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
700 bool needsResize = fillSlotsReturn.getFirst();
701 int newSize = fillSlotsReturn.getSecond();
702 bool insertedNewKey = fillSlotsReturn.getThird();
705 // Reset which transaction to send
706 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
707 while (trit->hasNext()) {
708 Transaction *transaction = trit->next();
709 transaction->resetNextPartToSend();
711 // Set the transaction sequence number back to nothing
712 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
713 transaction->setSequenceNumber(-1);
718 // Clear the sent data since we are trying again
719 pendingSendArbitrationEntriesToDelete->clear();
720 transactionPartsSent->clear();
722 // We needed a resize so try again
723 fillSlot(slot, true, newKey);
726 lastSlotAttemptedToSend = slot;
727 lastIsNewKey = (newKey != NULL);
728 lastInsertedNewKey = insertedNewKey;
729 lastNewSize = newSize;
731 lastTransactionPartsSent = transactionPartsSent->clone();
732 lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
734 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
736 if (sendSlotsReturn.getFirst()) {
738 // Did insert into the block chain
740 if (insertedNewKey) {
741 // This slot was what was inserted not a previous slot
743 // New Key was successfully inserted into the block chain so dont want to insert it again
747 // Remove the aborts and commit parts that were sent from the pending to send queue
748 uint size = pendingSendArbitrationRounds->size();
750 for (uint i = 0; i < size; i++) {
751 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
752 round->removeParts(pendingSendArbitrationEntriesToDelete);
754 if (!round->isDoneSending()) {
755 // Sent all the parts
756 pendingSendArbitrationRounds->set(oldcount++,
757 pendingSendArbitrationRounds->get(i));
760 pendingSendArbitrationRounds->setSize(oldcount);
762 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
763 while (trit->hasNext()) {
764 Transaction *transaction = trit->next();
765 transaction->resetServerFailure();
767 // Update which transactions parts still need to be sent
768 transaction->removeSentParts(transactionPartsSent->get(transaction));
770 // Add the transaction status to the outstanding list
771 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
773 // Update the transaction status
774 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
776 // Check if all the transaction parts were successfully sent and if so then remove it from pending
777 if (transaction->didSendAllParts()) {
778 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
779 pendingTransactionQueue->remove(transaction);
784 // Reset which transaction to send
785 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
786 while (trit->hasNext()) {
787 Transaction *transaction = trit->next();
788 transaction->resetNextPartToSend();
790 // Set the transaction sequence number back to nothing
791 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
792 transaction->setSequenceNumber(-1);
798 // Clear the sent data in preparation for next send
799 pendingSendArbitrationEntriesToDelete->clear();
800 transactionPartsSent->clear();
802 if (sendSlotsReturn.getThird()->length() != 0) {
803 // insert into the local block chain
804 validateAndUpdate(sendSlotsReturn.getThird(), true);
808 } catch (ServerException *e) {
809 if (e->getType() != ServerException_TypeInputTimeout) {
810 // Nothing was able to be sent to the server so just clear these data structures
811 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
812 while (trit->hasNext()) {
813 Transaction *transaction = trit->next();
814 transaction->resetNextPartToSend();
816 // Set the transaction sequence number back to nothing
817 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
818 transaction->setSequenceNumber(-1);
823 // There was a partial send to the server
824 hadPartialSendToServer = true;
826 // Nothing was able to be sent to the server so just clear these data structures
827 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
828 while (trit->hasNext()) {
829 Transaction *transaction = trit->next();
830 transaction->resetNextPartToSend();
831 transaction->setServerFailure();
836 pendingSendArbitrationEntriesToDelete->clear();
837 transactionPartsSent->clear();
842 return newKey == NULL;
845 bool Table::updateFromLocal(int64_t machineId) {
846 if (!localCommunicationTable->contains(machineId))
849 Pair<IoTString *, int32_t> * localCommunicationInformation = localCommunicationTable->get(machineId);
851 // Get the size of the send data
852 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
854 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
855 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(machineId)) {
856 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
859 Array<char> *sendData = new Array<char>(sendDataSize);
860 ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
863 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
867 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
868 localSequenceNumber++;
870 if (returnData == NULL) {
871 // Could not contact server
876 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
877 int numberOfEntries = bbDecode->getInt();
879 for (int i = 0; i < numberOfEntries; i++) {
880 char type = bbDecode->get();
881 if (type == TypeAbort) {
882 Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
884 } else if (type == TypeCommitPart) {
885 CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
886 processEntry(commitPart);
890 updateLiveStateFromLocal();
895 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
897 // Get the devices local communications
898 if (!localCommunicationTable->contains(transaction->getArbitrator()))
899 return Pair<bool, bool>(true, false);
901 Pair<IoTString *, int32_t> * localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
903 // Get the size of the send data
904 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
906 Vector<TransactionPart *> * tParts = transaction->getParts();
907 uint tPartsSize = tParts->size();
908 for (uint i = 0; i < tPartsSize; i++) {
909 TransactionPart * part = tParts->get(i);
910 sendDataSize += part->getSize();
914 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
915 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(transaction->getArbitrator())) {
916 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
919 // Make the send data size
920 Array<char> *sendData = new Array<char>(sendDataSize);
921 ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
924 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
925 bbEncode->putInt(transaction->getParts()->size());
927 Vector<TransactionPart *> * tParts = transaction->getParts();
928 uint tPartsSize = tParts->size();
929 for (uint i = 0; i < tPartsSize; i++) {
930 TransactionPart * part = tParts->get(i);
931 part->encode(bbEncode);
936 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
937 localSequenceNumber++;
939 if (returnData == NULL) {
940 // Could not contact server
941 return Pair<bool, bool>(true, false);
945 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
946 bool didCommit = bbDecode->get() == 1;
947 bool couldArbitrate = bbDecode->get() == 1;
948 int numberOfEntries = bbDecode->getInt();
949 bool foundAbort = false;
951 for (int i = 0; i < numberOfEntries; i++) {
952 char type = bbDecode->get();
953 if (type == TypeAbort) {
954 Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
956 if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
961 } else if (type == TypeCommitPart) {
962 CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
963 processEntry(commitPart);
967 updateLiveStateFromLocal();
969 if (couldArbitrate) {
970 TransactionStatus * status = transaction->getTransactionStatus();
972 status->setStatus(TransactionStatus_StatusCommitted);
974 status->setStatus(TransactionStatus_StatusAborted);
977 TransactionStatus * status = transaction->getTransactionStatus();
979 status->setStatus(TransactionStatus_StatusAborted);
981 status->setStatus(TransactionStatus_StatusCommitted);
985 return Pair<bool, bool>(false, true);
988 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
990 ByteBuffer *bbDecode = ByteBuffer_wrap(data);
991 int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
992 int numberOfParts = bbDecode->getInt();
994 // If we did commit a transaction or not
995 bool didCommit = false;
996 bool couldArbitrate = false;
998 if (numberOfParts != 0) {
1000 // decode the transaction
1001 Transaction *transaction = new Transaction();
1002 for (int i = 0; i < numberOfParts; i++) {
1004 TransactionPart *newPart = (TransactionPart *)TransactionPart_decode(NULL, bbDecode);
1005 transaction->addPartDecode(newPart);
1008 // Arbitrate on transaction and pull relevant return data
1009 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
1010 couldArbitrate = localArbitrateReturn.getFirst();
1011 didCommit = localArbitrateReturn.getSecond();
1013 updateLiveStateFromLocal();
1015 // Transaction was sent to the server so keep track of it to prevent double commit
1016 if (transaction->getSequenceNumber() != -1) {
1017 offlineTransactionsCommittedAndAtServer->add(new Pair<int64_t, int64_t>(transaction->getId()));
1021 // The data to send back
1022 int returnDataSize = 0;
1023 Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
1025 // Get the aborts to send back
1026 Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>();
1028 SetIterator<int64_t, Abort *> *abortit = getKeyIterator(liveAbortsGeneratedByLocal);
1029 while(abortit->hasNext())
1030 abortLocalSequenceNumbers->add(abortit->next());
1034 qsort(abortLocalSequenceNumbers->expose(), abortLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1036 uint asize = abortLocalSequenceNumbers->size();
1037 for(uint i=0; i<asize; i++) {
1038 int64_t localSequenceNumber = abortLocalSequenceNumbers->get(i);
1039 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1043 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
1044 unseenArbitrations->add(abort);
1045 returnDataSize += abort->getSize();
1048 // Get the commits to send back
1049 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
1050 if (commitForClientTable != NULL) {
1051 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>();
1053 SetIterator<int64_t, Commit *> *commitit = getKeyIterator(commitForClientTable);
1054 while(commitit->hasNext())
1055 commitLocalSequenceNumbers->add(commitit->next());
1058 qsort(commitLocalSequenceNumbers->expose(), commitLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1060 uint clsSize = commitLocalSequenceNumbers->size();
1061 for(uint clsi = 0; clsi < clsSize; clsi++) {
1062 int64_t localSequenceNumber = commitLocalSequenceNumbers->get(clsi);
1063 Commit *commit = commitForClientTable->get(localSequenceNumber);
1065 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1070 Vector<CommitPart *> * parts = commit->getParts();
1071 uint nParts = parts->size();
1072 for(uint i=0; i<nParts; i++) {
1073 CommitPart * commitPart = parts->get(i);
1074 unseenArbitrations->add(commitPart);
1075 returnDataSize += commitPart->getSize();
1081 // Number of arbitration entries to decode
1082 returnDataSize += 2 * sizeof(int32_t);
1084 // bool of did commit or not
1085 if (numberOfParts != 0) {
1086 returnDataSize += sizeof(char);
1089 // Data to send Back
1090 Array<char> *returnData = new Array<char>(returnDataSize);
1091 ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
1093 if (numberOfParts != 0) {
1095 bbEncode->put((char)1);
1097 bbEncode->put((char)0);
1099 if (couldArbitrate) {
1100 bbEncode->put((char)1);
1102 bbEncode->put((char)0);
1106 bbEncode->putInt(unseenArbitrations->size());
1107 uint size = unseenArbitrations->size();
1108 for (uint i = 0; i < size; i++) {
1109 Entry *entry = unseenArbitrations->get(i);
1110 entry->encode(bbEncode);
1113 localSequenceNumber++;
1117 ThreeTuple<bool, bool, Array<Slot *> *> Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey) {
1118 bool attemptedToSendToServerTmp = attemptedToSendToServer;
1119 attemptedToSendToServer = true;
1121 bool inserted = false;
1122 bool lastTryInserted = false;
1124 Array<Slot *> *array = cloud->putSlot(slot, newSize);
1125 if (array == NULL) {
1126 array = new Array<Slot *>();
1127 array->set(0, slot);
1128 rejectedSlotVector->clear();
1131 if (array->length() == 0) {
1132 throw new Error("Server Error: Did not send any slots");
1135 // if (attemptedToSendToServerTmp) {
1136 if (hadPartialSendToServer) {
1138 bool isInserted = false;
1139 uint size = array->length();
1140 for (uint i = 0; i < size; i++) {
1141 Slot *s = array->get(i);
1142 if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1148 for (uint i = 0; i < size; i++) {
1149 Slot *s = array->get(i);
1154 // Process each entry in the slot
1155 Vector<Entry *> *entries = s->getEntries();
1156 uint eSize = entries->size();
1157 for(uint ei=0; ei < eSize; ei++) {
1158 Entry * entry = entries->get(ei);
1160 if (entry->getType() == TypeLastMessage) {
1161 LastMessage *lastMessage = (LastMessage *)entry;
1163 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) {
1172 rejectedSlotVector->add(slot->getSequenceNumber());
1173 lastTryInserted = false;
1175 lastTryInserted = true;
1178 rejectedSlotVector->add(slot->getSequenceNumber());
1179 lastTryInserted = false;
1183 return ThreeTuple<bool, bool, Array<Slot *> *>(inserted, lastTryInserted, array);
1187 * Returns false if a resize was needed
1189 ThreeTuple<bool, int32_t, bool> Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry) {
1191 if (liveSlotCount > bufferResizeThreshold) {
1192 resize = true;//Resize is forced
1196 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1197 TableStatus *status = new TableStatus(slot, newSize);
1198 slot->addEntry(status);
1201 // Fill with rejected slots first before doing anything else
1202 doRejectedMessages(slot);
1204 // Do mandatory rescue of entries
1205 ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1207 // Extract working variables
1208 bool needsResize = mandatoryRescueReturn.getFirst();
1209 bool seenLiveSlot = mandatoryRescueReturn.getSecond();
1210 int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1212 if (needsResize && !resize) {
1213 // We need to resize but we are not resizing so return false
1214 return ThreeTuple<bool, int32_t, bool>(true, NULL, NULL);
1217 bool inserted = false;
1218 if (newKeyEntry != NULL) {
1219 newKeyEntry->setSlot(slot);
1220 if (slot->hasSpace(newKeyEntry)) {
1221 slot->addEntry(newKeyEntry);
1226 // Clear the transactions, aborts and commits that were sent previously
1227 transactionPartsSent->clear();
1228 pendingSendArbitrationEntriesToDelete->clear();
1229 uint size = pendingSendArbitrationRounds->size();
1230 for (uint i = 0; i < size; i++) {
1231 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
1232 bool isFull = false;
1233 round->generateParts();
1234 Vector<Entry *> *parts = round->getParts();
1236 // Insert pending arbitration data
1237 uint vsize = parts->size();
1238 for (uint vi = 0; vi < vsize; vi++) {
1239 Entry *arbitrationData = parts->get(vi);
1241 // If it is an abort then we need to set some information
1242 if (arbitrationData->getType() == TypeAbort) {
1243 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1246 if (!slot->hasSpace(arbitrationData)) {
1247 // No space so cant do anything else with these data entries
1252 // Add to this current slot and add it to entries to delete
1253 slot->addEntry(arbitrationData);
1254 pendingSendArbitrationEntriesToDelete->add(arbitrationData);
1262 if (pendingTransactionQueue->size() > 0) {
1263 Transaction *transaction = pendingTransactionQueue->get(0);
1264 // Set the transaction sequence number if it has yet to be inserted into the block chain
1265 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1266 transaction->setSequenceNumber(slot->getSequenceNumber());
1270 TransactionPart *part = transaction->getNextPartToSend();
1272 // Ran out of parts to send for this transaction so move on
1276 if (slot->hasSpace(part)) {
1277 slot->addEntry(part);
1278 Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1279 if (partsSent == NULL) {
1280 partsSent = new Vector<int32_t>();
1281 transactionPartsSent->put(transaction, partsSent);
1283 partsSent->add(part->getPartNumber());
1284 transactionPartsSent->put(transaction, partsSent);
1291 // Fill the remainder of the slot with rescue data
1292 doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1294 return ThreeTuple<bool, int32_t, bool>(false, newSize, inserted);
1297 void Table::doRejectedMessages(Slot *s) {
1298 if (!rejectedSlotVector->isEmpty()) {
1299 /* TODO: We should avoid generating a rejected message entry if
1300 * there is already a sufficient entry in the queue (e->g->,
1301 * equalsto value of true and same sequence number)-> */
1303 int64_t old_seqn = rejectedSlotVector->get(0);
1304 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1305 int64_t new_seqn = rejectedSlotVector->lastElement();
1306 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1309 int64_t prev_seqn = -1;
1311 /* Go through list of missing messages */
1312 for (; i < rejectedSlotVector->size(); i++) {
1313 int64_t curr_seqn = rejectedSlotVector->get(i);
1314 Slot *s_msg = buffer->getSlot(curr_seqn);
1317 prev_seqn = curr_seqn;
1319 /* Generate rejected message entry for missing messages */
1320 if (prev_seqn != -1) {
1321 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1324 /* Generate rejected message entries for present messages */
1325 for (; i < rejectedSlotVector->size(); i++) {
1326 int64_t curr_seqn = rejectedSlotVector->get(i);
1327 Slot *s_msg = buffer->getSlot(curr_seqn);
1328 int64_t machineid = s_msg->getMachineID();
1329 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1336 ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize) {
1337 int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1338 int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
1339 if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1340 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1343 int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1344 bool seenLiveSlot = false;
1345 int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots; // smallest seq number in the buffer if it is full
1346 int64_t threshold = firstIfFull + Table_FREE_SLOTS; // we want the buffer to be clear of live entries up to this point
1350 for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1351 Slot * previousSlot = buffer->getSlot(currentSequenceNumber);
1352 // Push slot number forward
1353 if (!seenLiveSlot) {
1354 oldestLiveSlotSequenceNumver = currentSequenceNumber;
1357 if (!previousSlot->isLive()) {
1361 // We have seen a live slot
1362 seenLiveSlot = true;
1364 // Get all the live entries for a slot
1365 Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1367 // Iterate over all the live entries and try to rescue them
1368 uint lESize = liveEntries->size();
1369 for (uint i=0; i< lESize; i++) {
1370 Entry * liveEntry = liveEntries->get(i);
1371 if (slot->hasSpace(liveEntry)) {
1372 // Enough space to rescue the entry
1373 slot->addEntry(liveEntry);
1374 } else if (currentSequenceNumber == firstIfFull) {
1375 //if there's no space but the entry is about to fall off the queue
1376 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
1382 return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
1385 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1386 /* now go through live entries from least to greatest sequence number until
1387 * either all live slots added, or the slot doesn't have enough room
1388 * for SKIP_THRESHOLD consecutive entries*/
1390 int64_t newestseqnum = buffer->getNewestSeqNum();
1392 for (; seqn <= newestseqnum; seqn++) {
1393 Slot *prevslot = buffer->getSlot(seqn);
1394 //Push slot number forward
1396 oldestLiveSlotSequenceNumver = seqn;
1398 if (!prevslot->isLive())
1400 seenliveslot = true;
1401 Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
1402 uint lESize = liveentries->size();
1403 for (uint i=0; i< lESize; i++) {
1404 Entry * liveentry = liveentries->get(i);
1405 if (s->hasSpace(liveentry))
1406 s->addEntry(liveentry);
1409 if (skipcount > Table_SKIP_THRESHOLD)
1419 * Checks for malicious activity and updates the local copy of the block chain->
1421 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1422 // The cloud communication layer has checked slot HMACs already
1424 if (newSlots->length() == 0) {
1428 // Make sure all slots are newer than the last largest slot this
1430 int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
1431 if (firstSeqNum <= sequenceNumber) {
1432 throw new Error("Server Error: Sent older slots!");
1435 // Create an object that can access both new slots and slots in our
1436 // local chain without committing slots to our local chain
1437 SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1439 // Check that the HMAC chain is not broken
1440 checkHMACChain(indexer, newSlots);
1442 // Set to keep track of messages from clients
1443 Hashset<int64_t> *machineSet = new Hashset<int64_t>();
1445 SetIterator<int64_t, Pair<int64_t, Liveness *> *> * lmit=getKeyIterator(lastMessageTable);
1446 while(lmit->hasNext())
1447 machineSet->add(lmit->next());
1451 // Process each slots data
1453 uint numSlots = newSlots->length();
1454 for(uint i=0; i<numSlots; i++) {
1455 Slot *slot = newSlots->get(i);
1456 processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1457 updateExpectedSize();
1461 // If there is a gap, check to see if the server sent us
1463 if (firstSeqNum != (sequenceNumber + 1)) {
1465 // Check the size of the slots that were sent down by the server->
1466 // Can only check the size if there was a gap
1467 checkNumSlots(newSlots->length());
1469 // Since there was a gap every machine must have pushed a slot or
1470 // must have a last message message-> If not then the server is
1472 if (!machineSet->isEmpty()) {
1473 throw new Error("Missing record for machines: ");
1477 // Update the size of our local block chain->
1480 // Commit new to slots to the local block chain->
1482 uint numSlots = newSlots->length();
1483 for(uint i=0; i<numSlots; i++) {
1484 Slot *slot = newSlots->get(i);
1486 // Insert this slot into our local block chain copy->
1487 buffer->putSlot(slot);
1489 // Keep track of how many slots are currently live (have live data
1494 // Get the sequence number of the latest slot in the system
1495 sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
1496 updateLiveStateFromServer();
1498 // No Need to remember after we pulled from the server
1499 offlineTransactionsCommittedAndAtServer->clear();
1501 // This is invalidated now
1502 hadPartialSendToServer = false;
1505 void Table::updateLiveStateFromServer() {
1506 // Process the new transaction parts
1507 processNewTransactionParts();
1509 // Do arbitration on new transactions that were received
1510 arbitrateFromServer();
1512 // Update all the committed keys
1513 bool didCommitOrSpeculate = updateCommittedTable();
1515 // Delete the transactions that are now dead
1516 updateLiveTransactionsAndStatus();
1519 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1520 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1523 void Table::updateLiveStateFromLocal() {
1524 // Update all the committed keys
1525 bool didCommitOrSpeculate = updateCommittedTable();
1527 // Delete the transactions that are now dead
1528 updateLiveTransactionsAndStatus();
1531 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1532 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1535 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1536 int64_t prevslots = firstSequenceNumber;
1538 if (didFindTableStatus) {
1540 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1543 didFindTableStatus = true;
1544 currMaxSize = numberOfSlots;
1547 void Table::updateExpectedSize() {
1550 if (expectedsize > currMaxSize) {
1551 expectedsize = currMaxSize;
1557 * Check the size of the block chain to make sure there are enough
1558 * slots sent back by the server-> This is only called when we have a
1559 * gap between the slots that we have locally and the slots sent by
1560 * the server therefore in the slots sent by the server there will be
1561 * at least 1 Table status message
1563 void Table::checkNumSlots(int numberOfSlots) {
1564 if (numberOfSlots != expectedsize) {
1565 throw new Error("Server Error: Server did not send all slots-> Expected: ");
1570 * Update the size of of the local buffer if it is needed->
1572 void Table::commitNewMaxSize() {
1573 didFindTableStatus = false;
1575 // Resize the local slot buffer
1576 if (numberOfSlots != currMaxSize) {
1577 buffer->resize((int32_t)currMaxSize);
1580 // Change the number of local slots to the new size
1581 numberOfSlots = (int32_t)currMaxSize;
1583 // Recalculate the resize threshold since the size of the local
1584 // buffer has changed
1585 setResizeThreshold();
1589 * Process the new transaction parts from this latest round of slots
1590 * received from the server
1592 void Table::processNewTransactionParts() {
1594 if (newTransactionParts->size() == 0) {
1595 // Nothing new to process
1599 // Iterate through all the machine Ids that we received new parts
1601 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> * tpit= getKeyIterator(newTransactionParts);
1602 while(tpit->hasNext()) {
1603 int64_t machineId = tpit->next();
1604 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
1606 SetIterator<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *ptit = getKeyIterator(parts);
1607 // Iterate through all the parts for that machine Id
1608 while(ptit->hasNext()) {
1609 Pair<int64_t, int32_t> * partId = ptit->next();
1610 TransactionPart *part = parts->get(partId);
1612 if (lastArbitratedTransactionNumberByArbitratorTable->contains(part->getArbitratorId())) {
1613 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1614 if (lastTransactionNumber >= part->getSequenceNumber()) {
1615 // Set dead the transaction part
1621 // Get the transaction object for that sequence number
1622 Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1624 if (transaction == NULL) {
1625 // This is a new transaction that we dont have so make a new one
1626 transaction = new Transaction();
1628 // Insert this new transaction into the live tables
1629 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1630 liveTransactionByTransactionIdTable->put(new Pair<int64_t, int64_t>(part->getTransactionId()), transaction);
1633 // Add that part to the transaction
1634 transaction->addPartDecode(part);
1639 // Clear all the new transaction parts in preparation for the next
1640 // time the server sends slots
1641 newTransactionParts->clear();
1644 void Table::arbitrateFromServer() {
1646 if (liveTransactionBySequenceNumberTable->size() == 0) {
1647 // Nothing to arbitrate on so move on
1651 // Get the transaction sequence numbers and sort from oldest to newest
1652 Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>();
1654 SetIterator<int64_t, Transaction *> * trit = getKeyIterator(liveTransactionBySequenceNumberTable);
1655 while(trit->hasNext())
1656 transactionSequenceNumbers->add(trit->next());
1659 qsort(transactionSequenceNumbers->expose(), transactionSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1661 // Collection of key value pairs that are
1662 Hashtable<IoTString *, KeyValue *> * speculativeTableTmp = new Hashtable<IoTString *, KeyValue *>();
1664 // The last transaction arbitrated on
1665 int64_t lastTransactionCommitted = -1;
1666 Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1667 uint tsnSize = transactionSequenceNumbers->size();
1668 for(uint i=0; i<tsnSize; i++) {
1669 int64_t transactionSequenceNumber = transactionSequenceNumbers->get(i);
1670 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1672 // Check if this machine arbitrates for this transaction if not
1673 // then we cant arbitrate this transaction
1674 if (transaction->getArbitrator() != localMachineId) {
1678 if (transactionSequenceNumber < lastSeqNumArbOn) {
1682 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1683 // We have seen this already locally so dont commit again
1688 if (!transaction->isComplete()) {
1689 // Will arbitrate in incorrect order if we continue so just break
1695 // update the largest transaction seen by arbitrator from server
1696 if (!lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1697 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1699 int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1700 if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1701 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1705 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1706 // Guard evaluated as true
1708 // Update the local changes so we can make the commit
1709 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1710 while (kvit->hasNext()) {
1711 KeyValue *kv = kvit->next();
1712 speculativeTableTmp->put(kv->getKey(), kv);
1716 // Update what the last transaction committed was for use in batch commit
1717 lastTransactionCommitted = transactionSequenceNumber;
1719 // Guard evaluated was false so create abort
1721 Abort *newAbort = new Abort(NULL,
1722 transaction->getClientLocalSequenceNumber(),
1723 transaction->getSequenceNumber(),
1724 transaction->getMachineId(),
1725 transaction->getArbitrator(),
1726 localArbitrationSequenceNumber);
1727 localArbitrationSequenceNumber++;
1728 generatedAborts->add(newAbort);
1730 // Insert the abort so we can process
1731 processEntry(newAbort);
1734 lastSeqNumArbOn = transactionSequenceNumber;
1737 Commit *newCommit = NULL;
1739 // If there is something to commit
1740 if (speculativeTableTmp->size() != 0) {
1741 // Create the commit and increment the commit sequence number
1742 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1743 localArbitrationSequenceNumber++;
1745 // Add all the new keys to the commit
1746 for (KeyValue *kv : speculativeTableTmp->values()) {
1747 newCommit->addKV(kv);
1750 // create the commit parts
1751 newCommit->createCommitParts();
1753 // Append all the commit parts to the end of the pending queue
1754 // waiting for sending to the server
1755 // Insert the commit so we can process it
1756 for (CommitPart *commitPart : newCommit->getParts()->values()) {
1757 processEntry(commitPart);
1761 if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1762 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1763 pendingSendArbitrationRounds->add(arbitrationRound);
1765 if (compactArbitrationData()) {
1766 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1767 if (newArbitrationRound->getCommit() != NULL) {
1768 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1769 processEntry(commitPart);
1776 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1778 // Check if this machine arbitrates for this transaction if not then
1779 // we cant arbitrate this transaction
1780 if (transaction->getArbitrator() != localMachineId) {
1781 return Pair<bool, bool>(false, false);
1784 if (!transaction->isComplete()) {
1785 // Will arbitrate in incorrect order if we continue so just break
1787 return Pair<bool, bool>(false, false);
1790 if (transaction->getMachineId() != localMachineId) {
1791 // dont do this check for local transactions
1792 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) != NULL) {
1793 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1794 // We've have already seen this from the server
1795 return Pair<bool, bool>(false, false);
1800 if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1801 // Guard evaluated as true Create the commit and increment the
1802 // commit sequence number
1803 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1804 localArbitrationSequenceNumber++;
1806 // Update the local changes so we can make the commit
1807 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1808 while (kvit->hasNext()) {
1809 KeyValue *kv = kvit->next();
1810 newCommit->addKV(kv);
1814 // create the commit parts
1815 newCommit->createCommitParts();
1817 // Append all the commit parts to the end of the pending queue
1818 // waiting for sending to the server
1819 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1820 pendingSendArbitrationRounds->add(arbitrationRound);
1822 if (compactArbitrationData()) {
1823 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1824 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1825 processEntry(commitPart);
1828 // Insert the commit so we can process it
1829 for (CommitPart *commitPart : newCommit->getParts()->values()) {
1830 processEntry(commitPart);
1834 if (transaction->getMachineId() == localMachineId) {
1835 TransactionStatus *status = transaction->getTransactionStatus();
1836 if (status != NULL) {
1837 status->setStatus(TransactionStatus_StatusCommitted);
1841 updateLiveStateFromLocal();
1842 return Pair<bool, bool>(true, true);
1844 if (transaction->getMachineId() == localMachineId) {
1845 // For locally created messages update the status
1846 // Guard evaluated was false so create abort
1847 TransactionStatus * status = transaction->getTransactionStatus();
1848 if (status != NULL) {
1849 status->setStatus(TransactionStatus_StatusAborted);
1852 Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
1855 Abort *newAbort = new Abort(NULL,
1856 transaction->getClientLocalSequenceNumber(),
1858 transaction->getMachineId(),
1859 transaction->getArbitrator(),
1860 localArbitrationSequenceNumber);
1861 localArbitrationSequenceNumber++;
1862 addAbortSet->add(newAbort);
1864 // Append all the commit parts to the end of the pending queue
1865 // waiting for sending to the server
1866 ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1867 pendingSendArbitrationRounds->add(arbitrationRound);
1869 if (compactArbitrationData()) {
1870 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1871 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1872 processEntry(commitPart);
1877 updateLiveStateFromLocal();
1878 return Pair<bool, bool>(true, false);
1883 * Compacts the arbitration data my merging commits and aggregating
1884 * aborts so that a single large push of commits can be done instead
1885 * of many small updates
1887 bool Table::compactArbitrationData() {
1888 if (pendingSendArbitrationRounds->size() < 2) {
1889 // Nothing to compact so do nothing
1893 ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1894 if (lastRound->getDidSendPart()) {
1898 bool hadCommit = (lastRound->getCommit() == NULL);
1899 bool gotNewCommit = false;
1901 int numberToDelete = 1;
1902 while (numberToDelete < pendingSendArbitrationRounds->size()) {
1903 ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1905 if (round->isFull() || round->getDidSendPart()) {
1906 // Stop since there is a part that cannot be compacted and we
1907 // need to compact in order
1911 if (round->getCommit() == NULL) {
1912 // Try compacting aborts only
1913 int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
1914 if (newSize > ArbitrationRound_MAX_PARTS) {
1915 // Cant compact since it would be too large
1918 lastRound->addAborts(round->getAborts());
1920 // Create a new larger commit
1921 Commit * newCommit = Commit_merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
1922 localArbitrationSequenceNumber++;
1924 // Create the commit parts so that we can count them
1925 newCommit->createCommitParts();
1927 // Calculate the new size of the parts
1928 int newSize = newCommit->getNumberOfParts();
1929 newSize += lastRound->getAbortsCount();
1930 newSize += round->getAbortsCount();
1932 if (newSize > ArbitrationRound_MAX_PARTS) {
1933 // Cant compact since it would be too large
1937 // Set the new compacted part
1938 lastRound->setCommit(newCommit);
1939 lastRound->addAborts(round->getAborts());
1940 gotNewCommit = true;
1946 if (numberToDelete != 1) {
1947 // If there is a compaction
1948 // Delete the previous pieces that are now in the new compacted piece
1949 if (numberToDelete == pendingSendArbitrationRounds->size()) {
1950 pendingSendArbitrationRounds->clear();
1952 for (int i = 0; i < numberToDelete; i++) {
1953 pendingSendArbitrationRounds->removeIndex(pendingSendArbitrationRounds->size() - 1);
1957 // Add the new compacted into the pending to send list
1958 pendingSendArbitrationRounds->add(lastRound);
1960 // Should reinsert into the commit processor
1961 if (hadCommit && gotNewCommit) {
1970 * Update all the commits and the committed tables, sets dead the dead
1973 bool Table::updateCommittedTable() {
1975 if (newCommitParts->size() == 0) {
1976 // Nothing new to process
1980 // Iterate through all the machine Ids that we received new parts for
1981 for (int64_t machineId : newCommitParts->keySet()) {
1982 Hashtable<Pair<int64_t, int32_t> *, CommitPart *> *parts = newCommitParts->get(machineId);
1984 // Iterate through all the parts for that machine Id
1985 for (Pair<int64_t, int32_t> partId : parts->keySet()) {
1986 CommitPart *part = parts->get(partId);
1988 // Get the transaction object for that sequence number
1989 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
1991 if (commitForClientTable == NULL) {
1992 // This is the first commit from this device
1993 commitForClientTable = new Hashtable<int64_t, Commit *>();
1994 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
1997 Commit *commit = commitForClientTable->get(part->getSequenceNumber());
1999 if (commit == NULL) {
2000 // This is a new commit that we dont have so make a new one
2001 commit = new Commit();
2003 // Insert this new commit into the live tables
2004 commitForClientTable->put(part->getSequenceNumber(), commit);
2007 // Add that part to the commit
2008 commit->addPartDecode(part);
2012 // Clear all the new commits parts in preparation for the next time
2013 // the server sends slots
2014 newCommitParts->clear();
2016 // If we process a new commit keep track of it for future use
2017 bool didProcessANewCommit = false;
2019 // Process the commits one by one
2020 for (int64_T arbitratorId : liveCommitsTable->keySet()) {
2022 // Get all the commits for a specific arbitrator
2023 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
2025 // Sort the commits in order
2026 Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
2027 qsort(commitSequenceNumbers->expose(), commitSequenceNumbers->size(), sizeof(int64_t), compareInt64);
2029 // Get the last commit seen from this arbitrator
2030 int64_t lastCommitSeenSequenceNumber = -1;
2031 if (lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId) != NULL) {
2032 lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
2035 // Go through each new commit one by one
2036 for (int i = 0; i < commitSequenceNumbers->size(); i++) {
2037 int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
2038 Commit *commit = commitForClientTable->get(commitSequenceNumber);
2040 // Special processing if a commit is not complete
2041 if (!commit->isComplete()) {
2042 if (i == (commitSequenceNumbers->size() - 1)) {
2043 // If there is an incomplete commit and this commit is the
2044 // latest one seen then this commit cannot be processed and
2045 // there are no other commits
2048 // This is a commit that was already dead but parts of it
2049 // are still in the block chain (not flushed out yet)->
2050 // Delete it and move on
2052 commitForClientTable->remove(commit->getSequenceNumber());
2057 // Update the last transaction that was updated if we can
2058 if (commit->getTransactionSequenceNumber() != -1) {
2059 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
2061 // Update the last transaction sequence number that the arbitrator arbitrated on
2062 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
2063 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2067 // Update the last arbitration data that we have seen so far
2068 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(commit->getMachineId())) {
2069 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
2070 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
2072 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2075 // Never seen any data from this arbitrator so record the first one
2076 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2079 // We have already seen this commit before so need to do the
2080 // full processing on this commit
2081 if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2083 // Update the last transaction that was updated if we can
2084 if (commit->getTransactionSequenceNumber() != -1) {
2085 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
2087 // Update the last transaction sequence number that the arbitrator arbitrated on
2088 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
2089 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2096 // If we got here then this is a brand new commit and needs full
2098 // Get what commits should be edited, these are the commits that
2099 // have live values for their keys
2100 Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
2102 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = commit->getKeyValueUpdateSet()->iterator();
2103 while (kvit->hasNext()) {
2104 KeyValue *kv = kvit->next();
2105 commitsToEdit->add(liveCommitsByKeyTable->get(kv->getKey()));
2109 commitsToEdit->remove(NULL); // remove NULL since it could be in this set
2111 // Update each previous commit that needs to be updated
2112 for (Commit *previousCommit : commitsToEdit) {
2114 // Only bother with live commits (TODO: Maybe remove this check)
2115 if (previousCommit->isLive()) {
2117 // Update which keys in the old commits are still live
2119 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = commit->getKeyValueUpdateSet()->iterator();
2120 while (kvit->hasNext()) {
2121 KeyValue *kv = kvit->next();
2122 previousCommit->invalidateKey(kv->getKey());
2127 // if the commit is now dead then remove it
2128 if (!previousCommit->isLive()) {
2129 commitForClientTable->remove(previousCommit);
2134 // Update the last seen sequence number from this arbitrator
2135 if (lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId()) != NULL) {
2136 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2137 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2140 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2143 // We processed a new commit that we havent seen before
2144 didProcessANewCommit = true;
2146 // Update the committed table of keys and which commit is using which key
2148 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = commit->getKeyValueUpdateSet()->iterator();
2149 while (kvit->hasNext()) {
2150 KeyValue *kv = kvit->next();
2151 committedKeyValueTable->put(kv->getKey(), kv);
2152 liveCommitsByKeyTable->put(kv->getKey(), commit);
2159 return didProcessANewCommit;
2163 * Create the speculative table from transactions that are still live
2164 * and have come from the cloud
2166 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2167 if (liveTransactionBySequenceNumberTable->keySet()->size() == 0) {
2168 // There is nothing to speculate on
2172 // Create a list of the transaction sequence numbers and sort them
2173 // from oldest to newest
2174 Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
2175 qsort(transactionSequenceNumberSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64);
2177 bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2180 if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2181 // If there is a gap in the transaction sequence numbers then
2182 // there was a commit or an abort of a transaction OR there was a
2183 // new commit (Could be from offline commit) so a redo the
2184 // speculation from scratch
2186 // Start from scratch
2187 speculatedKeyValueTable->clear();
2188 lastTransactionSequenceNumberSpeculatedOn = -1;
2189 oldestTransactionSequenceNumberSpeculatedOn = -1;
2193 // Remember the front of the transaction list
2194 oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2196 // Find where to start arbitration from
2197 int startIndex = transactionSequenceNumbersSorted->indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1;
2199 if (startIndex >= transactionSequenceNumbersSorted->size()) {
2200 // Make sure we are not out of bounds
2201 return false; // did not speculate
2204 Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
2205 bool didSkip = true;
2207 for (int i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2208 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2209 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2211 if (!transaction->isComplete()) {
2212 // If there is an incomplete transaction then there is nothing
2213 // we can do add this transactions arbitrator to the list of
2214 // arbitrators we should ignore
2215 incompleteTransactionArbitrator->add(transaction->getArbitrator());
2220 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2224 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2226 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2227 // Guard evaluated to true so update the speculative table
2229 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2230 while (kvit->hasNext()) {
2231 KeyValue *kv = kvit->next();
2232 speculatedKeyValueTable->put(kv->getKey(), kv);
2240 // Since there was a skip we need to redo the speculation next time around
2241 lastTransactionSequenceNumberSpeculatedOn = -1;
2242 oldestTransactionSequenceNumberSpeculatedOn = -1;
2245 // We did some speculation
2250 * Create the pending transaction speculative table from transactions
2251 * that are still in the pending transaction buffer
2253 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2254 if (pendingTransactionQueue->size() == 0) {
2255 // There is nothing to speculate on
2259 if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2260 // need to reset on the pending speculation
2261 lastPendingTransactionSpeculatedOn = NULL;
2262 firstPendingTransaction = pendingTransactionQueue->get(0);
2263 pendingTransactionSpeculatedKeyValueTable->clear();
2266 // Find where to start arbitration from
2267 int startIndex = pendingTransactionQueue->indexOf(firstPendingTransaction) + 1;
2269 if (startIndex >= pendingTransactionQueue->size()) {
2270 // Make sure we are not out of bounds
2274 for (int i = startIndex; i < pendingTransactionQueue->size(); i++) {
2275 Transaction *transaction = pendingTransactionQueue->get(i);
2277 lastPendingTransactionSpeculatedOn = transaction;
2279 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2280 // Guard evaluated to true so update the speculative table
2281 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2282 while (kvit->hasNext()) {
2283 KeyValue *kv = kvit->next();
2284 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2292 * Set dead and remove from the live transaction tables the
2293 * transactions that are dead
2295 void Table::updateLiveTransactionsAndStatus() {
2297 // Go through each of the transactions
2298 for (Iterator<Map->Entry<int64_t, Transaction> > *iter = liveTransactionBySequenceNumberTable->entrySet()->iterator(); iter->hasNext();) {
2299 Transaction *transaction = iter->next()->getValue();
2301 // Check if the transaction is dead
2302 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator());
2303 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= transaction->getSequenceNumber())) {
2305 // Set dead the transaction
2306 transaction->setDead();
2308 // Remove the transaction from the live table
2310 liveTransactionByTransactionIdTable->remove(transaction->getId());
2314 // Go through each of the transactions
2315 for (Iterator<Map->Entry<int64_t, TransactionStatus *> > *iter = outstandingTransactionStatus->entrySet()->iterator(); iter->hasNext();) {
2316 TransactionStatus *status = iter->next()->getValue();
2318 // Check if the transaction is dead
2319 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator());
2320 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= status->getTransactionSequenceNumber())) {
2323 status->setStatus(TransactionStatus_StatusCommitted);
2332 * Process this slot, entry by entry-> Also update the latest message sent by slot
2334 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2336 // Update the last message seen
2337 updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2339 // Process each entry in the slot
2340 Vector<Entry *> *entries = slot->getEntries();
2341 uint eSize = entries->size();
2342 for(uint ei=0; ei < eSize; ei++) {
2343 Entry * entry = entries->get(ei);
2344 switch (entry->getType()) {
2345 case TypeCommitPart:
2346 processEntry((CommitPart *)entry);
2349 processEntry((Abort *)entry);
2351 case TypeTransactionPart:
2352 processEntry((TransactionPart *)entry);
2355 processEntry((NewKey *)entry);
2357 case TypeLastMessage:
2358 processEntry((LastMessage *)entry, machineSet);
2360 case TypeRejectedMessage:
2361 processEntry((RejectedMessage *)entry, indexer);
2363 case TypeTableStatus:
2364 processEntry((TableStatus *)entry, slot->getSequenceNumber());
2367 throw new Error("Unrecognized type: ");
2373 * Update the last message that was sent for a machine Id
2375 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2376 // Update what the last message received by a machine was
2377 updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2381 * Add the new key to the arbitrators table and update the set of live
2382 * new keys (in case of a rescued new key message)
2384 void Table::processEntry(NewKey *entry) {
2385 // Update the arbitrator table with the new key information
2386 arbitratorTable->put(entry->getKey(), entry->getMachineID());
2388 // Update what the latest live new key is
2389 NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2390 if (oldNewKey != NULL) {
2391 // Delete the old new key messages
2392 oldNewKey->setDead();
2397 * Process new table status entries and set dead the old ones as new
2398 * ones come in-> keeps track of the largest and smallest table status
2399 * seen in this current round of updating the local copy of the block
2402 void Table::processEntry(TableStatus * entry, int64_t seq) {
2403 int newNumSlots = entry->getMaxSlots();
2404 updateCurrMaxSize(newNumSlots);
2405 initExpectedSize(seq, newNumSlots);
2407 if (liveTableStatus != NULL) {
2408 // We have a larger table status so the old table status is no
2410 liveTableStatus->setDead();
2413 // Make this new table status the latest alive table status
2414 liveTableStatus = entry;
2418 * Check old messages to see if there is a block chain violation->
2421 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
2422 int64_t oldSeqNum = entry->getOldSeqNum();
2423 int64_t newSeqNum = entry->getNewSeqNum();
2424 bool isequal = entry->getEqual();
2425 int64_t machineId = entry->getMachineID();
2426 int64_t seq = entry->getSequenceNumber();
2428 // Check if we have messages that were supposed to be rejected in
2429 // our local block chain
2430 for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2432 Slot *slot = indexer->getSlot(seqNum);
2435 // If we have this slot make sure that it was not supposed to be
2437 int64_t slotMachineId = slot->getMachineID();
2438 if (isequal != (slotMachineId == machineId)) {
2439 throw new Error("Server Error: Trying to insert rejected message for slot ");
2444 // Create a list of clients to watch until they see this rejected
2446 Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2447 for (Map->Entry<int64_t, Pair<int64_t, Liveness *> > *lastMessageEntry : lastMessageTable->entrySet()) {
2448 // Machine ID for the last message entry
2449 int64_t lastMessageEntryMachineId = lastMessageEntry->getKey();
2451 // We've seen it, don't need to continue to watch-> Our next
2452 // message will implicitly acknowledge it->
2453 if (lastMessageEntryMachineId == localMachineId) {
2457 Pair<int64_t, Liveness *> lastMessageValue = lastMessageEntry->getValue();
2458 int64_t entrySequenceNumber = lastMessageValue.getFirst();
2460 if (entrySequenceNumber < seq) {
2461 // Add this rejected message to the set of messages that this
2462 // machine ID did not see yet
2463 addWatchVector(lastMessageEntryMachineId, entry);
2464 // This client did not see this rejected message yet so add it
2465 // to the watch set to monitor
2466 deviceWatchSet->add(lastMessageEntryMachineId);
2469 if (deviceWatchSet->isEmpty()) {
2470 // This rejected message has been seen by all the clients so
2473 // We need to watch this rejected message
2474 entry->setWatchSet(deviceWatchSet);
2479 * Check if this abort is live, if not then save it so we can kill it
2480 * later-> update the last transaction number that was arbitrated on->
2482 void Table::processEntry(Abort *entry) {
2483 if (entry->getTransactionSequenceNumber() != -1) {
2484 // update the transaction status if it was sent to the server
2485 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2486 if (status != NULL) {
2487 status->setStatus(TransactionStatus_StatusAborted);
2491 // Abort has not been seen by the client it is for yet so we need to
2493 Abort *previouslySeenAbort = liveAbortTable->put(entry->getAbortId(), entry);
2494 if (previouslySeenAbort != NULL) {
2495 previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version
2498 if (entry->getTransactionArbitrator() == localMachineId) {
2499 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
2502 if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId()).getFirst() >= entry->getSequenceNumber())) {
2503 // The machine already saw this so it is dead
2505 liveAbortTable->remove(&entry->getAbortId());
2507 if (entry->getTransactionArbitrator() == localMachineId) {
2508 liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2513 // Update the last arbitration data that we have seen so far
2514 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(entry->getTransactionArbitrator())) {
2515 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2516 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2518 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2521 // Never seen any data from this arbitrator so record the first one
2522 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2525 // Set dead a transaction if we can
2526 Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber()));
2527 if (transactionToSetDead != NULL) {
2528 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2531 // Update the last transaction sequence number that the arbitrator
2533 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator());
2534 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2536 if (entry->getTransactionSequenceNumber() != -1) {
2537 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2543 * Set dead the transaction part if that transaction is dead and keep
2544 * track of all new parts
2546 void Table::processEntry(TransactionPart *entry) {
2547 // Check if we have already seen this transaction and set it dead OR
2548 // if it is not alive
2549 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId());
2550 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= entry->getSequenceNumber())) {
2551 // This transaction is dead, it was already committed or aborted
2556 // This part is still alive
2557 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId());
2559 if (transactionPart == NULL) {
2560 // Dont have a table for this machine Id yet so make one
2561 transactionPart = new Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2562 newTransactionParts->put(entry->getMachineId(), transactionPart);
2565 // Update the part and set dead ones we have already seen (got a
2567 TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry);
2568 if (previouslySeenPart != NULL) {
2569 previouslySeenPart->setDead();
2574 * Process new commit entries and save them for future use-> Delete duplicates
2576 void Table::processEntry(CommitPart *entry) {
2577 // Update the last transaction that was updated if we can
2578 if (entry->getTransactionSequenceNumber() != -1) {
2579 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId());
2580 // Update the last transaction sequence number that the arbitrator
2582 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2583 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2587 Hashtable<Pair<int64_t, int32_t> *, CommitPart *> *commitPart = newCommitParts->get(entry->getMachineId());
2588 if (commitPart == NULL) {
2589 // Don't have a table for this machine Id yet so make one
2590 commitPart = new Hashtable<Pair<int64_t, int32_t> *, CommitPart *>();
2591 newCommitParts->put(entry->getMachineId(), commitPart);
2593 // Update the part and set dead ones we have already seen (got a
2595 CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry);
2596 if (previouslySeenPart != NULL) {
2597 previouslySeenPart->setDead();
2602 * Update the last message seen table-> Update and set dead the
2603 * appropriate RejectedMessages as clients see them-> Updates the live
2604 * aborts, removes those that are dead and sets them dead-> Check that
2605 * the last message seen is correct and that there is no mismatch of
2606 * our own last message or that other clients have not had a rollback
2607 * on the last message->
2609 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2610 // We have seen this machine ID
2611 machineSet->remove(machineId);
2613 // Get the set of rejected messages that this machine Id is has not seen yet
2614 Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2615 // If there is a rejected message that this machine Id has not seen yet
2616 if (watchset != NULL) {
2617 // Go through each rejected message that this machine Id has not
2620 SetIterator<RejectedMessage *, RejectedMessage *> *rmit = watchset->iterator();
2621 while(rmit->hasNext()) {
2622 RejectedMessage *rm = rmit->next();
2623 // If this machine Id has seen this rejected message->->->
2624 if (rm->getSequenceNumber() <= seqNum) {
2625 // Remove it from our watchlist
2627 // Decrement machines that need to see this notification
2628 rm->removeWatcher(machineId);
2634 // Set dead the abort
2635 for (Iterator<Map->Entry<Pair<int64_t, int64_t>, Abort *> > i = liveAbortTable->entrySet()->iterator(); i->hasNext();) {
2636 Abort *abort = i->next()->getValue();
2637 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2640 if (abort->getTransactionArbitrator() == localMachineId) {
2641 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2645 if (machineId == localMachineId) {
2646 // Our own messages are immediately dead->
2647 char livenessType = liveness->getType();
2648 if (livenessType==TypeLastMessage) {
2649 ((LastMessage *)liveness)->setDead();
2650 } else if (livenessType == TypeSlot) {
2651 ((Slot *)liveness)->setDead();
2653 throw new Error("Unrecognized type");
2656 // Get the old last message for this device
2657 Pair<int64_t, Liveness *> * lastMessageEntry = lastMessageTable->put(machineId, new Pair<int64_t, Liveness *>(seqNum, liveness));
2658 if (lastMessageEntry == NULL) {
2659 // If no last message then there is nothing else to process
2663 int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
2664 Liveness *lastEntry = lastMessageEntry->getSecond();
2665 delete lastMessageEntry;
2667 // If it is not our machine Id since we already set ours to dead
2668 if (machineId != localMachineId) {
2669 char lastEntryType = lastEntry->getType();
2671 if (lastEntryType == TypeLastMessage) {
2672 ((LastMessage *)lastEntry)->setDead();
2673 } else if (lastEntryType == TypeSlot) {
2674 ((Slot *)lastEntry)->setDead();
2676 throw new Error("Unrecognized type");
2679 // Make sure the server is not playing any games
2680 if (machineId == localMachineId) {
2681 if (hadPartialSendToServer) {
2682 // We were not making any updates and we had a machine mismatch
2683 if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2684 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: ");
2687 // We were not making any updates and we had a machine mismatch
2688 if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2689 throw new Error("Server Error: Mismatch on local machine sequence number, needed: ");
2693 if (lastMessageSeqNum > seqNum) {
2694 throw new Error("Server Error: Rollback on remote machine sequence number");
2700 * Add a rejected message entry to the watch set to keep track of
2701 * which clients have seen that rejected message entry and which have
2704 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
2705 Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2706 if (entries == NULL) {
2707 // There is no set for this machine ID yet so create one
2708 entries = new Hashset<RejectedMessage *>();
2709 rejectedMessageWatchVectorTable->put(machineId, entries);
2711 entries->add(entry);
2715 * Check if the HMAC chain is not violated
2717 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
2718 for (int i = 0; i < newSlots->length(); i++) {
2719 Slot *currSlot = newSlots->get(i);
2720 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
2721 if (prevSlot != NULL &&
2722 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2723 throw new Error("Server Error: Invalid HMAC Chain");