3 #include "SlotBuffer.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
14 #include "ByteBuffer.h"
16 #include "CommitPart.h"
17 #include "ArbitrationRound.h"
18 #include "TransactionPart.h"
20 #include "RejectedMessage.h"
21 #include "SlotIndexer.h"
23 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
25 cloud(new CloudComm(this, baseurl, password, listeningPort)),
27 liveTableStatus(NULL),
28 pendingTransactionBuilder(NULL),
29 lastPendingTransactionSpeculatedOn(NULL),
30 firstPendingTransaction(NULL),
32 bufferResizeThreshold(0),
34 oldestLiveSlotSequenceNumver(1),
35 localMachineId(_localMachineId),
37 localTransactionSequenceNumber(0),
38 lastTransactionSequenceNumberSpeculatedOn(0),
39 oldestTransactionSequenceNumberSpeculatedOn(0),
40 localArbitrationSequenceNumber(0),
41 hadPartialSendToServer(false),
42 attemptedToSendToServer(false),
44 didFindTableStatus(false),
46 lastSlotAttemptedToSend(NULL),
49 lastTransactionPartsSent(NULL),
50 lastPendingSendArbitrationEntriesToDelete(NULL),
52 committedKeyValueTable(NULL),
53 speculatedKeyValueTable(NULL),
54 pendingTransactionSpeculatedKeyValueTable(NULL),
55 liveNewKeyTable(NULL),
56 lastMessageTable(NULL),
57 rejectedMessageWatchVectorTable(NULL),
58 arbitratorTable(NULL),
60 newTransactionParts(NULL),
62 lastArbitratedTransactionNumberByArbitratorTable(NULL),
63 liveTransactionBySequenceNumberTable(NULL),
64 liveTransactionByTransactionIdTable(NULL),
65 liveCommitsTable(NULL),
66 liveCommitsByKeyTable(NULL),
67 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
68 rejectedSlotVector(NULL),
69 pendingTransactionQueue(NULL),
70 pendingSendArbitrationRounds(NULL),
71 pendingSendArbitrationEntriesToDelete(NULL),
72 transactionPartsSent(NULL),
73 outstandingTransactionStatus(NULL),
74 liveAbortsGeneratedByLocal(NULL),
75 offlineTransactionsCommittedAndAtServer(NULL),
76 localCommunicationTable(NULL),
77 lastTransactionSeenFromMachineFromServer(NULL),
78 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
79 lastInsertedNewKey(false),
85 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
89 liveTableStatus(NULL),
90 pendingTransactionBuilder(NULL),
91 lastPendingTransactionSpeculatedOn(NULL),
92 firstPendingTransaction(NULL),
94 bufferResizeThreshold(0),
96 oldestLiveSlotSequenceNumver(1),
97 localMachineId(_localMachineId),
99 localTransactionSequenceNumber(0),
100 lastTransactionSequenceNumberSpeculatedOn(0),
101 oldestTransactionSequenceNumberSpeculatedOn(0),
102 localArbitrationSequenceNumber(0),
103 hadPartialSendToServer(false),
104 attemptedToSendToServer(false),
106 didFindTableStatus(false),
108 lastSlotAttemptedToSend(NULL),
111 lastTransactionPartsSent(NULL),
112 lastPendingSendArbitrationEntriesToDelete(NULL),
114 committedKeyValueTable(NULL),
115 speculatedKeyValueTable(NULL),
116 pendingTransactionSpeculatedKeyValueTable(NULL),
117 liveNewKeyTable(NULL),
118 lastMessageTable(NULL),
119 rejectedMessageWatchVectorTable(NULL),
120 arbitratorTable(NULL),
121 liveAbortTable(NULL),
122 newTransactionParts(NULL),
123 newCommitParts(NULL),
124 lastArbitratedTransactionNumberByArbitratorTable(NULL),
125 liveTransactionBySequenceNumberTable(NULL),
126 liveTransactionByTransactionIdTable(NULL),
127 liveCommitsTable(NULL),
128 liveCommitsByKeyTable(NULL),
129 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
130 rejectedSlotVector(NULL),
131 pendingTransactionQueue(NULL),
132 pendingSendArbitrationRounds(NULL),
133 pendingSendArbitrationEntriesToDelete(NULL),
134 transactionPartsSent(NULL),
135 outstandingTransactionStatus(NULL),
136 liveAbortsGeneratedByLocal(NULL),
137 offlineTransactionsCommittedAndAtServer(NULL),
138 localCommunicationTable(NULL),
139 lastTransactionSeenFromMachineFromServer(NULL),
140 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
141 lastInsertedNewKey(false),
148 * Init all the stuff needed for for table usage
151 // Init helper objects
152 random = new Random();
153 buffer = new SlotBuffer();
156 committedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
157 speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
158 pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
159 liveNewKeyTable = new Hashtable<IoTString *, NewKey *>();
160 lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> * >();
161 rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
162 arbitratorTable = new Hashtable<IoTString *, int64_t>();
163 liveAbortTable = new Hashtable<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>();
164 newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
165 newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
166 lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
167 liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
168 liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t> *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>();
169 liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> >();
170 liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *>();
171 lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
172 rejectedSlotVector = new Vector<int64_t>();
173 pendingTransactionQueue = new Vector<Transaction *>();
174 pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
175 transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
176 outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
177 liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
178 offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> *, uintptr_t, 0, pairHashFunction, pairEquals>();
179 localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> *>();
180 lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
181 pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
182 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
185 numberOfSlots = buffer->capacity();
186 setResizeThreshold();
190 * Initialize the table by inserting a table status as the first entry
191 * into the table status also initialize the crypto stuff.
193 void Table::initTable() {
194 cloud->initSecurity();
196 // Create the first insertion into the block chain which is the table status
197 Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
198 localSequenceNumber++;
199 TableStatus *status = new TableStatus(s, numberOfSlots);
201 Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
204 array = new Array<Slot *>(1);
206 // update local block chain
207 validateAndUpdate(array, true);
208 } else if (array->length() == 1) {
209 // in case we did push the slot BUT we failed to init it
210 validateAndUpdate(array, true);
212 throw new Error("Error on initialization");
217 * Rebuild the table from scratch by pulling the latest block chain
220 void Table::rebuild() {
221 // Just pull the latest slots from the server
222 Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
223 validateAndUpdate(newslots, true);
225 updateLiveTransactionsAndStatus();
228 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
229 localCommunicationTable->put(arbitrator, new Pair<IoTString *, int32_t>(hostName, portNumber));
232 int64_t Table::getArbitrator(IoTString *key) {
233 return arbitratorTable->get(key);
236 void Table::close() {
240 IoTString *Table::getCommitted(IoTString *key) {
241 KeyValue *kv = committedKeyValueTable->get(key);
244 return kv->getValue();
250 IoTString *Table::getSpeculative(IoTString *key) {
251 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
254 kv = speculatedKeyValueTable->get(key);
258 kv = committedKeyValueTable->get(key);
262 return kv->getValue();
268 IoTString *Table::getCommittedAtomic(IoTString *key) {
269 KeyValue *kv = committedKeyValueTable->get(key);
271 if (!arbitratorTable->contains(key)) {
272 throw new Error("Key not Found.");
275 // Make sure new key value pair matches the current arbitrator
276 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
277 // TODO: Maybe not throw en error
278 throw new Error("Not all Key Values Match Arbitrator.");
282 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
283 return kv->getValue();
285 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
290 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
291 if (!arbitratorTable->contains(key)) {
292 throw new Error("Key not Found.");
295 // Make sure new key value pair matches the current arbitrator
296 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
297 // TODO: Maybe not throw en error
298 throw new Error("Not all Key Values Match Arbitrator.");
301 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
304 kv = speculatedKeyValueTable->get(key);
308 kv = committedKeyValueTable->get(key);
312 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
313 return kv->getValue();
315 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
320 bool Table::update() {
322 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
323 validateAndUpdate(newSlots, false);
325 updateLiveTransactionsAndStatus();
327 } catch (Exception *e) {
328 SetIterator<int64_t, Pair<IoTString *, int32_t> *> *kit = getKeyIterator(localCommunicationTable);
329 while (kit->hasNext()) {
330 int64_t m = kit->next();
339 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
341 if (!arbitratorTable->contains(keyName)) {
342 // There is already an arbitrator
345 NewKey *newKey = new NewKey(NULL, keyName, machineId);
347 if (sendToServer(newKey)) {
348 // If successfully inserted
354 void Table::startTransaction() {
355 // Create a new transaction, invalidates any old pending transactions.
356 pendingTransactionBuilder = new PendingTransaction(localMachineId);
359 void Table::addKV(IoTString *key, IoTString *value) {
361 // Make sure it is a valid key
362 if (!arbitratorTable->contains(key)) {
363 throw new Error("Key not Found.");
366 // Make sure new key value pair matches the current arbitrator
367 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
368 // TODO: Maybe not throw en error
369 throw new Error("Not all Key Values Match Arbitrator.");
372 // Add the key value to this transaction
373 KeyValue *kv = new KeyValue(key, value);
374 pendingTransactionBuilder->addKV(kv);
377 TransactionStatus *Table::commitTransaction() {
378 if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
379 // transaction with no updates will have no effect on the system
380 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
383 // Set the local transaction sequence number and increment
384 pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
385 localTransactionSequenceNumber++;
387 // Create the transaction status
388 TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
390 // Create the new transaction
391 Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
392 newTransaction->setTransactionStatus(transactionStatus);
394 if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
395 // Add it to the queue and invalidate the builder for safety
396 pendingTransactionQueue->add(newTransaction);
398 arbitrateOnLocalTransaction(newTransaction);
399 updateLiveStateFromLocal();
402 pendingTransactionBuilder = new PendingTransaction(localMachineId);
406 } catch (ServerException *e) {
408 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
409 uint size = pendingTransactionQueue->size();
411 for (int iter = 0; iter < size; iter++) {
412 Transaction *transaction = pendingTransactionQueue->get(iter);
413 pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter));
415 if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
416 // Already contacted this client so ignore all attempts to contact this client
417 // to preserve ordering for arbitrator
421 Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
423 if (sendReturn.getFirst()) {
424 // Failed to contact over local
425 arbitratorTriedAndFailed->add(transaction->getArbitrator());
427 // Successful contact or should not contact
429 if (sendReturn.getSecond()) {
435 pendingTransactionQueue->setSize(oldindex);
438 updateLiveStateFromLocal();
440 return transactionStatus;
444 * Recalculate the new resize threshold
446 void Table::setResizeThreshold() {
447 int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
448 bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
451 int64_t Table::getLocalSequenceNumber() {
452 return localSequenceNumber;
455 bool Table::sendToServer(NewKey *newKey) {
456 bool fromRetry = false;
458 if (hadPartialSendToServer) {
459 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
460 if (newSlots->length() == 0) {
462 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
464 if (sendSlotsReturn.getFirst()) {
465 if (newKey != NULL) {
466 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
471 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
472 while (trit->hasNext()) {
473 Transaction *transaction = trit->next();
474 transaction->resetServerFailure();
475 // Update which transactions parts still need to be sent
476 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
477 // Add the transaction status to the outstanding list
478 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
480 // Update the transaction status
481 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
483 // Check if all the transaction parts were successfully
484 // sent and if so then remove it from pending
485 if (transaction->didSendAllParts()) {
486 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
487 pendingTransactionQueue->remove(transaction);
492 newSlots = sendSlotsReturn.getThird();
493 bool isInserted = false;
494 for (uint si = 0; si < newSlots->length(); si++) {
495 Slot *s = newSlots->get(si);
496 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
502 for (uint si = 0; si < newSlots->length(); si++) {
503 Slot *s = newSlots->get(si);
508 // Process each entry in the slot
509 Vector<Entry *> *ventries = s->getEntries();
510 uint vesize = ventries->size();
511 for (uint vei = 0; vei < vesize; vei++) {
512 Entry *entry = ventries->get(vei);
513 if (entry->getType() == TypeLastMessage) {
514 LastMessage *lastMessage = (LastMessage *)entry;
515 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
524 if (newKey != NULL) {
525 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
530 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
531 while (trit->hasNext()) {
532 Transaction *transaction = trit->next();
533 transaction->resetServerFailure();
535 // Update which transactions parts still need to be sent
536 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
538 // Add the transaction status to the outstanding list
539 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
541 // Update the transaction status
542 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
544 // Check if all the transaction parts were successfully sent and if so then remove it from pending
545 if (transaction->didSendAllParts()) {
546 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
547 pendingTransactionQueue->remove(transaction);
549 transaction->resetServerFailure();
550 // Set the transaction sequence number back to nothing
551 if (!transaction->didSendAPartToServer()) {
552 transaction->setSequenceNumber(-1);
560 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
561 while (trit->hasNext()) {
562 Transaction *transaction = trit->next();
563 transaction->resetServerFailure();
564 // Set the transaction sequence number back to nothing
565 if (!transaction->didSendAPartToServer()) {
566 transaction->setSequenceNumber(-1);
571 if (sendSlotsReturn.getThird()->length() != 0) {
572 // insert into the local block chain
573 validateAndUpdate(sendSlotsReturn.getThird(), true);
577 bool isInserted = false;
578 for (uint si = 0; si < newSlots->length(); si++) {
579 Slot *s = newSlots->get(si);
580 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
586 for (uint si = 0; si < newSlots->length(); si++) {
587 Slot *s = newSlots->get(si);
592 // Process each entry in the slot
593 Vector<Entry *> *entries = s->getEntries();
594 uint eSize = entries->size();
595 for(uint ei=0; ei < eSize; ei++) {
596 Entry * entry = entries->get(ei);
598 if (entry->getType() == TypeLastMessage) {
599 LastMessage *lastMessage = (LastMessage *)entry;
600 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
609 if (newKey != NULL) {
610 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
615 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
616 while (trit->hasNext()) {
617 Transaction *transaction = trit->next();
618 transaction->resetServerFailure();
620 // Update which transactions parts still need to be sent
621 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
623 // Add the transaction status to the outstanding list
624 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
626 // Update the transaction status
627 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
629 // Check if all the transaction parts were successfully sent and if so then remove it from pending
630 if (transaction->didSendAllParts()) {
631 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
632 pendingTransactionQueue->remove(transaction);
634 transaction->resetServerFailure();
635 // Set the transaction sequence number back to nothing
636 if (!transaction->didSendAPartToServer()) {
637 transaction->setSequenceNumber(-1);
643 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
644 while (trit->hasNext()) {
645 Transaction *transaction = trit->next();
646 transaction->resetServerFailure();
647 // Set the transaction sequence number back to nothing
648 if (!transaction->didSendAPartToServer()) {
649 transaction->setSequenceNumber(-1);
655 // insert into the local block chain
656 validateAndUpdate(newSlots, true);
659 } catch (ServerException *e) {
666 // While we have stuff that needs inserting into the block chain
667 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
671 if (hadPartialSendToServer) {
672 throw new Error("Should Be error free");
677 // If there is a new key with same name then end
678 if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
683 Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber);
684 localSequenceNumber++;
686 // Try to fill the slot with data
687 ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
688 bool needsResize = fillSlotsReturn.getFirst();
689 int newSize = fillSlotsReturn.getSecond();
690 bool insertedNewKey = fillSlotsReturn.getThird();
693 // Reset which transaction to send
694 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
695 while (trit->hasNext()) {
696 Transaction *transaction = trit->next();
697 transaction->resetNextPartToSend();
699 // Set the transaction sequence number back to nothing
700 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
701 transaction->setSequenceNumber(-1);
706 // Clear the sent data since we are trying again
707 pendingSendArbitrationEntriesToDelete->clear();
708 transactionPartsSent->clear();
710 // We needed a resize so try again
711 fillSlot(slot, true, newKey);
714 lastSlotAttemptedToSend = slot;
715 lastIsNewKey = (newKey != NULL);
716 lastInsertedNewKey = insertedNewKey;
717 lastNewSize = newSize;
719 lastTransactionPartsSent = transactionPartsSent->clone();
720 lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
722 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
724 if (sendSlotsReturn.getFirst()) {
726 // Did insert into the block chain
728 if (insertedNewKey) {
729 // This slot was what was inserted not a previous slot
731 // New Key was successfully inserted into the block chain so dont want to insert it again
735 // Remove the aborts and commit parts that were sent from the pending to send queue
736 uint size = pendingSendArbitrationRounds->size();
738 for (uint i = 0; i < size; i++) {
739 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
740 round->removeParts(pendingSendArbitrationEntriesToDelete);
742 if (!round->isDoneSending()) {
743 // Sent all the parts
744 pendingSendArbitrationRounds->set(oldcount++,
745 pendingSendArbitrationRounds->get(i));
748 pendingSendArbitrationRounds->setSize(oldcount);
750 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
751 while (trit->hasNext()) {
752 Transaction *transaction = trit->next();
753 transaction->resetServerFailure();
755 // Update which transactions parts still need to be sent
756 transaction->removeSentParts(transactionPartsSent->get(transaction));
758 // Add the transaction status to the outstanding list
759 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
761 // Update the transaction status
762 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
764 // Check if all the transaction parts were successfully sent and if so then remove it from pending
765 if (transaction->didSendAllParts()) {
766 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
767 pendingTransactionQueue->remove(transaction);
772 // Reset which transaction to send
773 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
774 while (trit->hasNext()) {
775 Transaction *transaction = trit->next();
776 transaction->resetNextPartToSend();
778 // Set the transaction sequence number back to nothing
779 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
780 transaction->setSequenceNumber(-1);
786 // Clear the sent data in preparation for next send
787 pendingSendArbitrationEntriesToDelete->clear();
788 transactionPartsSent->clear();
790 if (sendSlotsReturn.getThird()->length() != 0) {
791 // insert into the local block chain
792 validateAndUpdate(sendSlotsReturn.getThird(), true);
796 } catch (ServerException *e) {
797 if (e->getType() != ServerException_TypeInputTimeout) {
798 // Nothing was able to be sent to the server so just clear these data structures
799 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
800 while (trit->hasNext()) {
801 Transaction *transaction = trit->next();
802 transaction->resetNextPartToSend();
804 // Set the transaction sequence number back to nothing
805 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
806 transaction->setSequenceNumber(-1);
811 // There was a partial send to the server
812 hadPartialSendToServer = true;
814 // Nothing was able to be sent to the server so just clear these data structures
815 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
816 while (trit->hasNext()) {
817 Transaction *transaction = trit->next();
818 transaction->resetNextPartToSend();
819 transaction->setServerFailure();
824 pendingSendArbitrationEntriesToDelete->clear();
825 transactionPartsSent->clear();
830 return newKey == NULL;
833 bool Table::updateFromLocal(int64_t machineId) {
834 if (!localCommunicationTable->contains(machineId))
837 Pair<IoTString *, int32_t> * localCommunicationInformation = localCommunicationTable->get(machineId);
839 // Get the size of the send data
840 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
842 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
843 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(machineId)) {
844 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
847 Array<char> *sendData = new Array<char>(sendDataSize);
848 ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
851 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
855 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
856 localSequenceNumber++;
858 if (returnData == NULL) {
859 // Could not contact server
864 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
865 int numberOfEntries = bbDecode->getInt();
867 for (int i = 0; i < numberOfEntries; i++) {
868 char type = bbDecode->get();
869 if (type == TypeAbort) {
870 Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
872 } else if (type == TypeCommitPart) {
873 CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
874 processEntry(commitPart);
878 updateLiveStateFromLocal();
883 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
885 // Get the devices local communications
886 if (!localCommunicationTable->contains(transaction->getArbitrator()))
887 return Pair<bool, bool>(true, false);
889 Pair<IoTString *, int32_t> * localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
891 // Get the size of the send data
892 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
894 Vector<TransactionPart *> * tParts = transaction->getParts();
895 uint tPartsSize = tParts->size();
896 for (uint i = 0; i < tPartsSize; i++) {
897 TransactionPart * part = tParts->get(i);
898 sendDataSize += part->getSize();
902 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
903 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(transaction->getArbitrator())) {
904 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
907 // Make the send data size
908 Array<char> *sendData = new Array<char>(sendDataSize);
909 ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
912 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
913 bbEncode->putInt(transaction->getParts()->size());
915 Vector<TransactionPart *> * tParts = transaction->getParts();
916 uint tPartsSize = tParts->size();
917 for (uint i = 0; i < tPartsSize; i++) {
918 TransactionPart * part = tParts->get(i);
919 part->encode(bbEncode);
924 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
925 localSequenceNumber++;
927 if (returnData == NULL) {
928 // Could not contact server
929 return Pair<bool, bool>(true, false);
933 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
934 bool didCommit = bbDecode->get() == 1;
935 bool couldArbitrate = bbDecode->get() == 1;
936 int numberOfEntries = bbDecode->getInt();
937 bool foundAbort = false;
939 for (int i = 0; i < numberOfEntries; i++) {
940 char type = bbDecode->get();
941 if (type == TypeAbort) {
942 Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
944 if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
949 } else if (type == TypeCommitPart) {
950 CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
951 processEntry(commitPart);
955 updateLiveStateFromLocal();
957 if (couldArbitrate) {
958 TransactionStatus * status = transaction->getTransactionStatus();
960 status->setStatus(TransactionStatus_StatusCommitted);
962 status->setStatus(TransactionStatus_StatusAborted);
965 TransactionStatus * status = transaction->getTransactionStatus();
967 status->setStatus(TransactionStatus_StatusAborted);
969 status->setStatus(TransactionStatus_StatusCommitted);
973 return Pair<bool, bool>(false, true);
976 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
979 ByteBuffer *bbDecode = ByteBuffer_wrap(data);
980 int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
981 int numberOfParts = bbDecode->getInt();
983 // If we did commit a transaction or not
984 bool didCommit = false;
985 bool couldArbitrate = false;
987 if (numberOfParts != 0) {
989 // decode the transaction
990 Transaction *transaction = new Transaction();
991 for (int i = 0; i < numberOfParts; i++) {
993 TransactionPart *newPart = (TransactionPart *)TransactionPart_decode(NULL, bbDecode);
994 transaction->addPartDecode(newPart);
997 // Arbitrate on transaction and pull relevant return data
998 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
999 couldArbitrate = localArbitrateReturn.getFirst();
1000 didCommit = localArbitrateReturn.getSecond();
1002 updateLiveStateFromLocal();
1004 // Transaction was sent to the server so keep track of it to prevent double commit
1005 if (transaction->getSequenceNumber() != -1) {
1006 offlineTransactionsCommittedAndAtServer->add(new Pair<int64_t, int64_t>(transaction->getId()));
1010 // The data to send back
1011 int returnDataSize = 0;
1012 Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
1014 // Get the aborts to send back
1015 Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>(liveAbortsGeneratedByLocal->keySet());
1016 Collections->sort(abortLocalSequenceNumbers);
1017 uint asize = abortLocalSequenceNumbers->size();
1018 for(uint i=0; i<asize; i++) {
1019 int64_t localSequenceNumber = abortLocalSequenceNumbers->get(i);
1020 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1024 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
1025 unseenArbitrations->add(abort);
1026 returnDataSize += abort->getSize();
1029 // Get the commits to send back
1030 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
1031 if (commitForClientTable != NULL) {
1032 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
1033 Collections->sort(commitLocalSequenceNumbers);
1035 uint clsSize = commitLocalSequenceNumbers->size();
1036 for(uint clsi = 0; clsi < clsSize; clsi++) {
1037 int64_t localSequenceNumber = commitLocalSequenceNumbers->get(clsi);
1038 Commit *commit = commitForClientTable->get(localSequenceNumber);
1040 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1044 unseenArbitrations->addAll(commit->getParts()->values());
1046 for (CommitPart *commitPart : commit->getParts()->values()) {
1047 returnDataSize += commitPart->getSize();
1052 // Number of arbitration entries to decode
1053 returnDataSize += 2 * sizeof(int32_t);
1055 // bool of did commit or not
1056 if (numberOfParts != 0) {
1057 returnDataSize += sizeof(char);
1060 // Data to send Back
1061 Array<char> *returnData = new Array<char>(returnDataSize);
1062 ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
1064 if (numberOfParts != 0) {
1066 bbEncode->put((char)1);
1068 bbEncode->put((char)0);
1070 if (couldArbitrate) {
1071 bbEncode->put((char)1);
1073 bbEncode->put((char)0);
1077 bbEncode->putInt(unseenArbitrations->size());
1078 uint size = unseenArbitrations->size();
1079 for (uint i = 0; i < size; i++) {
1080 Entry *entry = unseenArbitrations->get(i);
1081 entry->encode(bbEncode);
1084 localSequenceNumber++;
1088 ThreeTuple<bool, bool, Array<Slot *> *> Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey) {
1089 bool attemptedToSendToServerTmp = attemptedToSendToServer;
1090 attemptedToSendToServer = true;
1092 bool inserted = false;
1093 bool lastTryInserted = false;
1095 Array<Slot *> *array = cloud->putSlot(slot, newSize);
1096 if (array == NULL) {
1097 array = new Array<Slot *>();
1098 array->set(0, slot);
1099 rejectedSlotVector->clear();
1102 if (array->length() == 0) {
1103 throw new Error("Server Error: Did not send any slots");
1106 // if (attemptedToSendToServerTmp) {
1107 if (hadPartialSendToServer) {
1109 bool isInserted = false;
1110 uint size = array->length();
1111 for (uint i = 0; i < size; i++) {
1112 Slot *s = array->get(i);
1113 if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1119 for (uint i = 0; i < size; i++) {
1120 Slot *s = array->get(i);
1125 // Process each entry in the slot
1126 Vector<Entry *> *entries = s->getEntries();
1127 uint eSize = entries->size();
1128 for(uint ei=0; ei < eSize; ei++) {
1129 Entry * entry = entries->get(ei);
1131 if (entry->getType() == TypeLastMessage) {
1132 LastMessage *lastMessage = (LastMessage *)entry;
1134 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) {
1143 rejectedSlotVector->add(slot->getSequenceNumber());
1144 lastTryInserted = false;
1146 lastTryInserted = true;
1149 rejectedSlotVector->add(slot->getSequenceNumber());
1150 lastTryInserted = false;
1154 return ThreeTuple<bool, bool, Array<Slot *> *>(inserted, lastTryInserted, array);
1158 * Returns false if a resize was needed
1160 ThreeTuple<bool, int32_t, bool> Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry) {
1162 if (liveSlotCount > bufferResizeThreshold) {
1163 resize = true;//Resize is forced
1167 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1168 TableStatus *status = new TableStatus(slot, newSize);
1169 slot->addEntry(status);
1172 // Fill with rejected slots first before doing anything else
1173 doRejectedMessages(slot);
1175 // Do mandatory rescue of entries
1176 ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1178 // Extract working variables
1179 bool needsResize = mandatoryRescueReturn.getFirst();
1180 bool seenLiveSlot = mandatoryRescueReturn.getSecond();
1181 int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1183 if (needsResize && !resize) {
1184 // We need to resize but we are not resizing so return false
1185 return ThreeTuple<bool, int32_t, bool>(true, NULL, NULL);
1188 bool inserted = false;
1189 if (newKeyEntry != NULL) {
1190 newKeyEntry->setSlot(slot);
1191 if (slot->hasSpace(newKeyEntry)) {
1192 slot->addEntry(newKeyEntry);
1197 // Clear the transactions, aborts and commits that were sent previously
1198 transactionPartsSent->clear();
1199 pendingSendArbitrationEntriesToDelete->clear();
1200 uint size = pendingSendArbitrationRounds->size();
1201 for (uint i = 0; i < size; i++) {
1202 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
1203 bool isFull = false;
1204 round->generateParts();
1205 Vector<Entry *> *parts = round->getParts();
1207 // Insert pending arbitration data
1208 uint vsize = parts->size();
1209 for (uint vi = 0; vi < vsize; vi++) {
1210 Entry *arbitrationData = parts->get(vi);
1212 // If it is an abort then we need to set some information
1213 if (arbitrationData->getType() == TypeAbort) {
1214 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1217 if (!slot->hasSpace(arbitrationData)) {
1218 // No space so cant do anything else with these data entries
1223 // Add to this current slot and add it to entries to delete
1224 slot->addEntry(arbitrationData);
1225 pendingSendArbitrationEntriesToDelete->add(arbitrationData);
1233 if (pendingTransactionQueue->size() > 0) {
1234 Transaction *transaction = pendingTransactionQueue->get(0);
1235 // Set the transaction sequence number if it has yet to be inserted into the block chain
1236 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1237 transaction->setSequenceNumber(slot->getSequenceNumber());
1241 TransactionPart *part = transaction->getNextPartToSend();
1243 // Ran out of parts to send for this transaction so move on
1247 if (slot->hasSpace(part)) {
1248 slot->addEntry(part);
1249 Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1250 if (partsSent == NULL) {
1251 partsSent = new Vector<int32_t>();
1252 transactionPartsSent->put(transaction, partsSent);
1254 partsSent->add(part->getPartNumber());
1255 transactionPartsSent->put(transaction, partsSent);
1262 // Fill the remainder of the slot with rescue data
1263 doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1265 return ThreeTuple<bool, int32_t, bool>(false, newSize, inserted);
1268 void Table::doRejectedMessages(Slot *s) {
1269 if (!rejectedSlotVector->isEmpty()) {
1270 /* TODO: We should avoid generating a rejected message entry if
1271 * there is already a sufficient entry in the queue (e->g->,
1272 * equalsto value of true and same sequence number)-> */
1274 int64_t old_seqn = rejectedSlotVector->get(0);
1275 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1276 int64_t new_seqn = rejectedSlotVector->lastElement();
1277 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1280 int64_t prev_seqn = -1;
1282 /* Go through list of missing messages */
1283 for (; i < rejectedSlotVector->size(); i++) {
1284 int64_t curr_seqn = rejectedSlotVector->get(i);
1285 Slot *s_msg = buffer->getSlot(curr_seqn);
1288 prev_seqn = curr_seqn;
1290 /* Generate rejected message entry for missing messages */
1291 if (prev_seqn != -1) {
1292 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1295 /* Generate rejected message entries for present messages */
1296 for (; i < rejectedSlotVector->size(); i++) {
1297 int64_t curr_seqn = rejectedSlotVector->get(i);
1298 Slot *s_msg = buffer->getSlot(curr_seqn);
1299 int64_t machineid = s_msg->getMachineID();
1300 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1307 ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize) {
1308 int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1309 int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
1310 if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1311 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1314 int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1315 bool seenLiveSlot = false;
1316 int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots; // smallest seq number in the buffer if it is full
1317 int64_t threshold = firstIfFull + Table_FREE_SLOTS; // we want the buffer to be clear of live entries up to this point
1321 for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1322 Slot * previousSlot = buffer->getSlot(currentSequenceNumber);
1323 // Push slot number forward
1324 if (!seenLiveSlot) {
1325 oldestLiveSlotSequenceNumver = currentSequenceNumber;
1328 if (!previousSlot->isLive()) {
1332 // We have seen a live slot
1333 seenLiveSlot = true;
1335 // Get all the live entries for a slot
1336 Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1338 // Iterate over all the live entries and try to rescue them
1339 for (Entry *liveEntry : liveEntries) {
1340 if (slot->hasSpace(liveEntry)) {
1341 // Enough space to rescue the entry
1342 slot->addEntry(liveEntry);
1343 } else if (currentSequenceNumber == firstIfFull) {
1344 //if there's no space but the entry is about to fall off the queue
1345 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
1351 return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
1354 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1355 /* now go through live entries from least to greatest sequence number until
1356 * either all live slots added, or the slot doesn't have enough room
1357 * for SKIP_THRESHOLD consecutive entries*/
1359 int64_t newestseqnum = buffer->getNewestSeqNum();
1361 for (; seqn <= newestseqnum; seqn++) {
1362 Slot *prevslot = buffer->getSlot(seqn);
1363 //Push slot number forward
1365 oldestLiveSlotSequenceNumver = seqn;
1367 if (!prevslot->isLive())
1369 seenliveslot = true;
1370 Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
1371 for (Entry *liveentry : liveentries) {
1372 if (s->hasSpace(liveentry))
1373 s->addEntry(liveentry);
1376 if (skipcount > Table_SKIP_THRESHOLD)
1386 * Checks for malicious activity and updates the local copy of the block chain->
1388 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1389 // The cloud communication layer has checked slot HMACs already
1391 if (newSlots->length() == 0) {
1395 // Make sure all slots are newer than the last largest slot this
1397 int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
1398 if (firstSeqNum <= sequenceNumber) {
1399 throw new Error("Server Error: Sent older slots!");
1402 // Create an object that can access both new slots and slots in our
1403 // local chain without committing slots to our local chain
1404 SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1406 // Check that the HMAC chain is not broken
1407 checkHMACChain(indexer, newSlots);
1409 // Set to keep track of messages from clients
1410 Hashset<int64_t> *machineSet = new Hashset<int64_t>(lastMessageTable->keySet());
1412 // Process each slots data
1413 for (Slot *slot : newSlots) {
1414 processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1416 updateExpectedSize();
1419 // If there is a gap, check to see if the server sent us
1421 if (firstSeqNum != (sequenceNumber + 1)) {
1423 // Check the size of the slots that were sent down by the server->
1424 // Can only check the size if there was a gap
1425 checkNumSlots(newSlots->length);
1427 // Since there was a gap every machine must have pushed a slot or
1428 // must have a last message message-> If not then the server is
1430 if (!machineSet->isEmpty()) {
1431 throw new Error("Missing record for machines: ");
1435 // Update the size of our local block chain->
1438 // Commit new to slots to the local block chain->
1439 for (Slot *slot : newSlots) {
1441 // Insert this slot into our local block chain copy->
1442 buffer->putSlot(slot);
1444 // Keep track of how many slots are currently live (have live data
1449 // Get the sequence number of the latest slot in the system
1450 sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
1451 updateLiveStateFromServer();
1453 // No Need to remember after we pulled from the server
1454 offlineTransactionsCommittedAndAtServer->clear();
1456 // This is invalidated now
1457 hadPartialSendToServer = false;
1460 void Table::updateLiveStateFromServer() {
1461 // Process the new transaction parts
1462 processNewTransactionParts();
1464 // Do arbitration on new transactions that were received
1465 arbitrateFromServer();
1467 // Update all the committed keys
1468 bool didCommitOrSpeculate = updateCommittedTable();
1470 // Delete the transactions that are now dead
1471 updateLiveTransactionsAndStatus();
1474 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1475 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1478 void Table::updateLiveStateFromLocal() {
1479 // Update all the committed keys
1480 bool didCommitOrSpeculate = updateCommittedTable();
1482 // Delete the transactions that are now dead
1483 updateLiveTransactionsAndStatus();
1486 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1487 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1490 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1491 int64_t prevslots = firstSequenceNumber;
1493 if (didFindTableStatus) {
1495 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1498 didFindTableStatus = true;
1499 currMaxSize = numberOfSlots;
1502 void Table::updateExpectedSize() {
1505 if (expectedsize > currMaxSize) {
1506 expectedsize = currMaxSize;
1512 * Check the size of the block chain to make sure there are enough
1513 * slots sent back by the server-> This is only called when we have a
1514 * gap between the slots that we have locally and the slots sent by
1515 * the server therefore in the slots sent by the server there will be
1516 * at least 1 Table status message
1518 void Table::checkNumSlots(int numberOfSlots) {
1519 if (numberOfSlots != expectedsize) {
1520 throw new Error("Server Error: Server did not send all slots-> Expected: ");
1524 void Table::updateCurrMaxSize(int newmaxsize) {
1525 currMaxSize = newmaxsize;
1530 * Update the size of of the local buffer if it is needed->
1532 void Table::commitNewMaxSize() {
1533 didFindTableStatus = false;
1535 // Resize the local slot buffer
1536 if (numberOfSlots != currMaxSize) {
1537 buffer->resize((int32_t)currMaxSize);
1540 // Change the number of local slots to the new size
1541 numberOfSlots = (int32_t)currMaxSize;
1543 // Recalculate the resize threshold since the size of the local
1544 // buffer has changed
1545 setResizeThreshold();
1549 * Process the new transaction parts from this latest round of slots
1550 * received from the server
1552 void Table::processNewTransactionParts() {
1554 if (newTransactionParts->size() == 0) {
1555 // Nothing new to process
1559 // Iterate through all the machine Ids that we received new parts
1561 for (int64_t machineId : newTransactionParts->keySet()) {
1562 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
1564 // Iterate through all the parts for that machine Id
1565 for (Pair<int64_t, int32_t> partId : parts->keySet()) {
1566 TransactionPart *part = parts->get(partId);
1568 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1569 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= part->getSequenceNumber())) {
1570 // Set dead the transaction part
1575 // Get the transaction object for that sequence number
1576 Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1578 if (transaction == NULL) {
1579 // This is a new transaction that we dont have so make a new one
1580 transaction = new Transaction();
1582 // Insert this new transaction into the live tables
1583 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1584 liveTransactionByTransactionIdTable->put(part->getTransactionId(), transaction);
1587 // Add that part to the transaction
1588 transaction->addPartDecode(part);
1592 // Clear all the new transaction parts in preparation for the next
1593 // time the server sends slots
1594 newTransactionParts->clear();
1597 void Table::arbitrateFromServer() {
1599 if (liveTransactionBySequenceNumberTable->size() == 0) {
1600 // Nothing to arbitrate on so move on
1604 // Get the transaction sequence numbers and sort from oldest to newest
1605 Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
1606 Collections->sort(transactionSequenceNumbers);
1608 // Collection of key value pairs that are
1609 Hashtable<IoTString *, KeyValue *> * speculativeTableTmp = new Hashtable<IoTString *, KeyValue *>();
1611 // The last transaction arbitrated on
1612 int64_t lastTransactionCommitted = -1;
1613 Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1615 for (int64_t transactionSequenceNumber : transactionSequenceNumbers) {
1616 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1620 // Check if this machine arbitrates for this transaction if not
1621 // then we cant arbitrate this transaction
1622 if (transaction->getArbitrator() != localMachineId) {
1626 if (transactionSequenceNumber < lastSeqNumArbOn) {
1630 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1631 // We have seen this already locally so dont commit again
1636 if (!transaction->isComplete()) {
1637 // Will arbitrate in incorrect order if we continue so just break
1643 // update the largest transaction seen by arbitrator from server
1644 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) == NULL) {
1645 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1647 int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1648 if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1649 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1653 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1654 // Guard evaluated as true
1656 // Update the local changes so we can make the commit
1657 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1658 while (kvit->hasNext()) {
1659 KeyValue *kv = kvit->next();
1660 speculativeTableTmp->put(kv->getKey(), kv);
1664 // Update what the last transaction committed was for use in batch commit
1665 lastTransactionCommitted = transactionSequenceNumber;
1667 // Guard evaluated was false so create abort
1669 Abort *newAbort = new Abort(NULL,
1670 transaction->getClientLocalSequenceNumber(),
1671 transaction->getSequenceNumber(),
1672 transaction->getMachineId(),
1673 transaction->getArbitrator(),
1674 localArbitrationSequenceNumber);
1675 localArbitrationSequenceNumber++;
1676 generatedAborts->add(newAbort);
1678 // Insert the abort so we can process
1679 processEntry(newAbort);
1682 lastSeqNumArbOn = transactionSequenceNumber;
1685 Commit *newCommit = NULL;
1687 // If there is something to commit
1688 if (speculativeTableTmp->size() != 0) {
1689 // Create the commit and increment the commit sequence number
1690 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1691 localArbitrationSequenceNumber++;
1693 // Add all the new keys to the commit
1694 for (KeyValue *kv : speculativeTableTmp->values()) {
1695 newCommit->addKV(kv);
1698 // create the commit parts
1699 newCommit->createCommitParts();
1701 // Append all the commit parts to the end of the pending queue
1702 // waiting for sending to the server
1703 // Insert the commit so we can process it
1704 for (CommitPart *commitPart : newCommit->getParts()->values()) {
1705 processEntry(commitPart);
1709 if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1710 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1711 pendingSendArbitrationRounds->add(arbitrationRound);
1713 if (compactArbitrationData()) {
1714 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1715 if (newArbitrationRound->getCommit() != NULL) {
1716 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1717 processEntry(commitPart);
1724 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1726 // Check if this machine arbitrates for this transaction if not then
1727 // we cant arbitrate this transaction
1728 if (transaction->getArbitrator() != localMachineId) {
1729 return Pair<bool, bool>(false, false);
1732 if (!transaction->isComplete()) {
1733 // Will arbitrate in incorrect order if we continue so just break
1735 return Pair<bool, bool>(false, false);
1738 if (transaction->getMachineId() != localMachineId) {
1739 // dont do this check for local transactions
1740 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) != NULL) {
1741 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1742 // We've have already seen this from the server
1743 return Pair<bool, bool>(false, false);
1748 if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1749 // Guard evaluated as true Create the commit and increment the
1750 // commit sequence number
1751 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1752 localArbitrationSequenceNumber++;
1754 // Update the local changes so we can make the commit
1755 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1756 while (kvit->hasNext()) {
1757 KeyValue *kv = kvit->next();
1758 newCommit->addKV(kv);
1762 // create the commit parts
1763 newCommit->createCommitParts();
1765 // Append all the commit parts to the end of the pending queue
1766 // waiting for sending to the server
1767 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1768 pendingSendArbitrationRounds->add(arbitrationRound);
1770 if (compactArbitrationData()) {
1771 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1772 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1773 processEntry(commitPart);
1776 // Insert the commit so we can process it
1777 for (CommitPart *commitPart : newCommit->getParts()->values()) {
1778 processEntry(commitPart);
1782 if (transaction->getMachineId() == localMachineId) {
1783 TransactionStatus *status = transaction->getTransactionStatus();
1784 if (status != NULL) {
1785 status->setStatus(TransactionStatus_StatusCommitted);
1789 updateLiveStateFromLocal();
1790 return Pair<bool, bool>(true, true);
1792 if (transaction->getMachineId() == localMachineId) {
1793 // For locally created messages update the status
1794 // Guard evaluated was false so create abort
1795 TransactionStatus * status = transaction->getTransactionStatus();
1796 if (status != NULL) {
1797 status->setStatus(TransactionStatus_StatusAborted);
1800 Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
1803 Abort *newAbort = new Abort(NULL,
1804 transaction->getClientLocalSequenceNumber(),
1806 transaction->getMachineId(),
1807 transaction->getArbitrator(),
1808 localArbitrationSequenceNumber);
1809 localArbitrationSequenceNumber++;
1810 addAbortSet->add(newAbort);
1812 // Append all the commit parts to the end of the pending queue
1813 // waiting for sending to the server
1814 ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1815 pendingSendArbitrationRounds->add(arbitrationRound);
1817 if (compactArbitrationData()) {
1818 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1819 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1820 processEntry(commitPart);
1825 updateLiveStateFromLocal();
1826 return Pair<bool, bool>(true, false);
1831 * Compacts the arbitration data my merging commits and aggregating
1832 * aborts so that a single large push of commits can be done instead
1833 * of many small updates
1835 bool Table::compactArbitrationData() {
1836 if (pendingSendArbitrationRounds->size() < 2) {
1837 // Nothing to compact so do nothing
1841 ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1842 if (lastRound->getDidSendPart()) {
1846 bool hadCommit = (lastRound->getCommit() == NULL);
1847 bool gotNewCommit = false;
1849 int numberToDelete = 1;
1850 while (numberToDelete < pendingSendArbitrationRounds->size()) {
1851 ArbitrationRound *xs round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1853 if (round->isFull() || round->getDidSendPart()) {
1854 // Stop since there is a part that cannot be compacted and we
1855 // need to compact in order
1859 if (round->getCommit() == NULL) {
1860 // Try compacting aborts only
1861 int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
1862 if (newSize > ArbitrationRound->MAX_PARTS) {
1863 // Cant compact since it would be too large
1866 lastRound->addAborts(round->getAborts());
1868 // Create a new larger commit
1869 Commit newCommit = Commit->merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
1870 localArbitrationSequenceNumber++;
1872 // Create the commit parts so that we can count them
1873 newCommit->createCommitParts();
1875 // Calculate the new size of the parts
1876 int newSize = newCommit->getNumberOfParts();
1877 newSize += lastRound->getAbortsCount();
1878 newSize += round->getAbortsCount();
1880 if (newSize > ArbitrationRound->MAX_PARTS) {
1881 // Cant compact since it would be too large
1885 // Set the new compacted part
1886 lastRound->setCommit(newCommit);
1887 lastRound->addAborts(round->getAborts());
1888 gotNewCommit = true;
1894 if (numberToDelete != 1) {
1895 // If there is a compaction
1896 // Delete the previous pieces that are now in the new compacted piece
1897 if (numberToDelete == pendingSendArbitrationRounds->size()) {
1898 pendingSendArbitrationRounds->clear();
1900 for (int i = 0; i < numberToDelete; i++) {
1901 pendingSendArbitrationRounds->remove(pendingSendArbitrationRounds->size() - 1);
1905 // Add the new compacted into the pending to send list
1906 pendingSendArbitrationRounds->add(lastRound);
1908 // Should reinsert into the commit processor
1909 if (hadCommit && gotNewCommit) {
1918 * Update all the commits and the committed tables, sets dead the dead
1921 bool Table::updateCommittedTable() {
1923 if (newCommitParts->size() == 0) {
1924 // Nothing new to process
1928 // Iterate through all the machine Ids that we received new parts for
1929 for (int64_t machineId : newCommitParts->keySet()) {
1930 Hashtable<Pair<int64_t, int32_t> *, CommitPart *> *parts = newCommitParts->get(machineId);
1932 // Iterate through all the parts for that machine Id
1933 for (Pair<int64_t, int32_t> partId : parts->keySet()) {
1934 CommitPart *part = parts->get(partId);
1936 // Get the transaction object for that sequence number
1937 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
1939 if (commitForClientTable == NULL) {
1940 // This is the first commit from this device
1941 commitForClientTable = new Hashtable<int64_t, Commit *>();
1942 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
1945 Commit *commit = commitForClientTable->get(part->getSequenceNumber());
1947 if (commit == NULL) {
1948 // This is a new commit that we dont have so make a new one
1949 commit = new Commit();
1951 // Insert this new commit into the live tables
1952 commitForClientTable->put(part->getSequenceNumber(), commit);
1955 // Add that part to the commit
1956 commit->addPartDecode(part);
1960 // Clear all the new commits parts in preparation for the next time
1961 // the server sends slots
1962 newCommitParts->clear();
1964 // If we process a new commit keep track of it for future use
1965 bool didProcessANewCommit = false;
1967 // Process the commits one by one
1968 for (int64_T arbitratorId : liveCommitsTable->keySet()) {
1970 // Get all the commits for a specific arbitrator
1971 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
1973 // Sort the commits in order
1974 Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
1975 Collections->sort(commitSequenceNumbers);
1977 // Get the last commit seen from this arbitrator
1978 int64_t lastCommitSeenSequenceNumber = -1;
1979 if (lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId) != NULL) {
1980 lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
1983 // Go through each new commit one by one
1984 for (int i = 0; i < commitSequenceNumbers->size(); i++) {
1985 int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
1986 Commit *commit = commitForClientTable->get(commitSequenceNumber);
1988 // Special processing if a commit is not complete
1989 if (!commit->isComplete()) {
1990 if (i == (commitSequenceNumbers->size() - 1)) {
1991 // If there is an incomplete commit and this commit is the
1992 // latest one seen then this commit cannot be processed and
1993 // there are no other commits
1996 // This is a commit that was already dead but parts of it
1997 // are still in the block chain (not flushed out yet)->
1998 // Delete it and move on
2000 commitForClientTable->remove(commit->getSequenceNumber());
2005 // Update the last transaction that was updated if we can
2006 if (commit->getTransactionSequenceNumber() != -1) {
2007 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
2009 // Update the last transaction sequence number that the arbitrator arbitrated on
2010 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
2011 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2015 // Update the last arbitration data that we have seen so far
2016 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(commit->getMachineId())) {
2017 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
2018 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
2020 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2023 // Never seen any data from this arbitrator so record the first one
2024 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2027 // We have already seen this commit before so need to do the
2028 // full processing on this commit
2029 if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2031 // Update the last transaction that was updated if we can
2032 if (commit->getTransactionSequenceNumber() != -1) {
2033 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
2035 // Update the last transaction sequence number that the arbitrator arbitrated on
2036 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
2037 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2044 // If we got here then this is a brand new commit and needs full
2046 // Get what commits should be edited, these are the commits that
2047 // have live values for their keys
2048 Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
2050 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = commit->getKeyValueUpdateSet()->iterator();
2051 while (kvit->hasNext()) {
2052 KeyValue *kv = kvit->next();
2053 commitsToEdit->add(liveCommitsByKeyTable->get(kv->getKey()));
2057 commitsToEdit->remove(NULL); // remove NULL since it could be in this set
2059 // Update each previous commit that needs to be updated
2060 for (Commit *previousCommit : commitsToEdit) {
2062 // Only bother with live commits (TODO: Maybe remove this check)
2063 if (previousCommit->isLive()) {
2065 // Update which keys in the old commits are still live
2067 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = commit->getKeyValueUpdateSet()->iterator();
2068 while (kvit->hasNext()) {
2069 KeyValue *kv = kvit->next();
2070 previousCommit->invalidateKey(kv->getKey());
2075 // if the commit is now dead then remove it
2076 if (!previousCommit->isLive()) {
2077 commitForClientTable->remove(previousCommit);
2082 // Update the last seen sequence number from this arbitrator
2083 if (lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId()) != NULL) {
2084 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2085 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2088 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2091 // We processed a new commit that we havent seen before
2092 didProcessANewCommit = true;
2094 // Update the committed table of keys and which commit is using which key
2096 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = commit->getKeyValueUpdateSet()->iterator();
2097 while (kvit->hasNext()) {
2098 KeyValue *kv = kvit->next();
2099 committedKeyValueTable->put(kv->getKey(), kv);
2100 liveCommitsByKeyTable->put(kv->getKey(), commit);
2107 return didProcessANewCommit;
2111 * Create the speculative table from transactions that are still live
2112 * and have come from the cloud
2114 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2115 if (liveTransactionBySequenceNumberTable->keySet()->size() == 0) {
2116 // There is nothing to speculate on
2120 // Create a list of the transaction sequence numbers and sort them
2121 // from oldest to newest
2122 Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
2123 Collections->sort(transactionSequenceNumbersSorted);
2125 bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2128 if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2129 // If there is a gap in the transaction sequence numbers then
2130 // there was a commit or an abort of a transaction OR there was a
2131 // new commit (Could be from offline commit) so a redo the
2132 // speculation from scratch
2134 // Start from scratch
2135 speculatedKeyValueTable->clear();
2136 lastTransactionSequenceNumberSpeculatedOn = -1;
2137 oldestTransactionSequenceNumberSpeculatedOn = -1;
2141 // Remember the front of the transaction list
2142 oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2144 // Find where to start arbitration from
2145 int startIndex = transactionSequenceNumbersSorted->indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1;
2147 if (startIndex >= transactionSequenceNumbersSorted->size()) {
2148 // Make sure we are not out of bounds
2149 return false; // did not speculate
2152 Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
2153 bool didSkip = true;
2155 for (int i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2156 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2157 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2159 if (!transaction->isComplete()) {
2160 // If there is an incomplete transaction then there is nothing
2161 // we can do add this transactions arbitrator to the list of
2162 // arbitrators we should ignore
2163 incompleteTransactionArbitrator->add(transaction->getArbitrator());
2168 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2172 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2174 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2175 // Guard evaluated to true so update the speculative table
2177 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2178 while (kvit->hasNext()) {
2179 KeyValue *kv = kvit->next();
2180 speculatedKeyValueTable->put(kv->getKey(), kv);
2188 // Since there was a skip we need to redo the speculation next time around
2189 lastTransactionSequenceNumberSpeculatedOn = -1;
2190 oldestTransactionSequenceNumberSpeculatedOn = -1;
2193 // We did some speculation
2198 * Create the pending transaction speculative table from transactions
2199 * that are still in the pending transaction buffer
2201 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2202 if (pendingTransactionQueue->size() == 0) {
2203 // There is nothing to speculate on
2207 if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2208 // need to reset on the pending speculation
2209 lastPendingTransactionSpeculatedOn = NULL;
2210 firstPendingTransaction = pendingTransactionQueue->get(0);
2211 pendingTransactionSpeculatedKeyValueTable->clear();
2214 // Find where to start arbitration from
2215 int startIndex = pendingTransactionQueue->indexOf(firstPendingTransaction) + 1;
2217 if (startIndex >= pendingTransactionQueue->size()) {
2218 // Make sure we are not out of bounds
2222 for (int i = startIndex; i < pendingTransactionQueue->size(); i++) {
2223 Transaction *transaction = pendingTransactionQueue->get(i);
2225 lastPendingTransactionSpeculatedOn = transaction;
2227 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2228 // Guard evaluated to true so update the speculative table
2229 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2230 while (kvit->hasNext()) {
2231 KeyValue *kv = kvit->next();
2232 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2240 * Set dead and remove from the live transaction tables the
2241 * transactions that are dead
2243 void Table::updateLiveTransactionsAndStatus() {
2245 // Go through each of the transactions
2246 for (Iterator<Map->Entry<int64_t, Transaction> > *iter = liveTransactionBySequenceNumberTable->entrySet()->iterator(); iter->hasNext();) {
2247 Transaction *transaction = iter->next()->getValue();
2249 // Check if the transaction is dead
2250 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator());
2251 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= transaction->getSequenceNumber())) {
2253 // Set dead the transaction
2254 transaction->setDead();
2256 // Remove the transaction from the live table
2258 liveTransactionByTransactionIdTable->remove(transaction->getId());
2262 // Go through each of the transactions
2263 for (Iterator<Map->Entry<int64_t, TransactionStatus *> > *iter = outstandingTransactionStatus->entrySet()->iterator(); iter->hasNext();) {
2264 TransactionStatus *status = iter->next()->getValue();
2266 // Check if the transaction is dead
2267 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator());
2268 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= status->getTransactionSequenceNumber())) {
2271 status->setStatus(TransactionStatus_StatusCommitted);
2280 * Process this slot, entry by entry-> Also update the latest message sent by slot
2282 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2284 // Update the last message seen
2285 updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2287 // Process each entry in the slot
2288 Vector<Entry *> *entries = slot->getEntries();
2289 uint eSize = entries->size();
2290 for(uint ei=0; ei < eSize; ei++) {
2291 Entry * entry = entries->get(ei);
2292 switch (entry->getType()) {
2293 case TypeCommitPart:
2294 processEntry((CommitPart *)entry);
2297 processEntry((Abort *)entry);
2299 case TypeTransactionPart:
2300 processEntry((TransactionPart *)entry);
2303 processEntry((NewKey *)entry);
2305 case TypeLastMessage:
2306 processEntry((LastMessage *)entry, machineSet);
2308 case TypeRejectedMessage:
2309 processEntry((RejectedMessage *)entry, indexer);
2311 case TypeTableStatus:
2312 processEntry((TableStatus *)entry, slot->getSequenceNumber());
2315 throw new Error("Unrecognized type: ");
2321 * Update the last message that was sent for a machine Id
2323 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2324 // Update what the last message received by a machine was
2325 updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2329 * Add the new key to the arbitrators table and update the set of live
2330 * new keys (in case of a rescued new key message)
2332 void Table::processEntry(NewKey *entry) {
2333 // Update the arbitrator table with the new key information
2334 arbitratorTable->put(entry->getKey(), entry->getMachineID());
2336 // Update what the latest live new key is
2337 NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2338 if (oldNewKey != NULL) {
2339 // Delete the old new key messages
2340 oldNewKey->setDead();
2345 * Process new table status entries and set dead the old ones as new
2346 * ones come in-> keeps track of the largest and smallest table status
2347 * seen in this current round of updating the local copy of the block
2350 void Table::processEntry(TableStatus entry, int64_t seq) {
2351 int newNumSlots = entry->getMaxSlots();
2352 updateCurrMaxSize(newNumSlots);
2353 initExpectedSize(seq, newNumSlots);
2355 if (liveTableStatus != NULL) {
2356 // We have a larger table status so the old table status is no
2358 liveTableStatus->setDead();
2361 // Make this new table status the latest alive table status
2362 liveTableStatus = entry;
2366 * Check old messages to see if there is a block chain violation->
2369 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
2370 int64_t oldSeqNum = entry->getOldSeqNum();
2371 int64_t newSeqNum = entry->getNewSeqNum();
2372 bool isequal = entry->getEqual();
2373 int64_t machineId = entry->getMachineID();
2374 int64_t seq = entry->getSequenceNumber();
2376 // Check if we have messages that were supposed to be rejected in
2377 // our local block chain
2378 for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2380 Slot *slot = indexer->getSlot(seqNum);
2383 // If we have this slot make sure that it was not supposed to be
2385 int64_t slotMachineId = slot->getMachineID();
2386 if (isequal != (slotMachineId == machineId)) {
2387 throw new Error("Server Error: Trying to insert rejected message for slot ");
2392 // Create a list of clients to watch until they see this rejected
2394 Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2395 for (Map->Entry<int64_t, Pair<int64_t, Liveness *> > *lastMessageEntry : lastMessageTable->entrySet()) {
2396 // Machine ID for the last message entry
2397 int64_t lastMessageEntryMachineId = lastMessageEntry->getKey();
2399 // We've seen it, don't need to continue to watch-> Our next
2400 // message will implicitly acknowledge it->
2401 if (lastMessageEntryMachineId == localMachineId) {
2405 Pair<int64_t, Liveness *> lastMessageValue = lastMessageEntry->getValue();
2406 int64_t entrySequenceNumber = lastMessageValue.getFirst();
2408 if (entrySequenceNumber < seq) {
2409 // Add this rejected message to the set of messages that this
2410 // machine ID did not see yet
2411 addWatchVector(lastMessageEntryMachineId, entry);
2412 // This client did not see this rejected message yet so add it
2413 // to the watch set to monitor
2414 deviceWatchSet->add(lastMessageEntryMachineId);
2417 if (deviceWatchSet->isEmpty()) {
2418 // This rejected message has been seen by all the clients so
2421 // We need to watch this rejected message
2422 entry->setWatchSet(deviceWatchSet);
2427 * Check if this abort is live, if not then save it so we can kill it
2428 * later-> update the last transaction number that was arbitrated on->
2430 void Table::processEntry(Abort *entry) {
2431 if (entry->getTransactionSequenceNumber() != -1) {
2432 // update the transaction status if it was sent to the server
2433 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2434 if (status != NULL) {
2435 status->setStatus(TransactionStatus_StatusAborted);
2439 // Abort has not been seen by the client it is for yet so we need to
2441 Abort *previouslySeenAbort = liveAbortTable->put(entry->getAbortId(), entry);
2442 if (previouslySeenAbort != NULL) {
2443 previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version
2446 if (entry->getTransactionArbitrator() == localMachineId) {
2447 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
2450 if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId()).getFirst() >= entry->getSequenceNumber())) {
2451 // The machine already saw this so it is dead
2453 liveAbortTable->remove(&entry->getAbortId());
2455 if (entry->getTransactionArbitrator() == localMachineId) {
2456 liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2461 // Update the last arbitration data that we have seen so far
2462 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(entry->getTransactionArbitrator())) {
2463 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2464 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2466 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2469 // Never seen any data from this arbitrator so record the first one
2470 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2473 // Set dead a transaction if we can
2474 Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber()));
2475 if (transactionToSetDead != NULL) {
2476 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2479 // Update the last transaction sequence number that the arbitrator
2481 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator());
2482 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2484 if (entry->getTransactionSequenceNumber() != -1) {
2485 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2491 * Set dead the transaction part if that transaction is dead and keep
2492 * track of all new parts
2494 void Table::processEntry(TransactionPart *entry) {
2495 // Check if we have already seen this transaction and set it dead OR
2496 // if it is not alive
2497 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId());
2498 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= entry->getSequenceNumber())) {
2499 // This transaction is dead, it was already committed or aborted
2504 // This part is still alive
2505 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId());
2507 if (transactionPart == NULL) {
2508 // Dont have a table for this machine Id yet so make one
2509 transactionPart = new Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2510 newTransactionParts->put(entry->getMachineId(), transactionPart);
2513 // Update the part and set dead ones we have already seen (got a
2515 TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry);
2516 if (previouslySeenPart != NULL) {
2517 previouslySeenPart->setDead();
2522 * Process new commit entries and save them for future use-> Delete duplicates
2524 void Table::processEntry(CommitPart *entry) {
2525 // Update the last transaction that was updated if we can
2526 if (entry->getTransactionSequenceNumber() != -1) {
2527 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId());
2528 // Update the last transaction sequence number that the arbitrator
2530 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2531 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2535 Hashtable<Pair<int64_t, int32_t> *, CommitPart *> *commitPart = newCommitParts->get(entry->getMachineId());
2536 if (commitPart == NULL) {
2537 // Don't have a table for this machine Id yet so make one
2538 commitPart = new Hashtable<Pair<int64_t, int32_t> *, CommitPart *>();
2539 newCommitParts->put(entry->getMachineId(), commitPart);
2541 // Update the part and set dead ones we have already seen (got a
2543 CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry);
2544 if (previouslySeenPart != NULL) {
2545 previouslySeenPart->setDead();
2550 * Update the last message seen table-> Update and set dead the
2551 * appropriate RejectedMessages as clients see them-> Updates the live
2552 * aborts, removes those that are dead and sets them dead-> Check that
2553 * the last message seen is correct and that there is no mismatch of
2554 * our own last message or that other clients have not had a rollback
2555 * on the last message->
2557 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2558 // We have seen this machine ID
2559 machineSet->remove(machineId);
2561 // Get the set of rejected messages that this machine Id is has not seen yet
2562 Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2563 // If there is a rejected message that this machine Id has not seen yet
2564 if (watchset != NULL) {
2565 // Go through each rejected message that this machine Id has not
2568 SetIterator<RejectedMessage *, RejectedMessage *> *rmit = watchset->iterator();
2569 while(rmit->hasNext()) {
2570 RejectedMessage *rm = rmit->next();
2571 // If this machine Id has seen this rejected message->->->
2572 if (rm->getSequenceNumber() <= seqNum) {
2573 // Remove it from our watchlist
2575 // Decrement machines that need to see this notification
2576 rm->removeWatcher(machineId);
2582 // Set dead the abort
2583 for (Iterator<Map->Entry<Pair<int64_t, int64_t>, Abort *> > i = liveAbortTable->entrySet()->iterator(); i->hasNext();) {
2584 Abort *abort = i->next()->getValue();
2585 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2588 if (abort->getTransactionArbitrator() == localMachineId) {
2589 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2593 if (machineId == localMachineId) {
2594 // Our own messages are immediately dead->
2595 char livenessType = liveness->getType();
2596 if (livenessType==TypeLastMessage) {
2597 ((LastMessage *)liveness)->setDead();
2598 } else if (livenessType == TypeSlot) {
2599 ((Slot *)liveness)->setDead();
2601 throw new Error("Unrecognized type");
2604 // Get the old last message for this device
2605 Pair<int64_t, Liveness *> * lastMessageEntry = lastMessageTable->put(machineId, new Pair<int64_t, Liveness *>(seqNum, liveness));
2606 if (lastMessageEntry == NULL) {
2607 // If no last message then there is nothing else to process
2611 int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
2612 Liveness *lastEntry = lastMessageEntry->getSecond();
2613 delete lastMessageEntry;
2615 // If it is not our machine Id since we already set ours to dead
2616 if (machineId != localMachineId) {
2617 char lastEntryType = lastEntry->getType();
2619 if (lastEntryType == TypeLastMessage) {
2620 ((LastMessage *)lastEntry)->setDead();
2621 } else if (lastEntryType == TypeSlot) {
2622 ((Slot *)lastEntry)->setDead();
2624 throw new Error("Unrecognized type");
2627 // Make sure the server is not playing any games
2628 if (machineId == localMachineId) {
2629 if (hadPartialSendToServer) {
2630 // We were not making any updates and we had a machine mismatch
2631 if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2632 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: ");
2635 // We were not making any updates and we had a machine mismatch
2636 if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2637 throw new Error("Server Error: Mismatch on local machine sequence number, needed: ");
2641 if (lastMessageSeqNum > seqNum) {
2642 throw new Error("Server Error: Rollback on remote machine sequence number");
2648 * Add a rejected message entry to the watch set to keep track of
2649 * which clients have seen that rejected message entry and which have
2652 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
2653 Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2654 if (entries == NULL) {
2655 // There is no set for this machine ID yet so create one
2656 entries = new Hashset<RejectedMessage *>();
2657 rejectedMessageWatchVectorTable->put(machineId, entries);
2659 entries->add(entry);
2663 * Check if the HMAC chain is not violated
2665 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
2666 for (int i = 0; i < newSlots->length(); i++) {
2667 Slot *currSlot = newSlots->get(i);
2668 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
2669 if (prevSlot != NULL &&
2670 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2671 throw new Error("Server Error: Invalid HMAC Chain");