3 #include "SlotBuffer.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
14 #include "ByteBuffer.h"
16 #include "CommitPart.h"
17 #include "ArbitrationRound.h"
18 #include "TransactionPart.h"
20 #include "RejectedMessage.h"
21 #include "SlotIndexer.h"
// Constructs a Table that builds its own CloudComm from `baseurl`, `password`
// and `listeningPort` (the Table passes itself as the first CloudComm argument).
// Every pointer member in the visible initializer list starts as NULL, the
// counters start at 0 (oldestLiveSlotSequenceNumver starts at 1), and the
// status flags start as false.
// NOTE(review): unlike the Table(CloudComm *, int64_t) overload, no
// liveAbortTable(NULL) / newCommitParts(NULL) initializers are visible in this
// list -- confirm they are not missing. The constructor body is not visible.
23 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
25 cloud(new CloudComm(this, baseurl, password, listeningPort)),
27 liveTableStatus(NULL),
28 pendingTransactionBuilder(NULL),
29 lastPendingTransactionSpeculatedOn(NULL),
30 firstPendingTransaction(NULL),
32 bufferResizeThreshold(0),
34 oldestLiveSlotSequenceNumver(1),
35 localMachineId(_localMachineId),
37 localTransactionSequenceNumber(0),
38 lastTransactionSequenceNumberSpeculatedOn(0),
39 oldestTransactionSequenceNumberSpeculatedOn(0),
40 localArbitrationSequenceNumber(0),
41 hadPartialSendToServer(false),
42 attemptedToSendToServer(false),
44 didFindTableStatus(false),
46 lastSlotAttemptedToSend(NULL),
49 lastTransactionPartsSent(NULL),
50 lastPendingSendArbitrationEntriesToDelete(NULL),
52 committedKeyValueTable(NULL),
53 speculatedKeyValueTable(NULL),
54 pendingTransactionSpeculatedKeyValueTable(NULL),
55 liveNewKeyTable(NULL),
56 lastMessageTable(NULL),
57 rejectedMessageWatchVectorTable(NULL),
58 arbitratorTable(NULL),
60 newTransactionParts(NULL),
62 lastArbitratedTransactionNumberByArbitratorTable(NULL),
63 liveTransactionBySequenceNumberTable(NULL),
64 liveTransactionByTransactionIdTable(NULL),
65 liveCommitsTable(NULL),
66 liveCommitsByKeyTable(NULL),
67 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
68 rejectedSlotVector(NULL),
69 pendingTransactionQueue(NULL),
70 pendingSendArbitrationRounds(NULL),
71 pendingSendArbitrationEntriesToDelete(NULL),
72 transactionPartsSent(NULL),
73 outstandingTransactionStatus(NULL),
74 liveAbortsGeneratedByLocal(NULL),
75 offlineTransactionsCommittedAndAtServer(NULL),
76 localCommunicationTable(NULL),
77 lastTransactionSeenFromMachineFromServer(NULL),
78 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
79 lastInsertedNewKey(false),
// Constructs a Table for an already-created CloudComm (`_cloud`) and the given
// local machine id. Every pointer member in the visible initializer list
// starts as NULL, the counters start at 0 (oldestLiveSlotSequenceNumver starts
// at 1), and the status flags start as false.
// NOTE(review): the initializer that stores `_cloud` and the constructor body
// are not visible in this listing -- confirm against the full file.
85 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
89 liveTableStatus(NULL),
90 pendingTransactionBuilder(NULL),
91 lastPendingTransactionSpeculatedOn(NULL),
92 firstPendingTransaction(NULL),
94 bufferResizeThreshold(0),
96 oldestLiveSlotSequenceNumver(1),
97 localMachineId(_localMachineId),
99 localTransactionSequenceNumber(0),
100 lastTransactionSequenceNumberSpeculatedOn(0),
101 oldestTransactionSequenceNumberSpeculatedOn(0),
102 localArbitrationSequenceNumber(0),
103 hadPartialSendToServer(false),
104 attemptedToSendToServer(false),
106 didFindTableStatus(false),
108 lastSlotAttemptedToSend(NULL),
111 lastTransactionPartsSent(NULL),
112 lastPendingSendArbitrationEntriesToDelete(NULL),
114 committedKeyValueTable(NULL),
115 speculatedKeyValueTable(NULL),
116 pendingTransactionSpeculatedKeyValueTable(NULL),
117 liveNewKeyTable(NULL),
118 lastMessageTable(NULL),
119 rejectedMessageWatchVectorTable(NULL),
120 arbitratorTable(NULL),
121 liveAbortTable(NULL),
122 newTransactionParts(NULL),
123 newCommitParts(NULL),
124 lastArbitratedTransactionNumberByArbitratorTable(NULL),
125 liveTransactionBySequenceNumberTable(NULL),
126 liveTransactionByTransactionIdTable(NULL),
127 liveCommitsTable(NULL),
128 liveCommitsByKeyTable(NULL),
129 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
130 rejectedSlotVector(NULL),
131 pendingTransactionQueue(NULL),
132 pendingSendArbitrationRounds(NULL),
133 pendingSendArbitrationEntriesToDelete(NULL),
134 transactionPartsSent(NULL),
135 outstandingTransactionStatus(NULL),
136 liveAbortsGeneratedByLocal(NULL),
137 offlineTransactionsCommittedAndAtServer(NULL),
138 localCommunicationTable(NULL),
139 lastTransactionSeenFromMachineFromServer(NULL),
140 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
141 lastInsertedNewKey(false),
148 * Init all the stuff needed for table usage
// Allocates the helper objects (random, buffer) and every hash table, hash set
// and vector member listed below, then caches the slot buffer capacity in
// numberOfSlots and computes the resize threshold.
// NOTE(review): the definition line of the enclosing function is not visible
// in this listing -- presumably this is the body of the table's init routine.
151 // Init helper objects
152 random = new Random();
153 buffer = new SlotBuffer();
// Key/value state and per-machine bookkeeping tables.
156 committedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
157 speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
158 pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
159 liveNewKeyTable = new Hashtable<IoTString *, NewKey *>();
160 lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> * >();
161 rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
162 arbitratorTable = new Hashtable<IoTString *, int64_t>();
// Tables keyed by Pair pointers use pairHashFunction / pairEquals instead of
// the default hashing and equality.
163 liveAbortTable = new Hashtable<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>();
164 newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
165 newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
166 lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
167 liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
168 liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t> *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>();
// NOTE(review): the mapped type here is a Hashtable by value, while
// acceptDataFromLocal stores liveCommitsTable->get(...) in a
// Hashtable<int64_t, Commit *> * -- confirm against the member declaration.
169 liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> >();
170 liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *>();
171 lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
172 rejectedSlotVector = new Vector<int64_t>();
173 pendingTransactionQueue = new Vector<Transaction *>();
174 pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
175 transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
176 outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
177 liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
178 offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> *, uintptr_t, 0, pairHashFunction, pairEquals>();
179 localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> *>();
180 lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
181 pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
182 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
// Size bookkeeping derived from the freshly created slot buffer.
185 numberOfSlots = buffer->capacity();
186 setResizeThreshold();
190 * Initialize the table by inserting a table status as the first entry
191 * into the table; also initialize the crypto stuff.
// Initializes security on the cloud connection, builds the first slot
// (sequence number 1) together with a TableStatus carrying numberOfSlots, and
// pushes that slot with cloud->putSlot. The resulting slots are fed to
// validateAndUpdate; the last visible branch throws Error("Error on
// initialization").
// NOTE(review): the branch conditions around putSlot's result and the lines
// that fill the replacement array are only partly visible in this listing.
193 void Table::initTable() {
194 cloud->initSecurity();
196 // Create the first insertion into the block chain which is the table status
197 Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
198 localSequenceNumber++;
199 TableStatus *status = new TableStatus(s, numberOfSlots);
201 Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
204 array = new Array<Slot *>(1);
206 // update local block chain
207 validateAndUpdate(array, true);
208 } else if (array->length() == 1) {
209 // in case we did push the slot BUT we failed to init it
210 validateAndUpdate(array, true);
212 throw new Error("Error on initialization");
217 * Rebuild the table from scratch by pulling the latest block chain
// Fetches every slot after the current sequenceNumber from the cloud, passes
// them to validateAndUpdate (second argument true), then refreshes live
// transactions and status.
220 void Table::rebuild() {
221 // Just pull the latest slots from the server
222 Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
223 validateAndUpdate(newslots, true);
225 updateLiveTransactionsAndStatus();
// Registers (hostName, portNumber) as the local communication endpoint for the
// machine id `arbitrator`. A new Pair is allocated and stored in
// localCommunicationTable under that id.
228 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
229 localCommunicationTable->put(arbitrator, new Pair<IoTString *, int32_t>(hostName, portNumber));
// Returns the value stored for `key` in arbitratorTable.
// NOTE(review): what get() yields for an unknown key depends on the Hashtable
// implementation, which is not visible here.
232 int64_t Table::getArbitrator(IoTString *key) {
233 return arbitratorTable->get(key);
// Closes the table.
// NOTE(review): the body of this function is not visible in this listing.
236 void Table::close() {
// Looks `key` up in committedKeyValueTable and returns the value of the entry
// found.
// NOTE(review): the handling of a missing entry is on lines not visible here.
240 IoTString *Table::getCommitted(IoTString *key) {
241 KeyValue *kv = committedKeyValueTable->get(key);
244 return kv->getValue();
// Looks `key` up in pendingTransactionSpeculatedKeyValueTable, then
// speculatedKeyValueTable, then committedKeyValueTable, and returns the value
// of the entry it ends up with.
// NOTE(review): the conditions guarding the fallback lookups are not visible
// here -- presumably each runs only when the previous lookup found nothing.
250 IoTString *Table::getSpeculative(IoTString *key) {
251 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
254 kv = speculatedKeyValueTable->get(key);
258 kv = committedKeyValueTable->get(key);
262 return kv->getValue();
// Reads the committed entry for `key` and records a guard for it on the
// transaction being built (pendingTransactionBuilder->addKVGuard).
// Throws Error("Key not Found.") when `key` is not in arbitratorTable, and
// Error("Not all Key Values Match Arbitrator.") when the builder's
// checkArbitrator rejects the key's arbitrator.
// NOTE(review): the condition choosing between the value guard and the NULL
// guard is not visible -- presumably it depends on whether `kv` was found.
268 IoTString *Table::getCommittedAtomic(IoTString *key) {
269 KeyValue *kv = committedKeyValueTable->get(key);
271 if (!arbitratorTable->contains(key)) {
272 throw new Error("Key not Found.");
275 // Make sure new key value pair matches the current arbitrator
276 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
277 // TODO: Maybe not throw an error
278 throw new Error("Not all Key Values Match Arbitrator.");
282 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
283 return kv->getValue();
285 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
// Speculative counterpart of getCommittedAtomic: after validating the key and
// its arbitrator, looks `key` up in pendingTransactionSpeculatedKeyValueTable,
// then speculatedKeyValueTable, then committedKeyValueTable, and records a
// guard for the result on pendingTransactionBuilder.
// Throws Error("Key not Found.") when `key` is not in arbitratorTable, and
// Error("Not all Key Values Match Arbitrator.") when checkArbitrator fails.
// NOTE(review): the conditions guarding the fallback lookups and the choice
// between the value guard and the NULL guard are not visible in this listing.
290 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
291 if (!arbitratorTable->contains(key)) {
292 throw new Error("Key not Found.");
295 // Make sure new key value pair matches the current arbitrator
296 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
297 // TODO: Maybe not throw an error
298 throw new Error("Not all Key Values Match Arbitrator.");
301 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
304 kv = speculatedKeyValueTable->get(key);
308 kv = committedKeyValueTable->get(key);
312 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
313 return kv->getValue();
315 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
// Pulls the slots after sequenceNumber from the cloud, passes them to
// validateAndUpdate (second argument false) and refreshes live transactions
// and status. When an Exception is caught, it iterates over the machine ids
// registered in localCommunicationTable instead.
// NOTE(review): the return statements and what is done with each machine id
// `m` are on lines not visible here -- presumably a local update per machine.
320 bool Table::update() {
322 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
323 validateAndUpdate(newSlots, false);
325 updateLiveTransactionsAndStatus();
327 } catch (Exception *e) {
328 SetIterator<int64_t> *kit = getKeyIterator(localCommunicationTable);
329 while (kit->hasNext()) {
330 int64_t m = kit->next();
// Creates the key `keyName` with `machineId` as its arbitrator by sending a
// NewKey entry to the server through sendToServer.
// The "already has an arbitrator" branch must be taken when the key IS present
// in arbitratorTable; the previous negated test took it for unknown keys,
// which contradicts both the branch comment and the way sendToServer checks
// arbitratorTable->contains(newKey->getKey()) for an existing key.
// NOTE(review): the loop/return lines of this function are not visible in this
// listing and are left untouched.
339 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
341 if (arbitratorTable->contains(keyName)) {
342 // There is already an arbitrator for this key
345 NewKey *newKey = new NewKey(NULL, keyName, machineId);
347 if (sendToServer(newKey)) {
348 // If successfully inserted
// Starts a new transaction by replacing pendingTransactionBuilder with a fresh
// PendingTransaction for localMachineId.
// NOTE(review): the previous builder is not released on any visible line --
// confirm ownership.
354 void Table::startTransaction() {
355 // Create a new transaction, invalidates any old pending transactions.
356 pendingTransactionBuilder = new PendingTransaction(localMachineId);
// Adds the update key -> value to the transaction being built.
// Throws Error("Key not Found.") when `key` is not in arbitratorTable, and
// Error("Not all Key Values Match Arbitrator.") when the builder's
// checkArbitrator rejects the key's arbitrator.
359 void Table::addKV(IoTString *key, IoTString *value) {
361 // Make sure it is a valid key
362 if (!arbitratorTable->contains(key)) {
363 throw new Error("Key not Found.");
366 // Make sure new key value pair matches the current arbitrator
367 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
368 // TODO: Maybe not throw an error
369 throw new Error("Not all Key Values Match Arbitrator.");
372 // Add the key value to this transaction
373 KeyValue *kv = new KeyValue(key, value);
374 pendingTransactionBuilder->addKV(kv);
// Finalizes the transaction under construction in pendingTransactionBuilder
// and returns its TransactionStatus.
// A builder with no key-value updates yields a new StatusNoEffect status
// (arbitrator -1). Otherwise the builder is stamped with
// localTransactionSequenceNumber (which is then incremented), a StatusPending
// status and the Transaction are created, and the transaction is queued when
// its arbitrator is not localMachineId; the visible alternative arbitrates on
// it locally and refreshes local live state. The builder is then replaced.
// In the ServerException handler the queue is walked and re-packed in place
// (oldindex / setSize) while sendTransactionToLocal is tried for transactions
// whose arbitrator has not already failed during this pass.
// NOTE(review): the try/else lines and the per-result actions inside the
// handler loop are not visible in this listing.
377 TransactionStatus *Table::commitTransaction() {
378 if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
379 // transaction with no updates will have no effect on the system
380 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
383 // Set the local transaction sequence number and increment
384 pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
385 localTransactionSequenceNumber++;
387 // Create the transaction status
388 TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
390 // Create the new transaction
391 Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
392 newTransaction->setTransactionStatus(transactionStatus);
394 if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
395 // Add it to the queue and invalidate the builder for safety
396 pendingTransactionQueue->add(newTransaction);
398 arbitrateOnLocalTransaction(newTransaction);
399 updateLiveStateFromLocal();
402 pendingTransactionBuilder = new PendingTransaction(localMachineId);
406 } catch (ServerException *e) {
408 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
409 uint size = pendingTransactionQueue->size();
411 for (int iter = 0; iter < size; iter++) {
412 Transaction *transaction = pendingTransactionQueue->get(iter);
413 pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter));
415 if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
416 // Already contacted this client so ignore all attempts to contact this client
417 // to preserve ordering for arbitrator
421 Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
423 if (sendReturn.getFirst()) {
424 // Failed to contact over local
425 arbitratorTriedAndFailed->add(transaction->getArbitrator());
427 // Successful contact or should not contact
429 if (sendReturn.getSecond()) {
435 pendingTransactionQueue->setSize(oldindex);
438 updateLiveStateFromLocal();
440 return transactionStatus;
444 * Recalculate the new resize threshold
// Recomputes bufferResizeThreshold from numberOfSlots: the lower bound is
// Table_RESIZE_THRESHOLD * numberOfSlots truncated to int, and the threshold
// is that bound minus 1 plus random->nextInt(numberOfSlots - resizeLower).
446 void Table::setResizeThreshold() {
447 int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
448 bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
// Returns the current value of localSequenceNumber.
451 int64_t Table::getLocalSequenceNumber() {
452 return localSequenceNumber;
// Pushes pending work (queued transactions, pending arbitration rounds and an
// optional new key) to the server inside slots.
//
// When hadPartialSendToServer is set, the visible lines first re-fetch slots
// from the cloud and reconcile the previously attempted slot
// (lastSlotAttemptedToSend / lastTransactionPartsSent), resending it through
// sendSlotsToServer on one path, before any new slot is built. The main loop
// then runs while there are queued transactions, pending arbitration rounds or
// a non-NULL newKey: it builds a Slot, fills it with fillSlot (calling
// fillSlot again with its second argument true after a resize was reported),
// records the attempt in the last* members and sends it with
// sendSlotsToServer.
//
// newKey: key to insert, or NULL when only transaction/arbitration data is
// being sent. The final visible statement returns whether newKey is NULL.
//
// NOTE(review): many structural lines (try/else/closing braces, early returns)
// are not visible in this listing, so the exact branch nesting must be
// confirmed against the full file.
// NOTE(review): `ServerException->TypeInputTimeout` in the last handler is not
// a valid C++ expression on a type name -- confirm the intended constant.
// NOTE(review): the getKey() results of lastNewKey and newKey are compared
// with ==; if getKey() returns IoTString * this compares addresses rather
// than contents -- confirm.
455 bool Table::sendToServer(NewKey *newKey) {
456 bool fromRetry = false;
458 if (hadPartialSendToServer) {
459 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
460 if (newSlots->length() == 0) {
462 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
464 if (sendSlotsReturn.getFirst()) {
465 if (newKey != NULL) {
466 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
471 SetIterator<Transaction *> *trit = getKeyIterator(lastTransactionPartsSent);
472 while (trit->hasNext()) {
473 Transaction *transaction = trit->next();
474 transaction->resetServerFailure();
475 // Update which transactions parts still need to be sent
476 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
477 // Add the transaction status to the outstanding list
478 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
480 // Update the transaction status
481 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
483 // Check if all the transaction parts were successfully
484 // sent and if so then remove it from pending
485 if (transaction->didSendAllParts()) {
486 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
487 pendingTransactionQueue->remove(transaction);
492 newSlots = sendSlotsReturn.getThird();
493 bool isInserted = false;
494 for (uint si = 0; si < newSlots->length(); si++) {
495 Slot *s = newSlots->get(si);
496 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
502 for (uint si = 0; si < newSlots->length(); si++) {
503 Slot *s = newSlots->get(si);
508 // Process each entry in the slot
509 Vector<Entry *> *ventries = s->getEntries();
510 uint vesize = ventries->size();
511 for (uint vei = 0; vei < vesize; vei++) {
512 Entry *entry = ventries->get(vei);
513 if (entry->getType() == TypeLastMessage) {
514 LastMessage *lastMessage = (LastMessage *)entry;
515 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
524 if (newKey != NULL) {
525 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
530 SetIterator<Transaction *> *trit = getKeyIterator(lastTransactionPartsSent);
531 while (trit->hasNext()) {
532 Transaction *transaction = trit->next();
533 transaction->resetServerFailure();
535 // Update which transactions parts still need to be sent
536 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
538 // Add the transaction status to the outstanding list
539 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
541 // Update the transaction status
542 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
544 // Check if all the transaction parts were successfully sent and if so then remove it from pending
545 if (transaction->didSendAllParts()) {
546 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
547 pendingTransactionQueue->remove(transaction);
549 transaction->resetServerFailure();
550 // Set the transaction sequence number back to nothing
551 if (!transaction->didSendAPartToServer()) {
552 transaction->setSequenceNumber(-1);
560 SetIterator<Transaction *> *trit = getKeyIterator(lastTransactionPartsSent);
561 while (trit->hasNext()) {
562 Transaction *transaction = trit->next();
563 transaction->resetServerFailure();
564 // Set the transaction sequence number back to nothing
565 if (!transaction->didSendAPartToServer()) {
566 transaction->setSequenceNumber(-1);
571 if (sendSlotsReturn.getThird()->length() != 0) {
572 // insert into the local block chain
573 validateAndUpdate(sendSlotsReturn.getThird(), true);
577 bool isInserted = false;
578 for (uint si = 0; si < newSlots->length(); si++) {
579 Slot *s = newSlots->get(si);
580 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
586 for (uint si = 0; si < newSlots->length(); si++) {
587 Slot *s = newSlots->get(si);
592 // Process each entry in the slot
593 Vector<Entry *> *entries = s->getEntries();
594 uint eSize = entries->size();
595 for(uint ei=0; ei < eSize; ei++) {
596 Entry * entry = entries->get(ei);
598 if (entry->getType() == TypeLastMessage) {
599 LastMessage *lastMessage = (LastMessage *)entry;
600 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
609 if (newKey != NULL) {
610 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
615 SetIterator<Transaction *> *trit = getKeyIterator(lastTransactionPartsSent);
616 while (trit->hasNext()) {
617 Transaction *transaction = trit->next();
618 transaction->resetServerFailure();
620 // Update which transactions parts still need to be sent
621 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
623 // Add the transaction status to the outstanding list
624 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
626 // Update the transaction status
627 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
629 // Check if all the transaction parts were successfully sent and if so then remove it from pending
630 if (transaction->didSendAllParts()) {
631 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
632 pendingTransactionQueue->remove(transaction);
634 transaction->resetServerFailure();
635 // Set the transaction sequence number back to nothing
636 if (!transaction->didSendAPartToServer()) {
637 transaction->setSequenceNumber(-1);
643 SetIterator<Transaction *> *trit = getKeyIterator(lastTransactionPartsSent);
644 while (trit->hasNext()) {
645 Transaction *transaction = trit->next();
646 transaction->resetServerFailure();
647 // Set the transaction sequence number back to nothing
648 if (!transaction->didSendAPartToServer()) {
649 transaction->setSequenceNumber(-1);
655 // insert into the local block chain
656 validateAndUpdate(newSlots, true);
659 } catch (ServerException *e) {
666 // While we have stuff that needs inserting into the block chain
667 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
671 if (hadPartialSendToServer) {
672 throw new Error("Should Be error free");
677 // If there is a new key with same name then end
678 if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
683 Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber);
684 localSequenceNumber++;
686 // Try to fill the slot with data
687 ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
688 bool needsResize = fillSlotsReturn.getFirst();
689 int newSize = fillSlotsReturn.getSecond();
690 bool insertedNewKey = fillSlotsReturn.getThird();
693 // Reset which transaction to send
694 SetIterator<Transaction *> *trit = getKeyIterator(transactionPartsSent);
695 while (trit->hasNext()) {
696 Transaction *transaction = trit->next();
697 transaction->resetNextPartToSend();
699 // Set the transaction sequence number back to nothing
700 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
701 transaction->setSequenceNumber(-1);
706 // Clear the sent data since we are trying again
707 pendingSendArbitrationEntriesToDelete->clear();
708 transactionPartsSent->clear();
710 // We needed a resize so try again
711 fillSlot(slot, true, newKey);
714 lastSlotAttemptedToSend = slot;
715 lastIsNewKey = (newKey != NULL);
716 lastInsertedNewKey = insertedNewKey;
717 lastNewSize = newSize;
719 lastTransactionPartsSent = transactionPartsSent->clone();
720 lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
722 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
724 if (sendSlotsReturn.getFirst()) {
726 // Did insert into the block chain
728 if (insertedNewKey) {
729 // This slot was what was inserted not a previous slot
731 // New Key was successfully inserted into the block chain so dont want to insert it again
735 // Remove the aborts and commit parts that were sent from the pending to send queue
736 uint size = pendingSendArbitrationRounds->size();
738 for (uint i = 0; i < size; i++) {
739 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
740 round->removeParts(pendingSendArbitrationEntriesToDelete);
742 if (!round->isDoneSending()) {
743 // Not done sending yet, so keep this round in the queue
744 pendingSendArbitrationRounds->set(oldcount++,
745 pendingSendArbitrationRounds->get(i));
748 pendingSendArbitrationRounds->setSize(oldcount);
750 SetIterator<Transaction *> *trit = getKeyIterator(transactionPartsSent);
751 while (trit->hasNext()) {
752 Transaction *transaction = trit->next();
753 transaction->resetServerFailure();
755 // Update which transactions parts still need to be sent
756 transaction->removeSentParts(transactionPartsSent->get(transaction));
758 // Add the transaction status to the outstanding list
759 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
761 // Update the transaction status
762 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
764 // Check if all the transaction parts were successfully sent and if so then remove it from pending
765 if (transaction->didSendAllParts()) {
766 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
767 pendingTransactionQueue->remove(transaction);
772 // Reset which transaction to send
773 SetIterator<Transaction *> *trit = getKeyIterator(transactionPartsSent);
774 while (trit->hasNext()) {
775 Transaction *transaction = trit->next();
776 transaction->resetNextPartToSend();
778 // Set the transaction sequence number back to nothing
779 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
780 transaction->setSequenceNumber(-1);
786 // Clear the sent data in preparation for next send
787 pendingSendArbitrationEntriesToDelete->clear();
788 transactionPartsSent->clear();
790 if (sendSlotsReturn.getThird()->length() != 0) {
791 // insert into the local block chain
792 validateAndUpdate(sendSlotsReturn.getThird(), true);
796 } catch (ServerException *e) {
797 if (e->getType() != ServerException->TypeInputTimeout) {
798 // Nothing was able to be sent to the server so just clear these data structures
799 SetIterator<Transaction *> *trit = getKeyIterator(transactionPartsSent);
800 while (trit->hasNext()) {
801 Transaction *transaction = trit->next();
802 transaction->resetNextPartToSend();
804 // Set the transaction sequence number back to nothing
805 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
806 transaction->setSequenceNumber(-1);
811 // There was a partial send to the server
812 hadPartialSendToServer = true;
814 // Nothing was able to be sent to the server so just clear these data structures
815 SetIterator<Transaction *> *trit = getKeyIterator(transactionPartsSent);
816 while (trit->hasNext()) {
817 Transaction *transaction = trit->next();
818 transaction->resetNextPartToSend();
819 transaction->setServerFailure();
824 pendingSendArbitrationEntriesToDelete->clear();
825 transactionPartsSent->clear();
830 return newKey == NULL;
// Asks the machine `machineId` over its registered local endpoint for the
// arbitration data (aborts and commit parts) produced since the last local
// sequence number seen from it, and processes the decoded commit parts.
//
// The request carries the last arbitration-data local sequence number seen
// from that machine (-1 when none is recorded). The reply starts with an entry
// count followed by typed entries (TypeAbort / TypeCommitPart).
//
// Fixes relative to the previous text:
//  * localCommunicationInformation is a pointer (localCommunicationTable maps
//    to Pair<IoTString *, int32_t> *), so its members are reached with ->.
//  * presence of a recorded sequence number is tested with contains(); the
//    table maps int64_t to int64_t, so comparing get() against NULL treated a
//    stored 0 as "not recorded".
// NOTE(review): the early returns, the abort handling after Abort_decode and
// the remaining request fields are on lines not visible in this listing.
833 bool Table::updateFromLocal(int64_t machineId) {
834 if (!localCommunicationTable->contains(machineId))
837 Pair<IoTString *, int32_t> * localCommunicationInformation = localCommunicationTable->get(machineId);
839 // Get the size of the send data
840 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
842 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
843 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(machineId)) {
844 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
847 Array<char> *sendData = new Array<char>(sendDataSize);
848 ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
851 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
855 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
856 localSequenceNumber++;
858 if (returnData == NULL) {
859 // Could not contact server
864 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
865 int numberOfEntries = bbDecode->getInt();
867 for (int i = 0; i < numberOfEntries; i++) {
868 char type = bbDecode->get();
869 if (type == TypeAbort) {
870 Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
872 } else if (type == TypeCommitPart) {
873 CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
874 processEntry(commitPart);
878 updateLiveStateFromLocal();
// Sends `transaction` directly to its arbitrator over the registered local
// endpoint and applies the arbitration data that comes back.
//
// Returns a Pair<bool, bool>. The visible returns are (true, false) when the
// arbitrator has no local endpoint or could not be contacted, and
// (false, true) at the end of the function.
// The request carries the last arbitration-data local sequence number seen
// from the arbitrator (-1 when none is recorded), the number of transaction
// parts and the encoded parts. The reply carries a did-commit byte, a
// could-arbitrate byte and a list of typed entries (TypeAbort /
// TypeCommitPart). The transaction's status is then set to Committed or
// Aborted.
//
// Fixes relative to the previous text:
//  * the endpoint lookup used `machineId`, which is not declared in this
//    function; the arbitrator id of the transaction is used instead, matching
//    the get() on the following line.
//  * localCommunicationTable maps to Pair<IoTString *, int32_t> *, so the
//    local is declared as a pointer and accessed with ->.
//  * presence of a recorded sequence number is tested with contains(); the
//    table maps int64_t to int64_t, so comparing get() against NULL treated a
//    stored 0 as "not recorded".
// NOTE(review): the range-for loops over getParts()->values() look like an
// unfinished port -- confirm values() is iterable in this code base. The
// conditions selecting Committed vs Aborted are on lines not visible here.
883 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
885 // Get the devices local communications
886 if (!localCommunicationTable->contains(transaction->getArbitrator()))
887 return Pair<bool, bool>(true, false);
889 Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
891 // Get the size of the send data
892 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
893 for (TransactionPart *part : transaction->getParts()->values()) {
894 sendDataSize += part->getSize();
897 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
898 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(transaction->getArbitrator())) {
899 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
902 // Make the send data size
903 Array<char> *sendData = new Array<char>(sendDataSize);
904 ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
907 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
908 bbEncode->putInt(transaction->getParts()->size());
909 for (TransactionPart *part : transaction->getParts()->values()) {
910 part->encode(bbEncode);
915 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
916 localSequenceNumber++;
918 if (returnData == NULL) {
919 // Could not contact server
920 return Pair<bool, bool>(true, false);
924 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
925 bool didCommit = bbDecode->get() == 1;
926 bool couldArbitrate = bbDecode->get() == 1;
927 int numberOfEntries = bbDecode->getInt();
928 bool foundAbort = false;
930 for (int i = 0; i < numberOfEntries; i++) {
931 char type = bbDecode->get();
932 if (type == TypeAbort) {
933 Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
935 if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
940 } else if (type == TypeCommitPart) {
941 CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
942 processEntry(commitPart);
946 updateLiveStateFromLocal();
948 if (couldArbitrate) {
949 TransactionStatus * status = transaction->getTransactionStatus();
951 status->setStatus(TransactionStatus_StatusCommitted);
953 status->setStatus(TransactionStatus_StatusAborted);
956 TransactionStatus * status = transaction->getTransactionStatus();
958 status->setStatus(TransactionStatus_StatusAborted);
960 status->setStatus(TransactionStatus_StatusCommitted);
964 return Pair<bool, bool>(false, true);
// Handles a request received over local communication and builds the reply.
//
// `data` starts with the last arbitrated sequence number the sender has seen,
// followed by a part count and that many encoded TransactionParts. When the
// count is non-zero, the parts are assembled into a Transaction and arbitrated
// locally; a transaction that already has a sequence number (!= -1) is
// recorded in offlineTransactionsCommittedAndAtServer.
// The reply then collects entries from liveAbortsGeneratedByLocal and from the
// commits stored for localMachineId, in sorted key order, and encodes: the
// did-commit and could-arbitrate bytes (only when parts were sent), the number
// of entries, and the entries themselves.
//
// NOTE(review): `Collections->sort(...)` and the range-for over
// commit->getParts()->values() look like leftovers of a Java original --
// confirm they are valid in this code base.
// NOTE(review): the loop locals named localSequenceNumber shadow the
// localSequenceNumber that is incremented at the end of the function.
// NOTE(review): what happens to entries at or below
// lastArbitratedSequenceNumberSeen, and the final return, are on lines not
// visible in this listing.
967 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
970 ByteBuffer *bbDecode = ByteBuffer_wrap(data);
971 int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
972 int numberOfParts = bbDecode->getInt();
974 // If we did commit a transaction or not
975 bool didCommit = false;
976 bool couldArbitrate = false;
978 if (numberOfParts != 0) {
980 // decode the transaction
981 Transaction *transaction = new Transaction();
982 for (int i = 0; i < numberOfParts; i++) {
984 TransactionPart *newPart = (TransactionPart *)TransactionPart_decode(NULL, bbDecode);
985 transaction->addPartDecode(newPart);
988 // Arbitrate on transaction and pull relevant return data
989 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
990 couldArbitrate = localArbitrateReturn.getFirst();
991 didCommit = localArbitrateReturn.getSecond();
993 updateLiveStateFromLocal();
995 // Transaction was sent to the server so keep track of it to prevent double commit
996 if (transaction->getSequenceNumber() != -1) {
997 offlineTransactionsCommittedAndAtServer->add(transaction->getId());
1001 // The data to send back
1002 int returnDataSize = 0;
1003 Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
1005 // Get the aborts to send back
1006 Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>(liveAbortsGeneratedByLocal->keySet());
1007 Collections->sort(abortLocalSequenceNumbers);
1008 uint asize = abortLocalSequenceNumbers->size();
1009 for(uint i=0; i<asize; i++) {
1010 int64_t localSequenceNumber = abortLocalSequenceNumbers->get(i);
1011 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1015 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
1016 unseenArbitrations->add(abort);
1017 returnDataSize += abort->getSize();
1020 // Get the commits to send back
1021 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
1022 if (commitForClientTable != NULL) {
1023 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
1024 Collections->sort(commitLocalSequenceNumbers);
1026 uint clsSize = commitLocalSequenceNumbers->size();
1027 for(uint clsi = 0; clsi < clsSize; clsi++) {
1028 int64_t localSequenceNumber = commitLocalSequenceNumbers->get(clsi);
1029 Commit *commit = commitForClientTable->get(localSequenceNumber);
1031 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1035 unseenArbitrations->addAll(commit->getParts()->values());
1037 for (CommitPart *commitPart : commit->getParts()->values()) {
1038 returnDataSize += commitPart->getSize();
1043 // Number of arbitration entries to decode
1044 returnDataSize += 2 * sizeof(int32_t);
1046 // bool of did commit or not
1047 if (numberOfParts != 0) {
1048 returnDataSize += sizeof(char);
1051 // Data to send Back
1052 Array<char> *returnData = new Array<char>(returnDataSize);
1053 ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
1055 if (numberOfParts != 0) {
1057 bbEncode->put((char)1);
1059 bbEncode->put((char)0);
1061 if (couldArbitrate) {
1062 bbEncode->put((char)1);
1064 bbEncode->put((char)0);
1068 bbEncode->putInt(unseenArbitrations->size());
1069 uint size = unseenArbitrations->size();
1070 for (uint i = 0; i < size; i++) {
1071 Entry *entry = unseenArbitrations->get(i);
1072 entry->encode(bbEncode);
1075 localSequenceNumber++;
// Sends `slot` to the server via cloud->putSlot(slot, newSize) and inspects
// the slots that come back.
//
// Returns a ThreeTuple (inserted, lastTryInserted, array). `array` is the
// value returned by putSlot, or a locally created array holding `slot` when
// putSlot returned NULL.
// Throws an Error* ("Server Error: Did not send any slots") when the array
// has length 0.
//
// NOTE(review): `isNewKey` is not referenced on any line visible in this
// listing, and several lines of this function are not shown; confirm the
// branch structure against the full file.
1079 ThreeTuple<bool, bool, Array<Slot *> *> Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey) {
// The previous flag value is saved; on the visible lines it is only
// mentioned by the commented-out check further down.
1080 bool attemptedToSendToServerTmp = attemptedToSendToServer;
1081 attemptedToSendToServer = true;
1083 bool inserted = false;
1084 bool lastTryInserted = false;
1086 Array<Slot *> *array = cloud->putSlot(slot, newSize);
// putSlot returned NULL: build an array that holds only our slot and reset
// the list of rejected slots.
// NOTE(review): confirm that a default-constructed Array has room for
// index 0 before set(0, slot) is called.
1087 if (array == NULL) {
1088 array = new Array<Slot *>();
1089 array->set(0, slot);
1090 rejectedSlotVector->clear();
1093 if (array->length() == 0) {
1094 throw new Error("Server Error: Did not send any slots");
1097 // if (attemptedToSendToServerTmp) {
// Branch on whether an earlier send was recorded as partial.
1098 if (hadPartialSendToServer) {
1100 bool isInserted = false;
1101 uint size = array->length();
// Look for our own slot (same sequence number, sent by this machine) among
// the returned slots.
1102 for (uint i = 0; i < size; i++) {
1103 Slot *s = array->get(i);
1104 if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1110 for (uint i = 0; i < size; i++) {
1111 Slot *s = array->get(i);
1116 // Process each entry in the slot
1117 Vector<Entry *> *entries = s->getEntries();
1118 uint eSize = entries->size();
1119 for(uint ei=0; ei < eSize; ei++) {
1120 Entry * entry = entries->get(ei);
// Only LastMessage entries are examined: one that names this machine and
// this slot's sequence number is what the check below looks for.
1122 if (entry->getType() == TypeLastMessage) {
1123 LastMessage *lastMessage = (LastMessage *)entry;
1125 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) {
// Record the slot's sequence number as rejected.
1134 rejectedSlotVector->add(slot->getSequenceNumber());
1135 lastTryInserted = false;
1137 lastTryInserted = true;
// Record the slot's sequence number as rejected.
1140 rejectedSlotVector->add(slot->getSequenceNumber());
1141 lastTryInserted = false;
1145 return ThreeTuple<bool, bool, Array<Slot *> *>(inserted, lastTryInserted, array);
1149 * Returns false if a resize was needed
// Fills `slot` with entries in this order: an optional TableStatus (when
// resizing), rejected-message entries, mandatory rescue data, the new key
// entry (if any), pending arbitration data, pending transaction parts and
// finally optional rescue data.
//
// Parameters:
//   slot        - the slot being filled.
//   resize      - whether the caller wants a resize; forced to true when
//                 liveSlotCount exceeds bufferResizeThreshold.
//   newKeyEntry - optional NewKey entry to place in the slot (may be NULL).
//
// Returns a ThreeTuple (needsResize, newSize, inserted). The first element
// is true only on the early return taken when the mandatory rescue needs a
// resize but no resize is being performed; on that path the other two
// elements are 0 and false.
//
// NOTE(review): the declaration of `newSize` and several short lines are
// not visible in this listing.
1151 ThreeTuple<bool, int32_t, bool> Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry) {
1153 if (liveSlotCount > bufferResizeThreshold) {
1154 resize = true;//Resize is forced
// Compute the enlarged size and announce it in the slot with a TableStatus
// entry.
1158 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1159 TableStatus *status = new TableStatus(slot, newSize);
1160 slot->addEntry(status);
1163 // Fill with rejected slots first before doing anything else
1164 doRejectedMessages(slot);
1166 // Do mandatory rescue of entries
1167 ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1169 // Extract working variables
1170 bool needsResize = mandatoryRescueReturn.getFirst();
1171 bool seenLiveSlot = mandatoryRescueReturn.getSecond();
1172 int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1174 if (needsResize && !resize) {
1175 // We need to resize but we are not resizing so report that a resize is needed
// Use 0 / false rather than NULL for the int32_t and bool fields: NULL is a
// null-pointer constant, not an integer or boolean value.
1176 return ThreeTuple<bool, int32_t, bool>(true, 0, false);
1179 bool inserted = false;
1180 if (newKeyEntry != NULL) {
1181 newKeyEntry->setSlot(slot);
1182 if (slot->hasSpace(newKeyEntry)) {
1183 slot->addEntry(newKeyEntry);
1188 // Clear the transactions, aborts and commits that were sent previously
1189 transactionPartsSent->clear();
1190 pendingSendArbitrationEntriesToDelete->clear();
1191 uint size = pendingSendArbitrationRounds->size();
1192 for (uint i = 0; i < size; i++) {
1193 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
1194 bool isFull = false;
1195 round->generateParts();
1196 Vector<Entry *> *parts = round->getParts();
1198 // Insert pending arbitration data
1199 uint vsize = parts->size();
1200 for (uint vi = 0; vi < vsize; vi++) {
1201 Entry *arbitrationData = parts->get(vi);
1203 // If it is an abort then we need to set some information
1204 if (arbitrationData->getType() == TypeAbort) {
1205 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1208 if (!slot->hasSpace(arbitrationData)) {
1209 // No space so cant do anything else with these data entries
1214 // Add to this current slot and add it to entries to delete
1215 slot->addEntry(arbitrationData);
1216 pendingSendArbitrationEntriesToDelete->add(arbitrationData);
1224 if (pendingTransactionQueue->size() > 0) {
1225 Transaction *transaction = pendingTransactionQueue->get(0);
1226 // Set the transaction sequence number if it has yet to be inserted into the block chain
1227 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1228 transaction->setSequenceNumber(slot->getSequenceNumber());
1232 TransactionPart *part = transaction->getNextPartToSend();
1234 // Ran out of parts to send for this transaction so move on
1238 if (slot->hasSpace(part)) {
1239 slot->addEntry(part);
// Remember which part numbers of this transaction went into the slot.
1240 Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1241 if (partsSent == NULL) {
1242 partsSent = new Vector<int32_t>();
1243 transactionPartsSent->put(transaction, partsSent);
1245 partsSent->add(part->getPartNumber());
1246 transactionPartsSent->put(transaction, partsSent);
1253 // Fill the remainder of the slot with rescue data
1254 doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1256 return ThreeTuple<bool, int32_t, bool>(false, newSize, inserted);
// Builds RejectedMessage entries for slot `s` from the sequence numbers held
// in rejectedSlotVector. Does nothing on the visible lines when the vector
// is empty.
//
// When the vector holds more than Table_REJECTED_THRESHOLD entries, a single
// message covering the first through the last rejected sequence number is
// created. Otherwise one message covers the leading run (old_seqn through
// prev_seqn) and one message per remaining sequence number is created with
// the machine id of the slot found in the local buffer.
//
// NOTE(review): the declaration of `i`, the statements that attach each
// `rm` to the slot, and the condition that ends the first loop are on lines
// not visible in this listing.
1259 void Table::doRejectedMessages(Slot *s) {
1260 if (!rejectedSlotVector->isEmpty()) {
1261 /* TODO: We should avoid generating a rejected message entry if
1262 * there is already a sufficient entry in the queue (e.g.,
1263 * equalsto value of true and same sequence number). */
1265 int64_t old_seqn = rejectedSlotVector->firstElement();
1266 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1267 int64_t new_seqn = rejectedSlotVector->lastElement();
1268 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1271 int64_t prev_seqn = -1;
1273 /* Go through list of missing messages */
1274 for (; i < rejectedSlotVector->size(); i++) {
1275 int64_t curr_seqn = rejectedSlotVector->get(i);
1276 Slot *s_msg = buffer->getSlot(curr_seqn);
1279 prev_seqn = curr_seqn;
1281 /* Generate rejected message entry for missing messages */
1282 if (prev_seqn != -1) {
1283 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1286 /* Generate rejected message entries for present messages */
1287 for (; i < rejectedSlotVector->size(); i++) {
1288 int64_t curr_seqn = rejectedSlotVector->get(i);
1289 Slot *s_msg = buffer->getSlot(curr_seqn);
1290 int64_t machineid = s_msg->getMachineID();
1291 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
// Copies live entries from the oldest slots of the local buffer into `slot`
// so that they are not lost when those slots leave the buffer.
//
// Walks sequence numbers from oldestLiveSlotSequenceNumver up to (but not
// including) firstIfFull + Table_FREE_SLOTS, advancing
// oldestLiveSlotSequenceNumver until the first live slot is met.
//
// Returns a ThreeTuple (needsResize, seenLiveSlot, currentSequenceNumber).
// needsResize is true when an entry of the slot at `firstIfFull` did not fit
// into `slot`; otherwise it is false.
//
// NOTE(review): `liveEntries` is a pointer, yet it is used directly as the
// range of a range-based for loop; confirm this compiles with the project's
// Vector type.
1298 ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize) {
1299 int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1300 int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
// Never start scanning below the oldest slot the buffer still holds.
1301 if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1302 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1305 int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1306 bool seenLiveSlot = false;
1307 int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots; // smallest seq number in the buffer if it is full
1308 int64_t threshold = firstIfFull + Table_FREE_SLOTS; // we want the buffer to be clear of live entries up to this point
1312 for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1313 Slot * previousSlot = buffer->getSlot(currentSequenceNumber);
1314 // Push slot number forward
1315 if (!seenLiveSlot) {
1316 oldestLiveSlotSequenceNumver = currentSequenceNumber;
1319 if (!previousSlot->isLive()) {
1323 // We have seen a live slot
1324 seenLiveSlot = true;
1326 // Get all the live entries for a slot
1327 Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1329 // Iterate over all the live entries and try to rescue them
1330 for (Entry *liveEntry : liveEntries) {
1331 if (slot->hasSpace(liveEntry)) {
1332 // Enough space to rescue the entry
1333 slot->addEntry(liveEntry);
1334 } else if (currentSequenceNumber == firstIfFull) {
1335 //if there's no space but the entry is about to fall off the queue
1336 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
1342 return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
// Uses whatever room is left in slot `s` to copy live entries from older
// slots, scanning sequence numbers from `seqn` up to the newest one in the
// buffer.
//
// Parameters:
//   s            - slot being filled.
//   seenliveslot - whether an earlier pass already met a live slot.
//   seqn         - sequence number at which to start scanning.
//   resize       - forwarded to Slot::getLiveEntries.
//
// NOTE(review): the declaration and update of `skipcount`, the condition
// guarding the oldestLiveSlotSequenceNumver update, and the statements
// controlled by the brace-less `if`s are on lines not visible in this
// listing.
1345 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1346 /* now go through live entries from least to greatest sequence number until
1347 * either all live slots added, or the slot doesn't have enough room
1348 * for SKIP_THRESHOLD consecutive entries*/
1350 int64_t newestseqnum = buffer->getNewestSeqNum();
1352 for (; seqn <= newestseqnum; seqn++) {
1353 Slot *prevslot = buffer->getSlot(seqn);
1354 //Push slot number forward
1356 oldestLiveSlotSequenceNumver = seqn;
1358 if (!prevslot->isLive())
1360 seenliveslot = true;
1361 Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
1362 for (Entry *liveentry : liveentries) {
1363 if (s->hasSpace(liveentry))
1364 s->addEntry(liveentry);
1367 if (skipcount > Table_SKIP_THRESHOLD)
1377 * Checks for malicious activity and updates the local copy of the block chain.
// Validates a batch of slots received from the server and then commits them
// to the local copy of the block chain.
//
// Parameters:
//   newSlots             - slots received from the server.
//   acceptUpdatesToLocal - forwarded unchanged to processSlot.
//
// Throws an Error* when the first new slot is not newer than the latest
// sequence number already known ("Server Error: Sent older slots!") and,
// after a gap in sequence numbers, when machineSet is not empty once all
// slots were processed ("Missing record for machines: ").
//
// On success the slots are stored in `buffer`, `sequenceNumber` is advanced
// to the last new slot, the live state is refreshed from the server data,
// offlineTransactionsCommittedAndAtServer is cleared and
// hadPartialSendToServer is reset.
//
// NOTE(review): `newSlots` is a pointer but is used directly as the range of
// two range-based for loops; confirm this compiles with the project's Array
// type. Several short lines of this function are not visible in this
// listing.
1379 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1380 // The cloud communication layer has checked slot HMACs already
1382 if (newSlots->length() == 0) {
1386 // Make sure all slots are newer than the last largest slot this
1388 int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
1389 if (firstSeqNum <= sequenceNumber) {
1390 throw new Error("Server Error: Sent older slots!");
1393 // Create an object that can access both new slots and slots in our
1394 // local chain without committing slots to our local chain
1395 SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1397 // Check that the HMAC chain is not broken
1398 checkHMACChain(indexer, newSlots);
1400 // Set to keep track of messages from clients
1401 Hashset<int64_t> *machineSet = new Hashset<int64_t>(lastMessageTable->keySet());
1403 // Process each slots data
1404 for (Slot *slot : newSlots) {
1405 processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1407 updateExpectedSize();
1410 // If there is a gap, check to see if the server sent us
1412 if (firstSeqNum != (sequenceNumber + 1)) {
1414 // Check the size of the slots that were sent down by the server.
1415 // Can only check the size if there was a gap
// length is a member function (see the other uses in this function), so it
// must be called to obtain the slot count.
1416 checkNumSlots(newSlots->length());
1418 // Since there was a gap every machine must have pushed a slot or
1419 // must have a last message message. If not then the server is
1421 if (!machineSet->isEmpty()) {
1422 throw new Error("Missing record for machines: ");
1426 // Update the size of our local block chain.
1429 // Commit new slots to the local block chain.
1430 for (Slot *slot : newSlots) {
1432 // Insert this slot into our local block chain copy.
1433 buffer->putSlot(slot);
1435 // Keep track of how many slots are currently live (have live data
1440 // Get the sequence number of the latest slot in the system
1441 sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
1442 updateLiveStateFromServer();
1444 // No Need to remember after we pulled from the server
1445 offlineTransactionsCommittedAndAtServer->clear();
1447 // This is invalidated now
1448 hadPartialSendToServer = false;
// Refreshes the live state after new slots arrived from the server: folds in
// new transaction parts, arbitrates, updates the committed table, prunes
// dead transactions and finally rebuilds the two speculative tables.
1451 void Table::updateLiveStateFromServer() {
1452 // Process the new transaction parts
1453 processNewTransactionParts();
1455 // Do arbitration on new transactions that were received
1456 arbitrateFromServer();
1458 // Update all the committed keys
1459 bool didCommitOrSpeculate = updateCommittedTable();
1461 // Delete the transactions that are now dead
1462 updateLiveTransactionsAndStatus();
// The flag accumulates: it stays true once either the committed table or the
// speculative table reported a change, and is handed to the next step.
1465 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1466 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
// Refreshes the live state after a local change. Performs the same steps as
// updateLiveStateFromServer except that it neither processes new transaction
// parts nor arbitrates on server transactions.
1469 void Table::updateLiveStateFromLocal() {
1470 // Update all the committed keys
1471 bool didCommitOrSpeculate = updateCommittedTable();
1473 // Delete the transactions that are now dead
1474 updateLiveTransactionsAndStatus();
// The flag accumulates across the committed and speculative updates and is
// handed to the pending-transaction speculative update.
1477 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1478 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
// Records that a table status was found and remembers `numberOfSlots` as the
// current maximum size. The visible assignment sets `expectedsize` to the
// smaller of `firstSequenceNumber` and `numberOfSlots`.
//
// NOTE(review): listing line 1485, between the `if` and the assignment, is
// not visible here, so it cannot be determined from this listing whether the
// assignment runs when didFindTableStatus is true or in an else branch.
// Confirm against the full file.
1481 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1482 int64_t prevslots = firstSequenceNumber;
1484 if (didFindTableStatus) {
1486 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1489 didFindTableStatus = true;
1490 currMaxSize = numberOfSlots;
// Keeps `expectedsize` from exceeding `currMaxSize`.
// NOTE(review): listing line 1494, between the signature and the clamp, is
// not visible here; confirm what it does to `expectedsize`.
1493 void Table::updateExpectedSize() {
1496 if (expectedsize > currMaxSize) {
1497 expectedsize = currMaxSize;
1503 * Check the size of the block chain to make sure there are enough
1504 * slots sent back by the server. This is only called when we have a
1505 * gap between the slots that we have locally and the slots sent by
1506 * the server therefore in the slots sent by the server there will be
1507 * at least 1 Table status message
// Verifies that the number of slots received equals `expectedsize`.
// Throws an Error* when the two differ.
1509 void Table::checkNumSlots(int numberOfSlots) {
1510 if (numberOfSlots != expectedsize) {
1511 throw new Error("Server Error: Server did not send all slots-> Expected: ");
// Stores `newmaxsize` as the current maximum size. The local buffer itself
// is not resized here; commitNewMaxSize applies the value.
1515 void Table::updateCurrMaxSize(int newmaxsize) {
1516 currMaxSize = newmaxsize;
1521 * Update the size of the local buffer if it is needed.
// Applies `currMaxSize` to the local state: clears didFindTableStatus,
// resizes `buffer` when the size differs from `numberOfSlots`, stores the
// new slot count and recomputes the resize threshold.
1523 void Table::commitNewMaxSize() {
1524 didFindTableStatus = false;
1526 // Resize the local slot buffer
1527 if (numberOfSlots != currMaxSize) {
1528 buffer->resize((int32_t)currMaxSize);
1531 // Change the number of local slots to the new size
1532 numberOfSlots = (int32_t)currMaxSize;
1534 // Recalculate the resize threshold since the size of the local
1535 // buffer has changed
1536 setResizeThreshold();
1540 * Process the new transaction parts from this latest round of slots
1541 * received from the server
// Moves every part held in newTransactionParts into its Transaction object,
// creating and registering the Transaction in both live tables when it is
// not known yet, and then empties newTransactionParts.
//
// NOTE(review): `lastTransactionNumber` is an int64_t compared against NULL,
// which cannot distinguish "no entry" from a stored 0; confirm how the
// table's get() reports a missing key. The statements that handle an
// already-arbitrated part are on lines not visible in this listing.
1543 void Table::processNewTransactionParts() {
1545 if (newTransactionParts->size() == 0) {
1546 // Nothing new to process
1550 // Iterate through all the machine Ids that we received new parts
1552 for (int64_t machineId : newTransactionParts->keySet()) {
1553 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
1555 // Iterate through all the parts for that machine Id
1556 for (Pair<int64_t, int32_t> partId : parts->keySet()) {
1557 TransactionPart *part = parts->get(partId);
// Check whether the part's arbitrator has already arbitrated a transaction
// with this or a later sequence number.
1559 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1560 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= part->getSequenceNumber())) {
1561 // Set dead the transaction part
1566 // Get the transaction object for that sequence number
1567 Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1569 if (transaction == NULL) {
1570 // This is a new transaction that we dont have so make a new one
1571 transaction = new Transaction();
1573 // Insert this new transaction into the live tables
1574 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1575 liveTransactionByTransactionIdTable->put(part->getTransactionId(), transaction);
1578 // Add that part to the transaction
1579 transaction->addPartDecode(part);
1583 // Clear all the new transaction parts in preparation for the next
1584 // time the server sends slots
1585 newTransactionParts->clear();
// Arbitrates, in ascending sequence-number order, the live transactions for
// which this machine is the arbitrator.
//
// Transactions whose guard evaluates to true contribute their key/value
// updates to one batched Commit; transactions whose guard fails each get an
// Abort. Every generated entry is passed to processEntry, and the result is
// queued as one ArbitrationRound in pendingSendArbitrationRounds, which is
// then compacted.
//
// NOTE(review): the statements controlled by the skip checks inside the loop
// (continue/break) are on lines not visible in this listing.
1588 void Table::arbitrateFromServer() {
1590 if (liveTransactionBySequenceNumberTable->size() == 0) {
1591 // Nothing to arbitrate on so move on
1595 // Get the transaction sequence numbers and sort from oldest to newest
1596 Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
1597 Collections->sort(transactionSequenceNumbers);
1599 // Collection of key value pairs written by transactions whose guard passed
1600 Hashtable<IoTString *, KeyValue *> * speculativeTableTmp = new Hashtable<IoTString *, KeyValue *>();
1602 // The last transaction arbitrated on
1603 int64_t lastTransactionCommitted = -1;
1604 Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1606 for (int64_t transactionSequenceNumber : transactionSequenceNumbers) {
1607 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1611 // Check if this machine arbitrates for this transaction if not
1612 // then we cant arbitrate this transaction
1613 if (transaction->getArbitrator() != localMachineId) {
// Sequence numbers below the last one arbitrated on are checked for here.
1617 if (transactionSequenceNumber < lastSeqNumArbOn) {
1621 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1622 // We have seen this already locally so dont commit again
1627 if (!transaction->isComplete()) {
1628 // Will arbitrate in incorrect order if we continue so just break
1634 // update the largest transaction seen by arbitrator from server
1635 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) == NULL) {
1636 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1638 int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1639 if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1640 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
// The guard is evaluated against the committed table plus the updates
// accumulated so far in this round.
1644 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1645 // Guard evaluated as true
1647 // Update the local changes so we can make the commit
1648 SetIterator<KeyValue *> *kvit = getKeyIterator(transaction->getKeyValueUpdateSet());
1649 while (kvit->hasNext()) {
1650 KeyValue *kv = kvit->next();
1651 speculativeTableTmp->put(kv->getKey(), kv);
1655 // Update what the last transaction committed was for use in batch commit
1656 lastTransactionCommitted = transactionSequenceNumber;
1658 // Guard evaluated was false so create abort
1660 Abort *newAbort = new Abort(NULL,
1661 transaction->getClientLocalSequenceNumber(),
1662 transaction->getSequenceNumber(),
1663 transaction->getMachineId(),
1664 transaction->getArbitrator(),
1665 localArbitrationSequenceNumber);
1666 localArbitrationSequenceNumber++;
1667 generatedAborts->add(newAbort);
1669 // Insert the abort so we can process
1670 processEntry(newAbort);
1673 lastSeqNumArbOn = transactionSequenceNumber;
1676 Commit *newCommit = NULL;
1678 // If there is something to commit
1679 if (speculativeTableTmp->size() != 0) {
1680 // Create the commit and increment the commit sequence number
1681 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1682 localArbitrationSequenceNumber++;
1684 // Add all the new keys to the commit
1685 for (KeyValue *kv : speculativeTableTmp->values()) {
1686 newCommit->addKV(kv);
1689 // create the commit parts
1690 newCommit->createCommitParts();
1692 // Append all the commit parts to the end of the pending queue
1693 // waiting for sending to the server
1694 // Insert the commit so we can process it
1695 for (CommitPart *commitPart : newCommit->getParts()->values()) {
1696 processEntry(commitPart);
// Queue the round only when it carries a commit or at least one abort.
1700 if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1701 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1702 pendingSendArbitrationRounds->add(arbitrationRound);
// When compaction reports a change, the last pending round is re-read and
// its commit parts (if it has a commit) are processed.
1704 if (compactArbitrationData()) {
1705 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1706 if (newArbitrationRound->getCommit() != NULL) {
1707 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1708 processEntry(commitPart);
// Arbitrates a single transaction locally, provided this machine is its
// arbitrator.
//
// Returns a Pair (first, second):
//   (false, false) - this machine is not the arbitrator, the transaction is
//                    incomplete, or a newer transaction from the same remote
//                    machine was already seen from the server.
//   (true, true)   - the guard held; a Commit was created and queued.
//   (true, false)  - the guard failed; an Abort was created and queued.
// On both arbitrated paths the status of a locally created transaction is
// updated and updateLiveStateFromLocal() is called before returning.
//
// NOTE(review): the lines that separate the "compacted" and "not compacted"
// handling after compactArbitrationData(), and the else that starts the
// abort path, are not visible in this listing.
1715 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1717 // Check if this machine arbitrates for this transaction if not then
1718 // we cant arbitrate this transaction
1719 if (transaction->getArbitrator() != localMachineId) {
1720 return Pair<bool, bool>(false, false);
1723 if (!transaction->isComplete()) {
1724 // Will arbitrate in incorrect order if we continue so stop here
1726 return Pair<bool, bool>(false, false);
1729 if (transaction->getMachineId() != localMachineId) {
1730 // dont do this check for local transactions
1731 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) != NULL) {
1732 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1733 // We have already seen this from the server
1734 return Pair<bool, bool>(false, false);
// The guard is evaluated against the committed table only.
1739 if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1740 // Guard evaluated as true Create the commit and increment the
1741 // commit sequence number
1742 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1743 localArbitrationSequenceNumber++;
1745 // Update the local changes so we can make the commit
1746 SetIterator<KeyValue *> *kvit = getKeyIterator(transaction->getKeyValueUpdateSet());
1747 while (kvit->hasNext()) {
1748 KeyValue *kv = kvit->next();
1749 newCommit->addKV(kv);
1753 // create the commit parts
1754 newCommit->createCommitParts();
1756 // Append all the commit parts to the end of the pending queue
1757 // waiting for sending to the server
1758 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1759 pendingSendArbitrationRounds->add(arbitrationRound);
1761 if (compactArbitrationData()) {
1762 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1763 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1764 processEntry(commitPart);
1767 // Insert the commit so we can process it
1768 for (CommitPart *commitPart : newCommit->getParts()->values()) {
1769 processEntry(commitPart);
1773 if (transaction->getMachineId() == localMachineId) {
1774 TransactionStatus *status = transaction->getTransactionStatus();
1775 if (status != NULL) {
1776 status->setStatus(TransactionStatus_StatusCommitted);
1780 updateLiveStateFromLocal();
1781 return Pair<bool, bool>(true, true);
1783 if (transaction->getMachineId() == localMachineId) {
1784 // For locally created messages update the status
1785 // Guard evaluated was false so create abort
1786 TransactionStatus * status = transaction->getTransactionStatus();
1787 if (status != NULL) {
1788 status->setStatus(TransactionStatus_StatusAborted);
1791 Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
1794 Abort *newAbort = new Abort(NULL,
1795 transaction->getClientLocalSequenceNumber(),
1797 transaction->getMachineId(),
1798 transaction->getArbitrator(),
1799 localArbitrationSequenceNumber);
1800 localArbitrationSequenceNumber++;
1801 addAbortSet->add(newAbort);
1803 // Append the abort-only round to the end of the pending queue
1804 // waiting for sending to the server
1805 ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1806 pendingSendArbitrationRounds->add(arbitrationRound);
// NOTE(review): unlike arbitrateFromServer, getCommit() is dereferenced here
// without a NULL check; confirm compactArbitrationData() only returns true
// when the last round carries a commit.
1808 if (compactArbitrationData()) {
1809 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1810 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1811 processEntry(commitPart);
1816 updateLiveStateFromLocal();
1817 return Pair<bool, bool>(true, false);
1822 * Compacts the arbitration data by merging commits and aggregating
1823 * aborts so that a single large push of commits can be done instead
1824 * of many small updates
// Tries to fold earlier pending arbitration rounds into the most recent one
// so that fewer, larger rounds are sent.
//
// Starting from the round just before the last one and walking backwards,
// each round is merged into `lastRound` (aborts only, or aborts plus a
// merged Commit) until a round is met that is full, has already had a part
// sent, or would push the merged round past ArbitrationRound MAX_PARTS.
// The merged-away rounds are then removed from pendingSendArbitrationRounds
// and `lastRound` is appended again.
//
// Returns bool; the return statements themselves are on lines not visible
// in this listing. The last visible condition tests
// `hadCommit && gotNewCommit`.
//
// NOTE(review): `hadCommit` is initialised with `getCommit() == NULL`, which
// reads as the opposite of its name; confirm the intent. `Commit->merge`,
// `ArbitrationRound->MAX_PARTS` and the by-value `Commit newCommit` that is
// then used with `->` look like unfinished port artifacts and need the
// project's real spellings.
1826 bool Table::compactArbitrationData() {
1827 if (pendingSendArbitrationRounds->size() < 2) {
1828 // Nothing to compact so do nothing
1832 ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1833 if (lastRound->getDidSendPart()) {
1837 bool hadCommit = (lastRound->getCommit() == NULL);
1838 bool gotNewCommit = false;
1840 int numberToDelete = 1;
1841 while (numberToDelete < pendingSendArbitrationRounds->size()) {
// Candidate round: walks backwards from the one just before the last round.
1842 ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1844 if (round->isFull() || round->getDidSendPart()) {
1845 // Stop since there is a part that cannot be compacted and we
1846 // need to compact in order
1850 if (round->getCommit() == NULL) {
1851 // Try compacting aborts only
1852 int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
1853 if (newSize > ArbitrationRound->MAX_PARTS) {
1854 // Cant compact since it would be too large
1857 lastRound->addAborts(round->getAborts());
1859 // Create a new larger commit
1860 Commit newCommit = Commit->merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
1861 localArbitrationSequenceNumber++;
1863 // Create the commit parts so that we can count them
1864 newCommit->createCommitParts();
1866 // Calculate the new size of the parts
1867 int newSize = newCommit->getNumberOfParts();
1868 newSize += lastRound->getAbortsCount();
1869 newSize += round->getAbortsCount();
1871 if (newSize > ArbitrationRound->MAX_PARTS) {
1872 // Cant compact since it would be too large
1876 // Set the new compacted part
1877 lastRound->setCommit(newCommit);
1878 lastRound->addAborts(round->getAborts());
1879 gotNewCommit = true;
1885 if (numberToDelete != 1) {
1886 // If there is a compaction
1887 // Delete the previous pieces that are now in the new compacted piece
1888 if (numberToDelete == pendingSendArbitrationRounds->size()) {
1889 pendingSendArbitrationRounds->clear();
1891 for (int i = 0; i < numberToDelete; i++) {
1892 pendingSendArbitrationRounds->remove(pendingSendArbitrationRounds->size() - 1);
1896 // Add the new compacted into the pending to send list
1897 pendingSendArbitrationRounds->add(lastRound);
1899 // Should reinsert into the commit processor
1900 if (hadCommit && gotNewCommit) {
1909 * Update all the commits and the committed tables, sets dead the dead
// Folds the commit parts collected in newCommitParts into liveCommitsTable
// and then processes the commits of every arbitrator in ascending
// sequence-number order, updating committedKeyValueTable,
// liveCommitsByKeyTable and the per-arbitrator bookkeeping tables.
//
// Returns true when at least one commit that had not been seen before was
// processed, false otherwise (the early return for an empty newCommitParts
// is on a line not visible in this listing).
//
// NOTE(review): several int64_t values read from tables are compared against
// NULL, which cannot distinguish "no entry" from a stored 0; confirm how the
// tables report a missing key. Statements such as continue/break and the
// else branches are on lines not visible in this listing.
1912 bool Table::updateCommittedTable() {
1914 if (newCommitParts->size() == 0) {
1915 // Nothing new to process
1919 // Iterate through all the machine Ids that we received new parts for
1920 for (int64_t machineId : newCommitParts->keySet()) {
1921 Hashtable<Pair<int64_t, int32_t> *, CommitPart *> *parts = newCommitParts->get(machineId);
1923 // Iterate through all the parts for that machine Id
1924 for (Pair<int64_t, int32_t> partId : parts->keySet()) {
1925 CommitPart *part = parts->get(partId);
1927 // Get the transaction object for that sequence number
1928 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
1930 if (commitForClientTable == NULL) {
1931 // This is the first commit from this device
1932 commitForClientTable = new Hashtable<int64_t, Commit *>();
1933 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
1936 Commit *commit = commitForClientTable->get(part->getSequenceNumber());
1938 if (commit == NULL) {
1939 // This is a new commit that we dont have so make a new one
1940 commit = new Commit();
1942 // Insert this new commit into the live tables
1943 commitForClientTable->put(part->getSequenceNumber(), commit);
1946 // Add that part to the commit
1947 commit->addPartDecode(part);
1951 // Clear all the new commits parts in preparation for the next time
1952 // the server sends slots
1953 newCommitParts->clear();
1955 // If we process a new commit keep track of it for future use
1956 bool didProcessANewCommit = false;
1958 // Process the commits one by one
1959 for (int64_t arbitratorId : liveCommitsTable->keySet()) {
1961 // Get all the commits for a specific arbitrator
1962 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
1964 // Sort the commits in order
1965 Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
1966 Collections->sort(commitSequenceNumbers);
1968 // Get the last commit seen from this arbitrator
1969 int64_t lastCommitSeenSequenceNumber = -1;
1970 if (lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId) != NULL) {
1971 lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
1974 // Go through each new commit one by one
1975 for (int i = 0; i < commitSequenceNumbers->size(); i++) {
1976 int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
1977 Commit *commit = commitForClientTable->get(commitSequenceNumber);
1979 // Special processing if a commit is not complete
1980 if (!commit->isComplete()) {
1981 if (i == (commitSequenceNumbers->size() - 1)) {
1982 // If there is an incomplete commit and this commit is the
1983 // latest one seen then this commit cannot be processed and
1984 // there are no other commits
1987 // This is a commit that was already dead but parts of it
1988 // are still in the block chain (not flushed out yet).
1989 // Delete it and move on
1991 commitForClientTable->remove(commit->getSequenceNumber());
1996 // Update the last transaction that was updated if we can
1997 if (commit->getTransactionSequenceNumber() != -1) {
1998 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
2000 // Update the last transaction sequence number that the arbitrator arbitrated on
2001 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
2002 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2006 // Update the last arbitration data that we have seen so far
2007 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(commit->getMachineId())) {
2008 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
2009 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
2011 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2014 // Never seen any data from this arbitrator so record the first one
2015 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2018 // We have already seen this commit before so need to do the
2019 // full processing on this commit
2020 if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2022 // Update the last transaction that was updated if we can
2023 if (commit->getTransactionSequenceNumber() != -1) {
2024 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
2026 // Update the last transaction sequence number that the arbitrator arbitrated on
2027 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
2028 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2035 // If we got here then this is a brand new commit and needs full
2037 // Get what commits should be edited, these are the commits that
2038 // have live values for their keys
2039 Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
2040 SetIterator<KeyValue *> *kvit = getKeyIterator(commit->getKeyValueUpdateSet());
2041 while (kvit->hasNext()) {
2042 KeyValue *kv = kvit->next();
2043 commitsToEdit->add(liveCommitsByKeyTable->get(kv->getKey()));
2046 commitsToEdit->remove(NULL); // remove NULL since it could be in this set
2048 // Update each previous commit that needs to be updated
2049 for (Commit *previousCommit : commitsToEdit) {
2051 // Only bother with live commits (TODO: Maybe remove this check)
2052 if (previousCommit->isLive()) {
2054 // Update which keys in the old commits are still live
2055 SetIterator<KeyValue *> *kvit = getKeyIterator(commit->getKeyValueUpdateSet());
2056 while (kvit->hasNext()) {
2057 KeyValue *kv = kvit->next();
2058 previousCommit->invalidateKey(kv->getKey());
2062 // if the commit is now dead then remove it
// NOTE(review): the table is keyed by int64_t sequence number but a Commit*
// is passed to remove(); confirm whether the key should be
// previousCommit->getSequenceNumber().
2063 if (!previousCommit->isLive()) {
2064 commitForClientTable->remove(previousCommit);
2069 // Update the last seen sequence number from this arbitrator
2070 if (lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId()) != NULL) {
2071 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2072 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2075 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2078 // We processed a new commit that we havent seen before
2079 didProcessANewCommit = true;
2081 // Update the committed table of keys and which commit is using which key
2082 SetIterator<KeyValue *> *kvit = getKeyIterator(commit->getKeyValueUpdateSet());
2083 while (kvit->hasNext()) {
2084 KeyValue *kv = kvit->next();
2085 committedKeyValueTable->put(kv->getKey(), kv);
2086 liveCommitsByKeyTable->put(kv->getKey(), commit);
2092 return didProcessANewCommit;
2096 * Create the speculative table from transactions that are still live
2097 * and have come from the cloud
2099 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2100 if (liveTransactionBySequenceNumberTable->keySet()->size() == 0) {
2101 // There is nothing to speculate on
2105 // Create a list of the transaction sequence numbers and sort them
2106 // from oldest to newest
2107 Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
2108 Collections->sort(transactionSequenceNumbersSorted);
2110 bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2113 if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2114 // If there is a gap in the transaction sequence numbers then
2115 // there was a commit or an abort of a transaction OR there was a
2116 // new commit (Could be from offline commit) so a redo the
2117 // speculation from scratch
2119 // Start from scratch
2120 speculatedKeyValueTable->clear();
2121 lastTransactionSequenceNumberSpeculatedOn = -1;
2122 oldestTransactionSequenceNumberSpeculatedOn = -1;
2126 // Remember the front of the transaction list
2127 oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2129 // Find where to start arbitration from
2130 int startIndex = transactionSequenceNumbersSorted->indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1;
2132 if (startIndex >= transactionSequenceNumbersSorted->size()) {
2133 // Make sure we are not out of bounds
2134 return false; // did not speculate
2137 Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
2138 bool didSkip = true;
2140 for (int i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2141 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2142 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2144 if (!transaction->isComplete()) {
2145 // If there is an incomplete transaction then there is nothing
2146 // we can do add this transactions arbitrator to the list of
2147 // arbitrators we should ignore
2148 incompleteTransactionArbitrator->add(transaction->getArbitrator());
2153 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2157 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2159 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2160 // Guard evaluated to true so update the speculative table
2161 SetIterator<KeyValue *> *kvit = getKeyIterator(commit->getKeyValueUpdateSet());
2162 while (kvit->hasNext()) {
2163 KeyValue *kv = kvit->next();
2164 speculatedKeyValueTable->put(kv->getKey(), kv);
2171 // Since there was a skip we need to redo the speculation next time around
2172 lastTransactionSequenceNumberSpeculatedOn = -1;
2173 oldestTransactionSequenceNumberSpeculatedOn = -1;
2176 // We did some speculation
2181 * Create the pending transaction speculative table from transactions
2182 * that are still in the pending transaction buffer
2184 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2185 if (pendingTransactionQueue->size() == 0) {
2186 // There is nothing to speculate on
2190 if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2191 // need to reset on the pending speculation
2192 lastPendingTransactionSpeculatedOn = NULL;
2193 firstPendingTransaction = pendingTransactionQueue->get(0);
2194 pendingTransactionSpeculatedKeyValueTable->clear();
2197 // Find where to start arbitration from
2198 int startIndex = pendingTransactionQueue->indexOf(firstPendingTransaction) + 1;
2200 if (startIndex >= pendingTransactionQueue->size()) {
2201 // Make sure we are not out of bounds
2205 for (int i = startIndex; i < pendingTransactionQueue->size(); i++) {
2206 Transaction *transaction = pendingTransactionQueue->get(i);
2208 lastPendingTransactionSpeculatedOn = transaction;
2210 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2211 // Guard evaluated to true so update the speculative table
2212 SetIterator<KeyValue *> *kvit = getKeyIterator(commit->getKeyValueUpdateSet());
2213 while (kvit->hasNext()) {
2214 KeyValue *kv = kvit->next();
2215 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2223 * Set dead and remove from the live transaction tables the
2224 * transactions that are dead
// Walks the live transactions and the outstanding transaction statuses. An
// entry is treated as finished when the last transaction number recorded for
// its arbitrator is at or past the entry's own sequence number: the
// transaction is then set dead and removed from the by-id table, and the
// status is set to TransactionStatus_StatusCommitted.
// NOTE(review): this listing omits some lines of the original (see the gaps in
// the embedded numbering), so further statements inside the two `if` bodies
// may exist that are not visible here.
2226 void Table::updateLiveTransactionsAndStatus() {
2228 // Go through each of the live transactions
2229 for (Iterator<Map->Entry<int64_t, Transaction> > *iter = liveTransactionBySequenceNumberTable->entrySet()->iterator(); iter->hasNext();) {
2230 Transaction *transaction = iter->next()->getValue();
2232 // Check if the transaction is dead
// NOTE(review): an int64_t is compared against NULL below; presumably this
// relies on get() yielding 0 for a missing key. Confirm.
2233 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator());
2234 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= transaction->getSequenceNumber())) {
2236 // Set dead the transaction
2237 transaction->setDead();
2239 // Remove the transaction from the live table
2241 liveTransactionByTransactionIdTable->remove(transaction->getId());
2245 // Go through each of the outstanding transaction statuses
2246 for (Iterator<Map->Entry<int64_t, TransactionStatus *> > *iter = outstandingTransactionStatus->entrySet()->iterator(); iter->hasNext();) {
2247 TransactionStatus *status = iter->next()->getValue();
2249 // Check if the transaction is dead
2250 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator());
2251 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= status->getTransactionSequenceNumber())) {
// The arbitrator has moved past this transaction, so report it as committed
2254 status->setStatus(TransactionStatus_StatusCommitted);
2263 * Process this slot, entry by entry. Also update the latest message sent by slot
// Processes one slot: first records the slot as the latest message from its
// machine, then hands every entry in the slot to the processEntry overload
// matching the entry's type.
//
//   indexer               passed on to the RejectedMessage handler
//   slot                  the slot whose entries are processed
//   acceptUpdatesToLocal  passed on unchanged to updateLastMessage
//   machineSet            passed on to updateLastMessage and to the LastMessage handler
//
// Throws a heap-allocated Error ("Unrecognized type: "), presumably from the
// default branch of the switch.
// NOTE(review): this listing omits some lines of the original (see the gaps in
// the embedded numbering); the case labels in front of the Abort and NewKey
// calls, the break statements and the default label are not visible here.
2265 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2267 // Update the last message seen
2268 updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2270 // Process each entry in the slot
2271 Vector<Entry *> *entries = slot->getEntries();
2272 uint eSize = entries->size();
2273 for(uint ei=0; ei < eSize; ei++) {
2274 Entry * entry = entries->get(ei);
// The cast in each branch selects the processEntry overload for that entry type
2275 switch (entry->getType()) {
2276 case TypeCommitPart:
2277 processEntry((CommitPart *)entry);
2280 processEntry((Abort *)entry);
2282 case TypeTransactionPart:
2283 processEntry((TransactionPart *)entry);
2286 processEntry((NewKey *)entry);
2288 case TypeLastMessage:
2289 processEntry((LastMessage *)entry, machineSet);
2291 case TypeRejectedMessage:
2292 processEntry((RejectedMessage *)entry, indexer);
2294 case TypeTableStatus:
2295 processEntry((TableStatus *)entry, slot->getSequenceNumber());
2298 throw new Error("Unrecognized type: ");
2304 * Update the last message that was sent for a machine Id
2306 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2307 // Update what the last message received by a machine was
2308 updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2312 * Add the new key to the arbitrators table and update the set of live
2313 * new keys (in case of a rescued new key message)
2315 void Table::processEntry(NewKey *entry) {
2316 // Update the arbitrator table with the new key information
2317 arbitratorTable->put(entry->getKey(), entry->getMachineID());
2319 // Update what the latest live new key is
2320 NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2321 if (oldNewKey != NULL) {
2322 // Delete the old new key messages
2323 oldNewKey->setDead();
2328 * Process new table status entries and set dead the old ones as new
2329 * ones come in-> keeps track of the largest and smallest table status
2330 * seen in this current round of updating the local copy of the block
2333 void Table::processEntry(TableStatus entry, int64_t seq) {
2334 int newNumSlots = entry->getMaxSlots();
2335 updateCurrMaxSize(newNumSlots);
2336 initExpectedSize(seq, newNumSlots);
2338 if (liveTableStatus != NULL) {
2339 // We have a larger table status so the old table status is no
2341 liveTableStatus->setDead();
2344 // Make this new table status the latest alive table status
2345 liveTableStatus = entry;
2349 * Check old messages to see if there is a block chain violation.
// Processes a RejectedMessage entry in two steps:
//   1. For every sequence number in [oldSeqNum, newSeqNum], looks up the local
//      slot and throws a heap-allocated Error when the slot's machine id
//      contradicts the entry's "equal" flag.
//   2. Builds the set of machine ids whose last known message is older than
//      this entry's sequence number, registers the entry on each of their
//      watch vectors, and attaches the set to the entry.
//
//   entry    the RejectedMessage being processed
//   indexer  used to look up local slots by sequence number
//
// NOTE(review): this listing omits some lines of the original (see the gaps in
// the embedded numbering); the check that the slot exists, the statement that
// skips the local machine, and the branch taken when the watch set is empty
// are not visible here.
2352 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
2353 int64_t oldSeqNum = entry->getOldSeqNum();
2354 int64_t newSeqNum = entry->getNewSeqNum();
2355 bool isequal = entry->getEqual();
2356 int64_t machineId = entry->getMachineID();
2357 int64_t seq = entry->getSequenceNumber();
2359 // Check if we have messages that were supposed to be rejected in
2360 // our local block chain
2361 for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2363 Slot *slot = indexer->getSlot(seqNum);
2366 // If we have this slot make sure that it was not supposed to be
2368 int64_t slotMachineId = slot->getMachineID();
// Violation when the "equal" flag disagrees with whether the slot came from machineId
2369 if (isequal != (slotMachineId == machineId)) {
2370 throw new Error("Server Error: Trying to insert rejected message for slot ");
2375 // Create a list of clients to watch until they see this rejected
2377 Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2378 for (Map->Entry<int64_t, Pair<int64_t, Liveness *> > *lastMessageEntry : lastMessageTable->entrySet()) {
2379 // Machine ID for the last message entry
2380 int64_t lastMessageEntryMachineId = lastMessageEntry->getKey();
2382 // We've seen it, don't need to continue to watch. Our next
2383 // message will implicitly acknowledge it.
2384 if (lastMessageEntryMachineId == localMachineId) {
2388 Pair<int64_t, Liveness *> lastMessageValue = lastMessageEntry->getValue();
2389 int64_t entrySequenceNumber = lastMessageValue.getFirst();
2391 if (entrySequenceNumber < seq) {
2392 // Add this rejected message to the set of messages that this
2393 // machine ID did not see yet
2394 addWatchVector(lastMessageEntryMachineId, entry);
2395 // This client did not see this rejected message yet so add it
2396 // to the watch set to monitor
2397 deviceWatchSet->add(lastMessageEntryMachineId);
2400 if (deviceWatchSet->isEmpty()) {
2401 // This rejected message has been seen by all the clients so
2404 // We need to watch this rejected message
2405 entry->setWatchSet(deviceWatchSet);
2410 * Check if this abort is live, if not then save it so we can kill it
2411 * later. Update the last transaction number that was arbitrated on.
// Processes an Abort entry. From the visible statements it:
//   - marks the matching outstanding transaction status as aborted (only when
//     the entry carries a transaction sequence number other than -1),
//   - stores the abort in liveAbortTable (setting dead any abort it replaces)
//     and, for aborts arbitrated by the local machine, in liveAbortsGeneratedByLocal,
//   - drops the abort from those tables again when the target machine's last
//     message is already at or past the abort's sequence number,
//   - records the arbitration sequence number seen from the arbitrator,
//   - removes the aborted transaction from the live transaction tables, and
//   - advances the last transaction number arbitrated by the arbitrator.
//
//   entry  the Abort entry being processed
//
// NOTE(review): this listing omits some lines of the original (see the gaps in
// the embedded numbering), so statements inside several branches (for example
// after "The machine already saw this so it is dead") are not visible here.
2413 void Table::processEntry(Abort *entry) {
2414 if (entry->getTransactionSequenceNumber() != -1) {
2415 // update the transaction status if it was sent to the server
2416 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2417 if (status != NULL) {
2418 status->setStatus(TransactionStatus_StatusAborted);
2422 // Abort has not been seen by the client it is for yet so we need to
2424 Abort *previouslySeenAbort = liveAbortTable->put(entry->getAbortId(), entry);
2425 if (previouslySeenAbort != NULL) {
2426 previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version
2429 if (entry->getTransactionArbitrator() == localMachineId) {
2430 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
// NOTE(review): the value returned by lastMessageTable->get() is used directly;
// presumably an entry always exists for the transaction's machine id. Confirm.
2433 if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId()).getFirst() >= entry->getSequenceNumber())) {
2434 // The machine already saw this so it is dead
2436 liveAbortTable->remove(entry->getAbortId());
2438 if (entry->getTransactionArbitrator() == localMachineId) {
2439 liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2444 // Update the last arbitration data that we have seen so far
2445 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator()) != NULL) {
2446 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2447 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2449 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2452 // Never seen any data from this arbitrator so record the first one
2453 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2456 // Set dead a transaction if we can
2457 Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber()));
2458 if (transactionToSetDead != NULL) {
2459 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2462 // Update the last transaction sequence number that the arbitrator
2464 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator());
2465 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
// -1 means the abort carries no transaction sequence number, so nothing is recorded
2467 if (entry->getTransactionSequenceNumber() != -1) {
2468 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2474 * Set dead the transaction part if that transaction is dead and keep
2475 * track of all new parts
// Processes a TransactionPart entry. A part whose arbitrator has already
// arbitrated at or past the part's sequence number belongs to a dead
// transaction; otherwise the part is stored in the per-machine table of new
// transaction parts (created on first use), and a part it replaces under the
// same part id is set dead.
//
//   entry  the TransactionPart being processed
//
// NOTE(review): this listing omits some lines of the original (see the gaps in
// the embedded numbering); what the dead-transaction branch does with the
// entry is not visible here.
2477 void Table::processEntry(TransactionPart *entry) {
2478 // Check if we have already seen this transaction and set it dead OR
2479 // if it is not alive
2480 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId());
2481 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= entry->getSequenceNumber())) {
2482 // This transaction is dead, it was already committed or aborted
2487 // This part is still alive
2488 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId());
2490 if (transactionPart == NULL) {
2491 // Don't have a table for this machine Id yet so make one
2492 transactionPart = new Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2493 newTransactionParts->put(entry->getMachineId(), transactionPart);
2496 // Update the part and set dead ones we have already seen (got a
2498 TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry);
2499 if (previouslySeenPart != NULL) {
2500 previouslySeenPart->setDead();
2505 * Process new commit entries and save them for future use-> Delete duplicates
2507 void Table::processEntry(CommitPart *entry) {
2508 // Update the last transaction that was updated if we can
2509 if (entry->getTransactionSequenceNumber() != -1) {
2510 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId());
2511 // Update the last transaction sequence number that the arbitrator
2513 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2514 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2518 Hashtable<Pair<int64_t, int32_t> *, CommitPart *> *commitPart = newCommitParts->get(entry->getMachineId());
2519 if (commitPart == NULL) {
2520 // Don't have a table for this machine Id yet so make one
2521 commitPart = new Hashtable<Pair<int64_t, int32_t> *, CommitPart *>();
2522 newCommitParts->put(entry->getMachineId(), commitPart);
2524 // Update the part and set dead ones we have already seen (got a
2526 CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry);
2527 if (previouslySeenPart != NULL) {
2528 previouslySeenPart->setDead();
2533 * Update the last message seen table. Update and set dead the
2534 * appropriate RejectedMessages as clients see them. Updates the live
2535 * aborts, removes those that are dead and sets them dead. Check that
2536 * the last message seen is correct and that there is no mismatch of
2537 * our own last message or that other clients have not had a rollback
2538 * on the last message.
// Records (seqNum, liveness) as the last message seen from machineId and
// performs the bookkeeping that depends on it: rejected messages the machine
// has now seen stop being watched for it, live aborts addressed to the machine
// that it has now seen are cleaned up, the previous last message is set dead,
// and the sequence numbers are checked for a mismatch (local machine) or a
// rollback (remote machine).
//
//   machineId             machine the message came from
//   seqNum                sequence number of that message
//   liveness              the LastMessage or Slot that carried it
//   acceptUpdatesToLocal  when true, the local-machine mismatch checks are skipped
//   machineSet            machineId is removed from this set
//
// Throws a heap-allocated Error for an unrecognized liveness type and for the
// server-side mismatch / rollback conditions.
// Changes in this version: the machineSet element type is spelled int64_t
// (was the typo `intr64_t`), and the abort iterator is declared as a pointer
// to match its `->` use and the sibling iterator loops.
// NOTE(review): this listing omits some lines of the original (see the gaps in
// the embedded numbering); the visible lines are kept as they are, so the
// removal statements, else branches and early return are not shown here.
// NOTE(review): `instanceof` is carried over from the Java original and still
// needs a C++ replacement.
2540 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2541 // We have seen this machine ID
2542 machineSet->remove(machineId);
2544 // Get the set of rejected messages that this machine Id has not seen yet
2545 Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2546 // If there is a rejected message that this machine Id has not seen yet
2547 if (watchset != NULL) {
2548 // Go through each rejected message that this machine Id has not
2550 for (Iterator<RejectedMessage *> *rmit = watchset->iterator(); rmit->hasNext(); ) {
2551 RejectedMessage *rm = rmit->next();
2552 // If this machine Id has seen this rejected message...
2553 if (rm->getSequenceNumber() <= seqNum) {
2554 // Remove it from our watchlist
2556 // Decrement machines that need to see this notification
2557 rm->removeWatcher(machineId);
2562 // Set dead the abort
2563 for (Iterator<Map->Entry<Pair<int64_t, int64_t>, Abort *> > *i = liveAbortTable->entrySet()->iterator(); i->hasNext();) {
2564 Abort *abort = i->next()->getValue();
2565 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2568 if (abort->getTransactionArbitrator() == localMachineId) {
2569 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2573 if (machineId == localMachineId) {
2574 // Our own messages are immediately dead.
2575 if (liveness instanceof LastMessage) {
2576 ((LastMessage *)liveness)->setDead();
2577 } else if (liveness instanceof Slot) {
2578 ((Slot *)liveness)->setDead();
2580 throw new Error("Unrecognized type");
2583 // Get the old last message for this device
2584 Pair<int64_t, Liveness *> lastMessageEntry = lastMessageTable->put(machineId, Pair<int64_t, Liveness *>(seqNum, liveness));
2585 if (lastMessageEntry == NULL) {
2586 // If no last message then there is nothing else to process
2590 int64_t lastMessageSeqNum = lastMessageEntry.getFirst();
2591 Liveness *lastEntry = lastMessageEntry.getSecond();
2593 // If it is not our machine Id since we already set ours to dead
2594 if (machineId != localMachineId) {
2595 if (lastEntry instanceof LastMessage) {
2596 ((LastMessage *)lastEntry)->setDead();
2597 } else if (lastEntry instanceof Slot) {
2598 ((Slot *)lastEntry)->setDead();
2600 throw new Error("Unrecognized type");
2603 // Make sure the server is not playing any games
2604 if (machineId == localMachineId) {
2605 if (hadPartialSendToServer) {
2606 // We were not making any updates and we had a machine mismatch
2607 if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2608 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: ");
2611 // We were not making any updates and we had a machine mismatch
2612 if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2613 throw new Error("Server Error: Mismatch on local machine sequence number, needed: ");
// A remote machine's sequence number must never go backwards
2617 if (lastMessageSeqNum > seqNum) {
2618 throw new Error("Server Error: Rollback on remote machine sequence number");
2624 * Add a rejected message entry to the watch set to keep track of
2625 * which clients have seen that rejected message entry and which have
2628 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
2629 Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2630 if (entries == NULL) {
2631 // There is no set for this machine ID yet so create one
2632 entries = new Hashset<RejectedMessage *>();
2633 rejectedMessageWatchVectorTable->put(machineId, entries);
2635 entries->add(entry);
2639 * Check if the HMAC chain is not violated
// Checks that each new slot links to its predecessor: for every slot in
// newSlots, the slot with the preceding sequence number is looked up through
// the indexer and, when one is found, its HMAC must equal the new slot's
// previous-HMAC field.
//
//   indexer   used to look up the predecessor slot by sequence number
//   newSlots  the slots to validate
//
// Throws a heap-allocated Error ("Server Error: Invalid HMAC Chain") on the
// first mismatch.
// NOTE(review): the end of this function lies beyond the visible part of the
// listing.
2641 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
2642 for (int i = 0; i < newSlots->length(); i++) {
2643 Slot *currSlot = newSlots->get(i);
2644 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
// A missing predecessor (NULL) is not treated as a violation
2645 if (prevSlot != NULL &&
2646 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2647 throw new Error("Server Error: Invalid HMAC Chain");