3 #include "SlotBuffer.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
14 #include "ByteBuffer.h"
16 #include "CommitPart.h"
17 #include "ArbitrationRound.h"
18 #include "TransactionPart.h"
20 #include "RejectedMessage.h"
21 #include "SlotIndexer.h"
// qsort-style comparison callback: both arguments are reinterpreted as
// pointers to int64_t. It is passed to qsort() further down in this file to
// order vectors of sequence numbers.
24 int compareInt64(const void * a, const void *b) {
25 const int64_t * pa = (const int64_t *) a;
26 const int64_t * pb = (const int64_t *) b;
// Constructs a Table that creates its own CloudComm from the given base URL,
// password and listening port. The pointer members initialized here start out
// NULL, the sequence counters start at 0 (oldestLiveSlotSequenceNumver starts
// at 1) and the status flags start out false.
35 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
37 cloud(new CloudComm(this, baseurl, password, listeningPort)),
39 liveTableStatus(NULL),
40 pendingTransactionBuilder(NULL),
41 lastPendingTransactionSpeculatedOn(NULL),
42 firstPendingTransaction(NULL),
44 bufferResizeThreshold(0),
46 oldestLiveSlotSequenceNumver(1),
47 localMachineId(_localMachineId),
49 localTransactionSequenceNumber(0),
50 lastTransactionSequenceNumberSpeculatedOn(0),
51 oldestTransactionSequenceNumberSpeculatedOn(0),
52 localArbitrationSequenceNumber(0),
53 hadPartialSendToServer(false),
54 attemptedToSendToServer(false),
56 didFindTableStatus(false),
// Bookkeeping about the most recent send attempt.
58 lastSlotAttemptedToSend(NULL),
61 lastTransactionPartsSent(NULL),
62 lastPendingSendArbitrationEntriesToDelete(NULL),
// Lookup tables, vectors and sets: left NULL by the constructor.
64 committedKeyValueTable(NULL),
65 speculatedKeyValueTable(NULL),
66 pendingTransactionSpeculatedKeyValueTable(NULL),
67 liveNewKeyTable(NULL),
68 lastMessageTable(NULL),
69 rejectedMessageWatchVectorTable(NULL),
70 arbitratorTable(NULL),
72 newTransactionParts(NULL),
74 lastArbitratedTransactionNumberByArbitratorTable(NULL),
75 liveTransactionBySequenceNumberTable(NULL),
76 liveTransactionByTransactionIdTable(NULL),
77 liveCommitsTable(NULL),
78 liveCommitsByKeyTable(NULL),
79 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
80 rejectedSlotVector(NULL),
81 pendingTransactionQueue(NULL),
82 pendingSendArbitrationRounds(NULL),
83 pendingSendArbitrationEntriesToDelete(NULL),
84 transactionPartsSent(NULL),
85 outstandingTransactionStatus(NULL),
86 liveAbortsGeneratedByLocal(NULL),
87 offlineTransactionsCommittedAndAtServer(NULL),
88 localCommunicationTable(NULL),
89 lastTransactionSeenFromMachineFromServer(NULL),
90 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
91 lastInsertedNewKey(false),
// Overload that takes an already-constructed CloudComm plus the local machine
// id. The members initialized here get the same starting values as in the
// other constructor: pointers NULL, counters 0 (oldestLiveSlotSequenceNumver
// 1), flags false.
97 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
101 liveTableStatus(NULL),
102 pendingTransactionBuilder(NULL),
103 lastPendingTransactionSpeculatedOn(NULL),
104 firstPendingTransaction(NULL),
106 bufferResizeThreshold(0),
108 oldestLiveSlotSequenceNumver(1),
109 localMachineId(_localMachineId),
111 localTransactionSequenceNumber(0),
112 lastTransactionSequenceNumberSpeculatedOn(0),
113 oldestTransactionSequenceNumberSpeculatedOn(0),
114 localArbitrationSequenceNumber(0),
115 hadPartialSendToServer(false),
116 attemptedToSendToServer(false),
118 didFindTableStatus(false),
// Bookkeeping about the most recent send attempt.
120 lastSlotAttemptedToSend(NULL),
123 lastTransactionPartsSent(NULL),
124 lastPendingSendArbitrationEntriesToDelete(NULL),
// Lookup tables, vectors and sets: left NULL by the constructor.
126 committedKeyValueTable(NULL),
127 speculatedKeyValueTable(NULL),
128 pendingTransactionSpeculatedKeyValueTable(NULL),
129 liveNewKeyTable(NULL),
130 lastMessageTable(NULL),
131 rejectedMessageWatchVectorTable(NULL),
132 arbitratorTable(NULL),
133 liveAbortTable(NULL),
134 newTransactionParts(NULL),
135 newCommitParts(NULL),
136 lastArbitratedTransactionNumberByArbitratorTable(NULL),
137 liveTransactionBySequenceNumberTable(NULL),
138 liveTransactionByTransactionIdTable(NULL),
139 liveCommitsTable(NULL),
140 liveCommitsByKeyTable(NULL),
141 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
142 rejectedSlotVector(NULL),
143 pendingTransactionQueue(NULL),
144 pendingSendArbitrationRounds(NULL),
145 pendingSendArbitrationEntriesToDelete(NULL),
146 transactionPartsSent(NULL),
147 outstandingTransactionStatus(NULL),
148 liveAbortsGeneratedByLocal(NULL),
149 offlineTransactionsCommittedAndAtServer(NULL),
150 localCommunicationTable(NULL),
151 lastTransactionSeenFromMachineFromServer(NULL),
152 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
153 lastInsertedNewKey(false),
160 * Init all the stuff needed for table usage
// NOTE(review): presumably the body of Table::init() — confirm against the
// signature line. Allocates the helper objects and every container member,
// then sizes the table from the slot buffer.
163 // Init helper objects
164 random = new Random();
165 buffer = new SlotBuffer();
// Allocate the key/value, bookkeeping and arbitration containers. The tables
// keyed by Pair pointers use pairHashFunction / pairEquals.
168 committedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
169 speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
170 pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
171 liveNewKeyTable = new Hashtable<IoTString *, NewKey *>();
172 lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> * >();
173 rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
174 arbitratorTable = new Hashtable<IoTString *, int64_t>();
175 liveAbortTable = new Hashtable<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>();
176 newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
177 newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
178 lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
179 liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
180 liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t> *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>();
181 liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> * >();
182 liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *>();
183 lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
184 rejectedSlotVector = new Vector<int64_t>();
185 pendingTransactionQueue = new Vector<Transaction *>();
186 pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
187 transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
188 outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
189 liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
190 offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> *, uintptr_t, 0, pairHashFunction, pairEquals>();
191 localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> *>();
192 lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
193 pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
194 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
// The slot count comes from the buffer's capacity; the resize threshold is
// derived from it, so it must be computed afterwards.
197 numberOfSlots = buffer->capacity();
198 setResizeThreshold();
202 * Initialize the table by inserting a table status as the first entry
203 * into the table; also initialize the crypto stuff.
// Bootstraps a new table: initializes security on the cloud connection,
// builds slot number 1 together with a TableStatus for it, pushes the slot
// with putSlot() and feeds the outcome to validateAndUpdate().
// Throws Error("Error on initialization") on the failure path.
205 void Table::initTable() {
206 cloud->initSecurity();
208 // Create the first insertion into the block chain which is the table status
209 Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
210 localSequenceNumber++;
211 TableStatus *status = new TableStatus(s, numberOfSlots);
// putSlot() hands back an array of slots; its value selects the branch below.
213 Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
// A fresh one-element array is built and validated locally.
216 array = new Array<Slot *>(1);
218 // update local block chain
219 validateAndUpdate(array, true);
220 } else if (array->length() == 1) {
221 // in case we did push the slot BUT we failed to init it
222 validateAndUpdate(array, true);
224 throw new Error("Error on initialization");
229 * Rebuild the table from scratch by pulling the latest block chain
// Fetches every slot after the current sequenceNumber from the cloud,
// validates and applies them, then refreshes live transactions and status.
232 void Table::rebuild() {
233 // Just pull the latest slots from the server
234 Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
235 validateAndUpdate(newslots, true);
237 updateLiveTransactionsAndStatus();
// Records the (hostName, portNumber) pair under the given arbitrator id in
// localCommunicationTable. A new Pair is allocated for each call.
240 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
241 localCommunicationTable->put(arbitrator, new Pair<IoTString *, int32_t>(hostName, portNumber));
// Returns whatever arbitratorTable holds for the key (an int64_t machine id).
// No presence check is done here; callers elsewhere in this file test
// arbitratorTable->contains(key) first.
244 int64_t Table::getArbitrator(IoTString *key) {
245 return arbitratorTable->get(key);
// Shuts the table down. NOTE(review): the body is not shown here — confirm
// what is released.
248 void Table::close() {
// Looks the key up in committedKeyValueTable and returns the stored value.
252 IoTString *Table::getCommitted(IoTString *key) {
253 KeyValue *kv = committedKeyValueTable->get(key);
256 return kv->getValue();
// Looks the key up in three tables in this order: pending-transaction
// speculated, speculated, committed. Returns the value of the entry found.
// NOTE(review): each fallback presumably triggers when the previous lookup
// returned NULL — confirm.
262 IoTString *Table::getSpeculative(IoTString *key) {
263 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
266 kv = speculatedKeyValueTable->get(key);
270 kv = committedKeyValueTable->get(key);
274 return kv->getValue();
// Reads the committed value for the key as part of the transaction being
// built: the value read is registered as a guard on
// pendingTransactionBuilder (a guard with a NULL value when there is no
// committed entry).
// Throws Error("Key not Found.") when the key has no arbitrator, and
// Error("Not all Key Values Match Arbitrator.") when the key's arbitrator is
// rejected by the builder's checkArbitrator().
280 IoTString *Table::getCommittedAtomic(IoTString *key) {
281 KeyValue *kv = committedKeyValueTable->get(key);
283 if (!arbitratorTable->contains(key)) {
284 throw new Error("Key not Found.");
287 // Make sure new key value pair matches the current arbitrator
288 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
289 // TODO: Maybe not throw an error
290 throw new Error("Not all Key Values Match Arbitrator.");
// Guard on the value that was read so the arbitrator can check it later.
294 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
295 return kv->getValue();
// No committed entry: guard on the key with a NULL value.
297 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
// Speculative counterpart of getCommittedAtomic(): the key is looked up in
// the pending-transaction speculated table, then the speculated table, then
// the committed table, and the value read (or NULL) is registered as a guard
// on pendingTransactionBuilder.
// Throws Error("Key not Found.") when the key has no arbitrator, and
// Error("Not all Key Values Match Arbitrator.") when the key's arbitrator is
// rejected by the builder's checkArbitrator().
302 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
303 if (!arbitratorTable->contains(key)) {
304 throw new Error("Key not Found.");
307 // Make sure new key value pair matches the current arbitrator
308 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
309 // TODO: Maybe not throw an error
310 throw new Error("Not all Key Values Match Arbitrator.");
// Lookup order: pending-transaction speculated, speculated, committed.
313 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
316 kv = speculatedKeyValueTable->get(key);
320 kv = committedKeyValueTable->get(key);
324 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
325 return kv->getValue();
327 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
// Pulls the slots after sequenceNumber from the cloud, validates them and
// refreshes live transactions and status. When an Exception is caught, the
// handler walks the machine ids stored in localCommunicationTable.
// NOTE(review): presumably each machine is then contacted via
// updateFromLocal() — confirm.
332 bool Table::update() {
334 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
335 validateAndUpdate(newSlots, false);
337 updateLiveTransactionsAndStatus();
339 } catch (Exception *e) {
340 SetIterator<int64_t, Pair<IoTString *, int32_t> *> *kit = getKeyIterator(localCommunicationTable);
341 while (kit->hasNext()) {
342 int64_t m = kit->next();
// Tries to register keyName with machineId as its arbitrator by sending a
// NewKey entry to the server.
//
// keyName   - key to create.
// machineId - machine that will arbitrate the key.
//
// Fix: the guard used to test !arbitratorTable->contains(keyName), which took
// the "already has an arbitrator" path for keys that do NOT exist yet. That
// contradicted both the comment below and the matching check in
// sendToServer() (arbitratorTable->contains(newKey->getKey())). The guard now
// fires only when the key is already present.
351 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
353 if (arbitratorTable->contains(keyName)) {
354 // There is already an arbitrator
357 NewKey *newKey = new NewKey(NULL, keyName, machineId);
359 if (sendToServer(newKey)) {
360 // If successfully inserted
// Begins a new transaction by replacing pendingTransactionBuilder with a
// fresh PendingTransaction for the local machine. The previous builder object
// is not deleted here.
366 void Table::startTransaction() {
367 // Create a new transaction, invalidates any old pending transactions.
368 pendingTransactionBuilder = new PendingTransaction(localMachineId);
// Adds a key/value update to the transaction currently being built.
// Throws Error("Key not Found.") when the key has no arbitrator, and
// Error("Not all Key Values Match Arbitrator.") when the key's arbitrator is
// rejected by the builder's checkArbitrator().
371 void Table::addKV(IoTString *key, IoTString *value) {
373 // Make sure it is a valid key
374 if (!arbitratorTable->contains(key)) {
375 throw new Error("Key not Found.");
378 // Make sure new key value pair matches the current arbitrator
379 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
380 // TODO: Maybe not throw an error
381 throw new Error("Not all Key Values Match Arbitrator.");
384 // Add the key value to this transaction
385 KeyValue *kv = new KeyValue(key, value);
386 pendingTransactionBuilder->addKV(kv);
// Finalizes the transaction held by pendingTransactionBuilder and returns its
// TransactionStatus.
//  - A transaction with no key/value updates returns a StatusNoEffect status
//    right away.
//  - Otherwise the transaction gets the next local transaction sequence
//    number and a StatusPending status. It is queued in
//    pendingTransactionQueue when its arbitrator is another machine; the call
//    to arbitrateOnLocalTransaction() below handles the other case.
//  - When a ServerException is caught, queued transactions are offered to
//    their arbitrators through sendTransactionToLocal().
389 TransactionStatus *Table::commitTransaction() {
390 if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
391 // transaction with no updates will have no effect on the system
392 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
395 // Set the local transaction sequence number and increment
396 pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
397 localTransactionSequenceNumber++;
399 // Create the transaction status
400 TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
402 // Create the new transaction
403 Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
404 newTransaction->setTransactionStatus(transactionStatus);
406 if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
407 // Add it to the queue and invalidate the builder for safety
408 pendingTransactionQueue->add(newTransaction);
410 arbitrateOnLocalTransaction(newTransaction);
411 updateLiveStateFromLocal();
// Start a fresh builder so the committed one cannot be reused.
414 pendingTransactionBuilder = new PendingTransaction(localMachineId);
418 } catch (ServerException *e) {
// Arbitrators that already failed during this pass; later transactions for
// them are not attempted so per-arbitrator ordering is preserved.
420 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
421 uint size = pendingTransactionQueue->size();
// In-place compaction: each element is copied to index oldindex++, and the
// queue is cut to oldindex entries after the loop.
423 for (int iter = 0; iter < size; iter++) {
424 Transaction *transaction = pendingTransactionQueue->get(iter);
425 pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter));
427 if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
428 // Already contacted this client so ignore all attempts to contact this client
429 // to preserve ordering for arbitrator
433 Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
// First element true means the arbitrator could not be reached.
435 if (sendReturn.getFirst()) {
436 // Failed to contact over local
437 arbitratorTriedAndFailed->add(transaction->getArbitrator());
439 // Successful contact or should not contact
441 if (sendReturn.getSecond()) {
447 pendingTransactionQueue->setSize(oldindex);
450 updateLiveStateFromLocal();
452 return transactionStatus;
456 * Recalculate the new resize threshold
// Recomputes bufferResizeThreshold from numberOfSlots: the base is
// Table_RESIZE_THRESHOLD * numberOfSlots truncated to int, minus one, plus a
// random offset drawn with nextInt(numberOfSlots - resizeLower).
458 void Table::setResizeThreshold() {
459 int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
460 bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
// Returns the current value of the localSequenceNumber counter.
463 int64_t Table::getLocalSequenceNumber() {
464 return localSequenceNumber;
// Pushes pending work to the server: queued transactions, pending
// arbitration rounds and, optionally, a new key.
//
// Phase 1 (only when hadPartialSendToServer is set): resolves the previous
// send attempt. It re-fetches slots from the server and either resends
// lastSlotAttemptedToSend or looks for it among the returned slots, then
// updates the transactions recorded in lastTransactionPartsSent.
//
// Phase 2: while anything is pending, builds a slot with fillSlot(), sends it
// with sendSlotsToServer(), and updates the queues and transaction statuses.
// On a ServerException of type InputTimeout, hadPartialSendToServer is set.
//
// newKey - optional NewKey entry to insert; may be NULL.
// The final return statement yields (newKey == NULL).
467 bool Table::sendToServer(NewKey *newKey) {
468 bool fromRetry = false;
// ---- Phase 1: recover from an earlier partial send ----
470 if (hadPartialSendToServer) {
471 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
// The server returned no new slots: resend the slot from the last attempt.
472 if (newSlots->length() == 0) {
474 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
476 if (sendSlotsReturn.getFirst()) {
477 if (newKey != NULL) {
// NOTE(review): getKey() results are compared with ==, i.e. as pointers when
// getKey() returns IoTString* — confirm that identity comparison is intended.
478 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
483 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
484 while (trit->hasNext()) {
485 Transaction *transaction = trit->next();
486 transaction->resetServerFailure();
487 // Update which transactions parts still need to be sent
488 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
489 // Add the transaction status to the outstanding list
490 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
492 // Update the transaction status
493 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
495 // Check if all the transaction parts were successfully
496 // sent and if so then remove it from pending
497 if (transaction->didSendAllParts()) {
498 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
499 pendingTransactionQueue->remove(transaction);
// Inspect the slots returned by the resend for the slot of the last attempt:
// first by (sequence number, machine id), then through LastMessage entries.
504 newSlots = sendSlotsReturn.getThird();
505 bool isInserted = false;
506 for (uint si = 0; si < newSlots->length(); si++) {
507 Slot *s = newSlots->get(si);
508 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
514 for (uint si = 0; si < newSlots->length(); si++) {
515 Slot *s = newSlots->get(si);
520 // Process each entry in the slot
521 Vector<Entry *> *ventries = s->getEntries();
522 uint vesize = ventries->size();
523 for (uint vei = 0; vei < vesize; vei++) {
524 Entry *entry = ventries->get(vei);
525 if (entry->getType() == TypeLastMessage) {
526 LastMessage *lastMessage = (LastMessage *)entry;
527 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
536 if (newKey != NULL) {
537 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
542 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
543 while (trit->hasNext()) {
544 Transaction *transaction = trit->next();
545 transaction->resetServerFailure();
547 // Update which transactions parts still need to be sent
548 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
550 // Add the transaction status to the outstanding list
551 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
553 // Update the transaction status
554 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
556 // Check if all the transaction parts were successfully sent and if so then remove it from pending
557 if (transaction->didSendAllParts()) {
558 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
559 pendingTransactionQueue->remove(transaction);
561 transaction->resetServerFailure();
562 // Set the transaction sequence number back to nothing
563 if (!transaction->didSendAPartToServer()) {
564 transaction->setSequenceNumber(-1);
572 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
573 while (trit->hasNext()) {
574 Transaction *transaction = trit->next();
575 transaction->resetServerFailure();
576 // Set the transaction sequence number back to nothing
577 if (!transaction->didSendAPartToServer()) {
578 transaction->setSequenceNumber(-1);
583 if (sendSlotsReturn.getThird()->length() != 0) {
584 // insert into the local block chain
585 validateAndUpdate(sendSlotsReturn.getThird(), true);
// Same check as above, applied to the slots fetched at the start of phase 1.
589 bool isInserted = false;
590 for (uint si = 0; si < newSlots->length(); si++) {
591 Slot *s = newSlots->get(si);
592 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
598 for (uint si = 0; si < newSlots->length(); si++) {
599 Slot *s = newSlots->get(si);
604 // Process each entry in the slot
605 Vector<Entry *> *entries = s->getEntries();
606 uint eSize = entries->size();
607 for(uint ei=0; ei < eSize; ei++) {
608 Entry * entry = entries->get(ei);
610 if (entry->getType() == TypeLastMessage) {
611 LastMessage *lastMessage = (LastMessage *)entry;
612 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
621 if (newKey != NULL) {
622 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
627 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
628 while (trit->hasNext()) {
629 Transaction *transaction = trit->next();
630 transaction->resetServerFailure();
632 // Update which transactions parts still need to be sent
633 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
635 // Add the transaction status to the outstanding list
636 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
638 // Update the transaction status
639 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
641 // Check if all the transaction parts were successfully sent and if so then remove it from pending
642 if (transaction->didSendAllParts()) {
643 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
644 pendingTransactionQueue->remove(transaction);
646 transaction->resetServerFailure();
647 // Set the transaction sequence number back to nothing
648 if (!transaction->didSendAPartToServer()) {
649 transaction->setSequenceNumber(-1);
655 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
656 while (trit->hasNext()) {
657 Transaction *transaction = trit->next();
658 transaction->resetServerFailure();
659 // Set the transaction sequence number back to nothing
660 if (!transaction->didSendAPartToServer()) {
661 transaction->setSequenceNumber(-1);
667 // insert into the local block chain
668 validateAndUpdate(newSlots, true);
671 } catch (ServerException *e) {
// ---- Phase 2: send everything that is still pending ----
678 // While we have stuff that needs inserting into the block chain
679 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
683 if (hadPartialSendToServer) {
684 throw new Error("Should Be error free");
689 // If there is a new key with same name then end
690 if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
// The new slot takes the next sequence number and is constructed with the
// HMAC of the slot currently stored at sequenceNumber.
695 Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber);
696 localSequenceNumber++;
698 // Try to fill the slot with data
699 ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
700 bool needsResize = fillSlotsReturn.getFirst();
701 int newSize = fillSlotsReturn.getSecond();
702 bool insertedNewKey = fillSlotsReturn.getThird();
705 // Reset which transaction to send
706 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
707 while (trit->hasNext()) {
708 Transaction *transaction = trit->next();
709 transaction->resetNextPartToSend();
711 // Set the transaction sequence number back to nothing
712 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
713 transaction->setSequenceNumber(-1);
718 // Clear the sent data since we are trying again
719 pendingSendArbitrationEntriesToDelete->clear();
720 transactionPartsSent->clear();
722 // We needed a resize so try again
723 fillSlot(slot, true, newKey);
// Snapshot this attempt so phase 1 can resolve it after a partial send.
726 lastSlotAttemptedToSend = slot;
727 lastIsNewKey = (newKey != NULL);
728 lastInsertedNewKey = insertedNewKey;
729 lastNewSize = newSize;
731 lastTransactionPartsSent = transactionPartsSent->clone();
732 lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
734 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
736 if (sendSlotsReturn.getFirst()) {
738 // Did insert into the block chain
740 if (insertedNewKey) {
741 // This slot was what was inserted not a previous slot
743 // New Key was successfully inserted into the block chain so don't want to insert it again
747 // Remove the aborts and commit parts that were sent from the pending to send queue
748 uint size = pendingSendArbitrationRounds->size();
// In-place compaction: rounds that still have parts to send are moved to the
// front, then the vector is cut to oldcount entries.
750 for (uint i = 0; i < size; i++) {
751 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
752 round->removeParts(pendingSendArbitrationEntriesToDelete);
754 if (!round->isDoneSending()) {
755 // Not all parts have been sent yet, so keep this round in the queue
756 pendingSendArbitrationRounds->set(oldcount++,
757 pendingSendArbitrationRounds->get(i));
760 pendingSendArbitrationRounds->setSize(oldcount);
762 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
763 while (trit->hasNext()) {
764 Transaction *transaction = trit->next();
765 transaction->resetServerFailure();
767 // Update which transactions parts still need to be sent
768 transaction->removeSentParts(transactionPartsSent->get(transaction));
770 // Add the transaction status to the outstanding list
771 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
773 // Update the transaction status
774 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
776 // Check if all the transaction parts were successfully sent and if so then remove it from pending
777 if (transaction->didSendAllParts()) {
778 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
779 pendingTransactionQueue->remove(transaction);
784 // Reset which transaction to send
785 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
786 while (trit->hasNext()) {
787 Transaction *transaction = trit->next();
788 transaction->resetNextPartToSend();
790 // Set the transaction sequence number back to nothing
791 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
792 transaction->setSequenceNumber(-1);
798 // Clear the sent data in preparation for next send
799 pendingSendArbitrationEntriesToDelete->clear();
800 transactionPartsSent->clear();
802 if (sendSlotsReturn.getThird()->length() != 0) {
803 // insert into the local block chain
804 validateAndUpdate(sendSlotsReturn.getThird(), true);
808 } catch (ServerException *e) {
// Any failure other than an input timeout: nothing reached the server.
809 if (e->getType() != ServerException_TypeInputTimeout) {
810 // Nothing was able to be sent to the server so just clear these data structures
811 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
812 while (trit->hasNext()) {
813 Transaction *transaction = trit->next();
814 transaction->resetNextPartToSend();
816 // Set the transaction sequence number back to nothing
817 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
818 transaction->setSequenceNumber(-1);
823 // There was a partial send to the server
824 hadPartialSendToServer = true;
826 // Reset the send position of each transaction and flag it as having a server failure
827 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
828 while (trit->hasNext()) {
829 Transaction *transaction = trit->next();
830 transaction->resetNextPartToSend();
831 transaction->setServerFailure();
836 pendingSendArbitrationEntriesToDelete->clear();
837 transactionPartsSent->clear();
842 return newKey == NULL;
// Asks the machine with the given id, over the local channel, for
// arbitration data this client has not seen yet.
// The request carries the last arbitration sequence number seen from that
// machine (-1 when none is recorded). The reply is decoded as an entry count
// followed by typed entries (Abort or CommitPart), after which the live state
// is refreshed via updateLiveStateFromLocal().
// localSequenceNumber is incremented once per request.
845 bool Table::updateFromLocal(int64_t machineId) {
// No host/port registered for this machine: nothing can be sent.
846 if (!localCommunicationTable->contains(machineId))
849 Pair<IoTString *, int32_t> * localCommunicationInformation = localCommunicationTable->get(machineId);
851 // Get the size of the send data
852 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
854 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
855 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(machineId)) {
856 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
859 Array<char> *sendData = new Array<char>(sendDataSize);
860 ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
863 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
867 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
868 localSequenceNumber++;
870 if (returnData == NULL) {
871 // Could not contact server
// Reply layout as decoded here: int entry count, then per entry a one-byte
// type tag followed by the encoded entry.
876 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
877 int numberOfEntries = bbDecode->getInt();
879 for (int i = 0; i < numberOfEntries; i++) {
880 char type = bbDecode->get();
881 if (type == TypeAbort) {
882 Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
884 } else if (type == TypeCommitPart) {
885 CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
886 processEntry(commitPart);
890 updateLiveStateFromLocal();
// Sends a whole transaction directly to its arbitrator over the local channel
// and applies the arbitration result that comes back.
//
// Return value, as used by commitTransaction(): first == true means the
// arbitrator could not be contacted.
//  - Pair(true, false): no local communication info for the arbitrator, or
//    sendLocalData() returned NULL.
//  - Pair(false, true): the exchange completed and the transaction status was
//    set to committed or aborted.
//
// localSequenceNumber is incremented once per request.
895 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
897 // Get the devices local communications
898 if (!localCommunicationTable->contains(transaction->getArbitrator()))
899 return Pair<bool, bool>(true, false);
901 Pair<IoTString *, int32_t> * localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
903 // Get the size of the send data
// Fixed header (int32 + int64) plus the encoded size of every part.
904 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
906 Vector<TransactionPart *> * tParts = transaction->getParts();
907 uint tPartsSize = tParts->size();
908 for (uint i = 0; i < tPartsSize; i++) {
909 TransactionPart * part = tParts->get(i);
910 sendDataSize += part->getSize();
914 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
915 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(transaction->getArbitrator())) {
916 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
919 // Make the send data size
920 Array<char> *sendData = new Array<char>(sendDataSize);
921 ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
// Request layout: last-seen arbitration sequence number, part count, parts.
924 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
925 bbEncode->putInt(transaction->getParts()->size());
927 Vector<TransactionPart *> * tParts = transaction->getParts();
928 uint tPartsSize = tParts->size();
929 for (uint i = 0; i < tPartsSize; i++) {
930 TransactionPart * part = tParts->get(i);
931 part->encode(bbEncode);
936 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
937 localSequenceNumber++;
939 if (returnData == NULL) {
940 // Could not contact server
941 return Pair<bool, bool>(true, false);
// Reply layout as decoded here: didCommit byte, couldArbitrate byte, entry
// count, then typed entries.
945 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
946 bool didCommit = bbDecode->get() == 1;
947 bool couldArbitrate = bbDecode->get() == 1;
948 int numberOfEntries = bbDecode->getInt();
949 bool foundAbort = false;
951 for (int i = 0; i < numberOfEntries; i++) {
952 char type = bbDecode->get();
953 if (type == TypeAbort) {
954 Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
// Does this abort name the transaction we just sent?
956 if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
961 } else if (type == TypeCommitPart) {
962 CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
963 processEntry(commitPart);
967 updateLiveStateFromLocal();
// NOTE(review): the final status presumably follows didCommit when the
// arbitrator could arbitrate, and foundAbort otherwise — confirm.
969 if (couldArbitrate) {
970 TransactionStatus * status = transaction->getTransactionStatus();
972 status->setStatus(TransactionStatus_StatusCommitted);
974 status->setStatus(TransactionStatus_StatusAborted);
977 TransactionStatus * status = transaction->getTransactionStatus();
979 status->setStatus(TransactionStatus_StatusAborted);
981 status->setStatus(TransactionStatus_StatusCommitted);
985 return Pair<bool, bool>(false, true);
// Handles a request received from another client over the local channel and
// builds the reply buffer.
//
// Request layout as decoded here: int64 last arbitration sequence number the
// sender has seen, int32 number of transaction parts, then the parts.
//
// When parts are present, the transaction is rebuilt and arbitrated locally.
// The reply then collects the locally generated aborts and this machine's
// commit parts, visited in ascending sequence-number order and filtered
// against the sender's last-seen number. When parts were present it is
// prefixed with the didCommit / couldArbitrate bytes.
// localSequenceNumber is incremented before returning.
988 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
990 ByteBuffer *bbDecode = ByteBuffer_wrap(data);
991 int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
992 int numberOfParts = bbDecode->getInt();
994 // If we did commit a transaction or not
995 bool didCommit = false;
996 bool couldArbitrate = false;
998 if (numberOfParts != 0) {
1000 // decode the transaction
1001 Transaction *transaction = new Transaction();
1002 for (int i = 0; i < numberOfParts; i++) {
1004 TransactionPart *newPart = (TransactionPart *)TransactionPart_decode(NULL, bbDecode);
1005 transaction->addPartDecode(newPart);
1008 // Arbitrate on transaction and pull relevant return data
1009 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
1010 couldArbitrate = localArbitrateReturn.getFirst();
1011 didCommit = localArbitrateReturn.getSecond();
1013 updateLiveStateFromLocal();
1015 // Transaction was sent to the server so keep track of it to prevent double commit
1016 if (transaction->getSequenceNumber() != -1) {
1017 offlineTransactionsCommittedAndAtServer->add(new Pair<int64_t, int64_t>(transaction->getId()));
1021 // The data to send back
1022 int returnDataSize = 0;
1023 Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
1025 // Get the aborts to send back
1026 Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>();
1028 SetIterator<int64_t, Abort *> *abortit = getKeyIterator(liveAbortsGeneratedByLocal);
1029 while(abortit->hasNext())
1030 abortLocalSequenceNumbers->add(abortit->next());
// Sort the keys so aborts are visited in ascending sequence-number order.
1034 qsort(abortLocalSequenceNumbers->expose(), abortLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1036 uint asize = abortLocalSequenceNumbers->size();
1037 for(uint i=0; i<asize; i++) {
// Note: this local shadows the member of the same name inside the loop.
1038 int64_t localSequenceNumber = abortLocalSequenceNumbers->get(i);
1039 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1043 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
1044 unseenArbitrations->add(abort);
1045 returnDataSize += abort->getSize();
1048 // Get the commits to send back
1049 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
1050 if (commitForClientTable != NULL) {
1051 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>();
1053 SetIterator<int64_t, Commit *> *commitit = getKeyIterator(commitForClientTable);
1054 while(commitit->hasNext())
1055 commitLocalSequenceNumbers->add(commitit->next());
1058 qsort(commitLocalSequenceNumbers->expose(), commitLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1060 uint clsSize = commitLocalSequenceNumbers->size();
1061 for(uint clsi = 0; clsi < clsSize; clsi++) {
1062 int64_t localSequenceNumber = commitLocalSequenceNumbers->get(clsi);
1063 Commit *commit = commitForClientTable->get(localSequenceNumber);
1065 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1070 Vector<CommitPart *> * parts = commit->getParts();
1071 uint nParts = parts->size();
1072 for(uint i=0; i<nParts; i++) {
1073 CommitPart * commitPart = parts->get(i);
1074 unseenArbitrations->add(commitPart);
1075 returnDataSize += commitPart->getSize();
1081 // Number of arbitration entries to decode
1082 returnDataSize += 2 * sizeof(int32_t);
1084 // bool of did commit or not
1085 if (numberOfParts != 0) {
1086 returnDataSize += sizeof(char);
1089 // Data to send Back
1090 Array<char> *returnData = new Array<char>(returnDataSize);
1091 ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
// Reply header, only when a transaction was submitted: two put() calls, one
// per flag (didCommit first, then couldArbitrate).
1093 if (numberOfParts != 0) {
1095 bbEncode->put((char)1);
1097 bbEncode->put((char)0);
1099 if (couldArbitrate) {
1100 bbEncode->put((char)1);
1102 bbEncode->put((char)0);
// Entry count followed by each encoded abort / commit part.
1106 bbEncode->putInt(unseenArbitrations->size());
1107 uint size = unseenArbitrations->size();
1108 for (uint i = 0; i < size; i++) {
1109 Entry *entry = unseenArbitrations->get(i);
1110 entry->encode(bbEncode);
1113 localSequenceNumber++;
// Pushes one slot to the server through cloud->putSlot() and inspects the
// slots that come back to decide whether the push was accepted.
//
// NOTE(review): this listing omits lines (braces, else branches, assignments
// to `inserted` / `isInserted`), so the exact branch structure is not visible.
//
// @param slot     slot to send
// @param newSize  size value forwarded to cloud->putSlot()
// @param isNewKey not referenced in any visible line of this function
// @return tuple (inserted, lastTryInserted, array) where `array` is the slot
//         array returned by the server, or a locally built one holding `slot`
//         when putSlot() returned NULL
// Throws (by pointer) an Error when the slot array is empty.
1117 ThreeTuple<bool, bool, Array<Slot *> *> Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey) {
// The previous flag value is saved but only used by the commented-out test
// further down.
1118 bool attemptedToSendToServerTmp = attemptedToSendToServer;
1119 attemptedToSendToServer = true;
1121 bool inserted = false;
1122 bool lastTryInserted = false;
1124 Array<Slot *> *array = cloud->putSlot(slot, newSize);
1125 if (array == NULL) {
// NOTE(review): the array is default-constructed and then index 0 is written;
// confirm that Array's default constructor provides at least one element.
1126 array = new Array<Slot *>();
1127 array->set(0, slot);
1128 rejectedSlotVector->clear();
1131 if (array->length() == 0) {
1132 throw new Error("Server Error: Did not send any slots");
1135 // if (attemptedToSendToServerTmp) {
1136 if (hadPartialSendToServer) {
// Look for our own slot (same sequence number and machine id) among the
// returned slots.
1138 bool isInserted = false;
1139 uint size = array->length();
1140 for (uint i = 0; i < size; i++) {
1141 Slot *s = array->get(i);
1142 if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
// Second pass: scan the entries of each returned slot for a LastMessage
// entry that names this machine and this slot's sequence number.
1148 for (uint i = 0; i < size; i++) {
1149 Slot *s = array->get(i);
1154 // Process each entry in the slot
1155 Vector<Entry *> *entries = s->getEntries();
1156 uint eSize = entries->size();
1157 for(uint ei=0; ei < eSize; ei++) {
1158 Entry * entry = entries->get(ei);
1160 if (entry->getType() == TypeLastMessage) {
1161 LastMessage *lastMessage = (LastMessage *)entry;
1163 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) {
// Outcome bookkeeping: a rejected attempt records the slot's sequence number
// in rejectedSlotVector. The conditions guarding these assignments are on
// omitted lines.
1172 rejectedSlotVector->add(slot->getSequenceNumber());
1173 lastTryInserted = false;
1175 lastTryInserted = true;
1178 rejectedSlotVector->add(slot->getSequenceNumber());
1179 lastTryInserted = false;
1183 return ThreeTuple<bool, bool, Array<Slot *> *>(inserted, lastTryInserted, array);
1187 * Returns a tuple whose first element is true if a resize was needed but not requested
// Populates `slot` with entries before it is sent, in this order: an optional
// TableStatus (when resizing), rejected-message entries, mandatory rescue
// entries, the new-key entry, pending arbitration data, parts of the first
// pending transaction, and finally optional rescue data.
//
// NOTE(review): this listing omits lines (braces, the declaration of newSize,
// break/continue statements and assignments to `inserted`), so loop exits and
// branch bodies are only partly visible.
//
// @param slot        slot being filled
// @param resize      caller's request to resize; forced to true when
//                    liveSlotCount exceeds bufferResizeThreshold
// @param newKeyEntry optional new-key entry to place in the slot (may be NULL)
// @return tuple (needResizeButNotResizing, newSize, inserted); the early
//         return passes true as the first element, the normal return false
1189 ThreeTuple<bool, int32_t, bool> Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry) {
1191 if (liveSlotCount > bufferResizeThreshold) {
1192 resize = true;//Resize is forced
// newSize is declared on a line not shown here. The new size is the current
// slot count scaled by Table_RESIZE_MULTIPLE and is announced via a
// TableStatus entry.
1196 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1197 TableStatus *status = new TableStatus(slot, newSize);
1198 slot->addEntry(status);
1201 // Fill with rejected slots first before doing anything else
1202 doRejectedMessages(slot);
1204 // Do mandatory rescue of entries
1205 ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1207 // Extract working variables
1208 bool needsResize = mandatoryRescueReturn.getFirst();
1209 bool seenLiveSlot = mandatoryRescueReturn.getSecond();
1210 int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1212 if (needsResize && !resize) {
1213 // We need to resize but we are not resizing so return false
// NOTE(review): NULL is passed for the int32_t and bool tuple fields; 0 and
// false would state the intent without a pointer-to-integer conversion.
1214 return ThreeTuple<bool, int32_t, bool>(true, NULL, NULL);
1217 bool inserted = false;
1218 if (newKeyEntry != NULL) {
1219 newKeyEntry->setSlot(slot);
1220 if (slot->hasSpace(newKeyEntry)) {
1221 slot->addEntry(newKeyEntry);
1226 // Clear the transactions, aborts and commits that were sent previously
1227 transactionPartsSent->clear();
1228 pendingSendArbitrationEntriesToDelete->clear();
1229 uint size = pendingSendArbitrationRounds->size();
1230 for (uint i = 0; i < size; i++) {
1231 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
1232 bool isFull = false;
1233 round->generateParts();
1234 Vector<Entry *> *parts = round->getParts();
1236 // Insert pending arbitration data
1237 uint vsize = parts->size();
1238 for (uint vi = 0; vi < vsize; vi++) {
1239 Entry *arbitrationData = parts->get(vi);
1241 // If it is an abort then we need to set some information
1242 if (arbitrationData->getType() == TypeAbort) {
1243 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1246 if (!slot->hasSpace(arbitrationData)) {
1247 // No space so cant do anything else with these data entries
1252 // Add to this current slot and add it to entries to delete
1253 slot->addEntry(arbitrationData);
1254 pendingSendArbitrationEntriesToDelete->add(arbitrationData);
// Only the transaction at the head of the pending queue is considered in the
// visible code.
1262 if (pendingTransactionQueue->size() > 0) {
1263 Transaction *transaction = pendingTransactionQueue->get(0);
1264 // Set the transaction sequence number if it has yet to be inserted into the block chain
1265 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1266 transaction->setSequenceNumber(slot->getSequenceNumber());
1270 TransactionPart *part = transaction->getNextPartToSend();
1272 // Ran out of parts to send for this transaction so move on
1276 if (slot->hasSpace(part)) {
1277 slot->addEntry(part);
// Record which part numbers of this transaction went into the slot.
1278 Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1279 if (partsSent == NULL) {
1280 partsSent = new Vector<int32_t>();
1281 transactionPartsSent->put(transaction, partsSent);
1283 partsSent->add(part->getPartNumber());
1284 transactionPartsSent->put(transaction, partsSent);
1291 // Fill the remainder of the slot with rescue data
1292 doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1294 return ThreeTuple<bool, int32_t, bool>(false, newSize, inserted);
// Builds RejectedMessage entries for slot `s` from the sequence numbers
// recorded in rejectedSlotVector. Does nothing when that vector is empty.
//
// NOTE(review): this listing omits lines (the declaration of `i`, the test on
// s_msg, braces, and whatever is done with each `rm` after construction -
// presumably s->addEntry(rm)). Confirm against the full file.
//
// @param s slot that the rejected-message entries are created for
1297 void Table::doRejectedMessages(Slot *s) {
1298 if (!rejectedSlotVector->isEmpty()) {
1299 /* TODO: We should avoid generating a rejected message entry if
1300 * there is already a sufficient entry in the queue (e.g.,
1301 * equalsto value of true and same sequence number). */
1303 int64_t old_seqn = rejectedSlotVector->get(0);
// Above the threshold a single entry covering the first to the last rejected
// sequence number is created.
1304 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1305 int64_t new_seqn = rejectedSlotVector->lastElement();
1306 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1309 int64_t prev_seqn = -1;
1311 /* Go through list of missing messages */
// `i` is declared on a line not shown. The loop's exit condition on s_msg is
// also not visible; presumably it stops at the first slot found in buffer.
1312 for (; i < rejectedSlotVector->size(); i++) {
1313 int64_t curr_seqn = rejectedSlotVector->get(i);
1314 Slot *s_msg = buffer->getSlot(curr_seqn);
1317 prev_seqn = curr_seqn;
1319 /* Generate rejected message entry for missing messages */
1320 if (prev_seqn != -1) {
1321 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1324 /* Generate rejected message entries for present messages */
// Continues from where the previous loop stopped; one entry per remaining
// sequence number, attributed to the machine that owns the buffered slot.
1325 for (; i < rejectedSlotVector->size(); i++) {
1326 int64_t curr_seqn = rejectedSlotVector->get(i);
1327 Slot *s_msg = buffer->getSlot(curr_seqn);
1328 int64_t machineid = s_msg->getMachineID();
1329 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
// Copies live entries from the oldest buffered slots into `slot`, covering
// every sequence number below `threshold` (firstIfFull + Table_FREE_SLOTS).
//
// NOTE(review): this listing omits lines (braces and the body of the
// "!previousSlot->isLive()" test, presumably a continue).
//
// @param slot   slot receiving the rescued entries
// @param resize forwarded to Slot::getLiveEntries()
// @return tuple (needsResize, seenLiveSlot, currentSequenceNumber);
//         needsResize is true when an entry in the slot numbered firstIfFull
//         did not fit, in which case the scan stops at that slot
1336 ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize) {
1337 int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1338 int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
// Never start scanning before the oldest slot still held in the buffer.
1339 if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1340 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1343 int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1344 bool seenLiveSlot = false;
1345 int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots; // smallest seq number in the buffer if it is full
1346 int64_t threshold = firstIfFull + Table_FREE_SLOTS; // we want the buffer to be clear of live entries up to this point
1350 for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1351 Slot * previousSlot = buffer->getSlot(currentSequenceNumber);
1352 // Push slot number forward
// The oldest-live marker advances only until the first live slot is met.
1353 if (!seenLiveSlot) {
1354 oldestLiveSlotSequenceNumver = currentSequenceNumber;
1357 if (!previousSlot->isLive()) {
1361 // We have seen a live slot
1362 seenLiveSlot = true;
1364 // Get all the live entries for a slot
1365 Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1367 // Iterate over all the live entries and try to rescue them
1368 uint lESize = liveEntries->size();
1369 for (uint i=0; i< lESize; i++) {
1370 Entry * liveEntry = liveEntries->get(i);
1371 if (slot->hasSpace(liveEntry)) {
1372 // Enough space to rescue the entry
1373 slot->addEntry(liveEntry);
1374 } else if (currentSequenceNumber == firstIfFull) {
1375 //if there's no space but the entry is about to fall off the queue
1376 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
1382 return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
// Uses any space left in slot `s` to copy live entries from buffered slots,
// starting at sequence number `seqn` and walking up to the newest one.
//
// NOTE(review): this listing omits lines (the declaration and increment of
// skipcount, the guard around the oldestLiveSlotSequenceNumver update, the
// statements controlled by the brace-less ifs, and closing braces).
//
// @param s            slot receiving the rescued entries
// @param seenliveslot whether a live slot was already met by the caller;
//                     set to true here once a live slot is processed
// @param seqn         first sequence number to examine
// @param resize       forwarded to Slot::getLiveEntries()
1385 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1386 /* now go through live entries from least to greatest sequence number until
1387 * either all live slots added, or the slot doesn't have enough room
1388 * for SKIP_THRESHOLD consecutive entries*/
1390 int64_t newestseqnum = buffer->getNewestSeqNum();
1392 for (; seqn <= newestseqnum; seqn++) {
1393 Slot *prevslot = buffer->getSlot(seqn);
1394 //Push slot number forward
1396 oldestLiveSlotSequenceNumver = seqn;
// The statement controlled by this test is not visible (presumably continue).
1398 if (!prevslot->isLive())
1400 seenliveslot = true;
1401 Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
1402 uint lESize = liveentries->size();
1403 for (uint i=0; i< lESize; i++) {
1404 Entry * liveentry = liveentries->get(i);
1405 if (s->hasSpace(liveentry))
1406 s->addEntry(liveentry);
// skipcount is maintained on lines not shown; exceeding Table_SKIP_THRESHOLD
// presumably ends the rescue - confirm.
1409 if (skipcount > Table_SKIP_THRESHOLD)
1419 * Checks for malicious activity and updates the local copy of the block chain.
// Validates a batch of slots received from the server and, when the checks
// pass, appends them to the local buffer and refreshes the derived state.
//
// Visible steps: reject batches whose first sequence number is not newer than
// sequenceNumber, verify the HMAC chain, process each slot, run the extra
// checks that apply when there is a gap after sequenceNumber, store the slots
// in `buffer`, advance sequenceNumber, then call updateLiveStateFromServer().
//
// NOTE(review): this listing omits lines (braces, the early return for an
// empty batch, the resize commit call and the live-slot counting), so only
// the visible statements are described.
//
// @param newSlots             slots sent by the server
// @param acceptUpdatesToLocal forwarded unchanged to processSlot()
// Throws (by pointer) an Error for older slots or missing machine records.
1421 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1422 // The cloud communication layer has checked slot HMACs already
1424 if (newSlots->length() == 0) {
1428 // Make sure all slots are newer than the last largest slot this
1430 int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
1431 if (firstSeqNum <= sequenceNumber) {
1432 throw new Error("Server Error: Sent older slots!");
1435 // Create an object that can access both new slots and slots in our
1436 // local chain without committing slots to our local chain
1437 SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1439 // Check that the HMAC chain is not broken
1440 checkHMACChain(indexer, newSlots);
1442 // Set to keep track of messages from clients
// Seeded with every key of lastMessageTable; it is passed to processSlot()
// for each slot and must be empty afterwards when there was a gap.
1443 Hashset<int64_t> *machineSet = new Hashset<int64_t>();
1445 SetIterator<int64_t, Pair<int64_t, Liveness *> *> * lmit=getKeyIterator(lastMessageTable);
1446 while(lmit->hasNext())
1447 machineSet->add(lmit->next());
1451 // Process each slots data
1453 uint numSlots = newSlots->length();
1454 for(uint i=0; i<numSlots; i++) {
1455 Slot *slot = newSlots->get(i);
1456 processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1457 updateExpectedSize();
1461 // If there is a gap, check to see if the server sent us
1463 if (firstSeqNum != (sequenceNumber + 1)) {
1465 // Check the size of the slots that were sent down by the server.
1466 // Can only check the size if there was a gap
1467 checkNumSlots(newSlots->length());
1469 // Since there was a gap every machine must have pushed a slot or
1470 // must have a last message message. If not then the server is
1472 if (!machineSet->isEmpty()) {
1473 throw new Error("Missing record for machines: ");
1477 // Update the size of our local block chain.
1480 // Commit new to slots to the local block chain.
1482 uint numSlots = newSlots->length();
1483 for(uint i=0; i<numSlots; i++) {
1484 Slot *slot = newSlots->get(i);
1486 // Insert this slot into our local block chain copy.
1487 buffer->putSlot(slot);
1489 // Keep track of how many slots are currently live (have live data
1494 // Get the sequence number of the latest slot in the system
1495 sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
1496 updateLiveStateFromServer();
1498 // No Need to remember after we pulled from the server
1499 offlineTransactionsCommittedAndAtServer->clear();
1501 // This is invalidated now
1502 hadPartialSendToServer = false;
// Refreshes the derived live state after new slots arrived from the server.
// Calls, in order: processNewTransactionParts(), arbitrateFromServer(),
// updateCommittedTable(), updateLiveTransactionsAndStatus(),
// updateSpeculativeTable() and updatePendingTransactionSpeculativeTable().
// The listing omits the closing brace and some comment lines.
1505 void Table::updateLiveStateFromServer() {
1506 // Process the new transaction parts
1507 processNewTransactionParts();
1509 // Do arbitration on new transactions that were received
1510 arbitrateFromServer();
1512 // Update all the committed keys
1513 bool didCommitOrSpeculate = updateCommittedTable();
1515 // Delete the transactions that are now dead
1516 updateLiveTransactionsAndStatus();
// The flag accumulates: the speculative update receives the commit result
// and ORs in its own return value before the pending-transaction update.
1519 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1520 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
// Refreshes the derived live state after a locally arbitrated change.
// Runs the same steps as updateLiveStateFromServer() except that it does not
// call processNewTransactionParts() or arbitrateFromServer().
// The listing omits the closing brace and some comment lines.
1523 void Table::updateLiveStateFromLocal() {
1524 // Update all the committed keys
1525 bool didCommitOrSpeculate = updateCommittedTable();
1527 // Delete the transactions that are now dead
1528 updateLiveTransactionsAndStatus();
// The commit result is passed on and ORed with the speculative update's own
// return value.
1531 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1532 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
// Initialises the expected number of slots in a server response and records
// the current maximum buffer size.
//
// expectedsize becomes min(firstSequenceNumber, numberOfSlots); afterwards
// didFindTableStatus is set and currMaxSize takes the numberOfSlots argument.
//
// NOTE(review): the body of the "didFindTableStatus" test is not visible
// (presumably an early return so only the first status is used) - confirm.
//
// @param firstSequenceNumber sequence number used as the count of earlier slots
// @param numberOfSlots       slot capacity; note that other methods in this
//                            file use an identifier of the same name that is
//                            not a local there (presumably a Table member)
1535 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1536 int64_t prevslots = firstSequenceNumber;
1538 if (didFindTableStatus) {
1540 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1543 didFindTableStatus = true;
1544 currMaxSize = numberOfSlots;
// Keeps expectedsize from exceeding currMaxSize.
// NOTE(review): lines between the signature and the test are omitted from
// this listing (presumably an increment of expectedsize) - confirm.
1547 void Table::updateExpectedSize() {
1550 if (expectedsize > currMaxSize) {
1551 expectedsize = currMaxSize;
1557 * Check the size of the block chain to make sure there are enough
1558 * slots sent back by the server. This is only called when we have a
1559 * gap between the slots that we have locally and the slots sent by
1560 * the server; therefore, in the slots sent by the server there will be
1561 * at least 1 Table status message
// Verifies that the server sent exactly the expected number of slots.
//
// @param numberOfSlots number of slots received; compared with expectedsize
// Throws (by pointer) an Error when the two values differ. The message text
// ends at "Expected: " in the visible code, with no value appended.
1563 void Table::checkNumSlots(int numberOfSlots) {
1564 if (numberOfSlots != expectedsize) {
1565 throw new Error("Server Error: Server did not send all slots-> Expected: ");
1570 * Update the size of the local buffer if it is needed.
// Applies the maximum size recorded in currMaxSize to the local slot buffer.
// Clears didFindTableStatus, resizes `buffer` when the size changed, stores
// the new size in numberOfSlots and recomputes the resize threshold.
// The listing omits the closing braces.
1572 void Table::commitNewMaxSize() {
1573 didFindTableStatus = false;
1575 // Resize the local slot buffer
1576 if (numberOfSlots != currMaxSize) {
1577 buffer->resize((int32_t)currMaxSize);
1580 // Change the number of local slots to the new size
1581 numberOfSlots = (int32_t)currMaxSize;
1583 // Recalculate the resize threshold since the size of the local
1584 // buffer has changed
1585 setResizeThreshold();
1589 * Process the new transaction parts from this latest round of slots
1590 * received from the server
// Moves the transaction parts collected in newTransactionParts into the live
// transaction tables, creating a Transaction for every sequence number not
// yet known, and then clears newTransactionParts.
//
// NOTE(review): this listing omits lines (braces, the early return for an
// empty table, and the handling of parts that are already arbitrated -
// presumably they are marked dead and skipped).
1592 void Table::processNewTransactionParts() {
1594 if (newTransactionParts->size() == 0) {
1595 // Nothing new to process
1599 // Iterate through all the machine Ids that we received new parts
// newTransactionParts maps machine id -> table of parts keyed by a
// Pair<int64_t, int32_t>.
1601 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> * tpit= getKeyIterator(newTransactionParts);
1602 while(tpit->hasNext()) {
1603 int64_t machineId = tpit->next();
1604 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
1606 SetIterator<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *ptit = getKeyIterator(parts);
1607 // Iterate through all the parts for that machine Id
1608 while(ptit->hasNext()) {
1609 Pair<int64_t, int32_t> * partId = ptit->next();
1610 TransactionPart *part = parts->get(partId);
// A part whose sequence number is not beyond the last transaction its
// arbitrator already handled is stale.
1612 if (lastArbitratedTransactionNumberByArbitratorTable->contains(part->getArbitratorId())) {
1613 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1614 if (lastTransactionNumber >= part->getSequenceNumber()) {
1615 // Set dead the transaction part
1621 // Get the transaction object for that sequence number
1622 Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1624 if (transaction == NULL) {
1625 // This is a new transaction that we dont have so make a new one
1626 transaction = new Transaction();
1628 // Insert this new transaction into the live tables
1629 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1630 liveTransactionByTransactionIdTable->put(new Pair<int64_t, int64_t>(part->getTransactionId()), transaction);
1633 // Add that part to the transaction
1634 transaction->addPartDecode(part);
1639 // Clear all the new transaction parts in preparation for the next
1640 // time the server sends slots
1641 newTransactionParts->clear();
// Arbitrates, in ascending sequence-number order, the live transactions
// received from the server.
//
// For each transaction the guard is evaluated against committedKeyValueTable
// plus the updates accepted so far in this round (speculativeTableTmp). The
// accepted updates are batched into one Commit; each rejected transaction
// yields an Abort. The resulting ArbitrationRound is queued in
// pendingSendArbitrationRounds and compactArbitrationData() is invoked.
//
// NOTE(review): this listing omits lines (braces, else, and the
// continue/break/return statements inside the early tests), so the skip and
// stop conditions are described only as far as they are visible.
1644 void Table::arbitrateFromServer() {
1646 if (liveTransactionBySequenceNumberTable->size() == 0) {
1647 // Nothing to arbitrate on so move on
1651 // Get the transaction sequence numbers and sort from oldest to newest
1652 Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>();
1654 SetIterator<int64_t, Transaction *> * trit = getKeyIterator(liveTransactionBySequenceNumberTable);
1655 while(trit->hasNext())
1656 transactionSequenceNumbers->add(trit->next());
1659 qsort(transactionSequenceNumbers->expose(), transactionSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1661 // Collection of key value pairs that are
// Holds the updates of every transaction accepted in this round; it is both
// an input to later guard evaluations and the content of the new commit.
1662 Hashtable<IoTString *, KeyValue *> * speculativeTableTmp = new Hashtable<IoTString *, KeyValue *>();
1664 // The last transaction arbitrated on
1665 int64_t lastTransactionCommitted = -1;
1666 Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1667 uint tsnSize = transactionSequenceNumbers->size();
1668 for(uint i=0; i<tsnSize; i++) {
1669 int64_t transactionSequenceNumber = transactionSequenceNumbers->get(i);
1670 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1672 // Check if this machine arbitrates for this transaction if not
1673 // then we cant arbitrate this transaction
1674 if (transaction->getArbitrator() != localMachineId) {
// Already arbitrated in an earlier call (lastSeqNumArbOn is updated at the
// end of each iteration below).
1678 if (transactionSequenceNumber < lastSeqNumArbOn) {
1682 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1683 // We have seen this already locally so dont commit again
1688 if (!transaction->isComplete()) {
1689 // Will arbitrate in incorrect order if we continue so just break
1695 // update the largest transaction seen by arbitrator from server
// Tracks, per originating machine, the highest client-local sequence number
// seen from the server.
1696 if (!lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1697 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1699 int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1700 if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1701 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1705 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1706 // Guard evaluated as true
1708 // Update the local changes so we can make the commit
1709 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1710 while (kvit->hasNext()) {
1711 KeyValue *kv = kvit->next();
1712 speculativeTableTmp->put(kv->getKey(), kv);
1716 // Update what the last transaction committed was for use in batch commit
1717 lastTransactionCommitted = transactionSequenceNumber;
1719 // Guard evaluated was false so create abort
// Each abort consumes one local arbitration sequence number.
1721 Abort *newAbort = new Abort(NULL,
1722 transaction->getClientLocalSequenceNumber(),
1723 transaction->getSequenceNumber(),
1724 transaction->getMachineId(),
1725 transaction->getArbitrator(),
1726 localArbitrationSequenceNumber);
1727 localArbitrationSequenceNumber++;
1728 generatedAborts->add(newAbort);
1730 // Insert the abort so we can process
1731 processEntry(newAbort);
1734 lastSeqNumArbOn = transactionSequenceNumber;
1737 Commit *newCommit = NULL;
1739 // If there is something to commit
1740 if (speculativeTableTmp->size() != 0) {
1741 // Create the commit and increment the commit sequence number
1742 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1743 localArbitrationSequenceNumber++;
1745 // Add all the new keys to the commit
1746 SetIterator<IoTString *, KeyValue *> * spit = getKeyIterator(speculativeTableTmp);
1747 while(spit->hasNext()) {
1748 IoTString * string = spit->next();
1749 KeyValue * kv = speculativeTableTmp->get(string);
1750 newCommit->addKV(kv);
1754 // create the commit parts
1755 newCommit->createCommitParts();
1757 // Append all the commit parts to the end of the pending queue
1758 // waiting for sending to the server
1759 // Insert the commit so we can process it
1760 Vector<CommitPart *> * parts = newCommit->getParts();
1761 uint partsSize = parts->size();
1762 for(uint i=0; i<partsSize; i++) {
1763 CommitPart * commitPart = parts->get(i);
1764 processEntry(commitPart);
1768 if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1769 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1770 pendingSendArbitrationRounds->add(arbitrationRound);
// When compaction reports true, the commit parts of the last queued round
// are processed (again), guarded by a null check on its commit.
1772 if (compactArbitrationData()) {
1773 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1774 if (newArbitrationRound->getCommit() != NULL) {
1775 Vector<CommitPart *> * parts = newArbitrationRound->getCommit()->getParts();
1776 uint partsSize = parts->size();
1777 for(uint i=0; i<partsSize; i++) {
1778 CommitPart * commitPart = parts->get(i);
1779 processEntry(commitPart);
// Arbitrates a single transaction without going through the server.
//
// @param transaction transaction to arbitrate
// @return pair (couldArbitrate, didCommit):
//         (false, false) when this machine is not the arbitrator, the
//                        transaction is incomplete, or a newer transaction
//                        from the same remote machine was already seen from
//                        the server;
//         (true, true)   when the guard held and a Commit was created;
//         (true, false)  when the guard failed and an Abort was created.
//
// NOTE(review): this listing omits lines (braces, else branches, one Abort
// constructor argument), so the nesting is only partly visible.
1786 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1788 // Check if this machine arbitrates for this transaction if not then
1789 // we cant arbitrate this transaction
1790 if (transaction->getArbitrator() != localMachineId) {
1791 return Pair<bool, bool>(false, false);
1794 if (!transaction->isComplete()) {
1795 // Will arbitrate in incorrect order if we continue so just break
1797 return Pair<bool, bool>(false, false);
1800 if (transaction->getMachineId() != localMachineId) {
1801 // dont do this check for local transactions
1802 if (lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1803 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1804 // We've have already seen this from the server
1805 return Pair<bool, bool>(false, false);
// The guard is checked against committed state only (no speculative table).
1810 if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1811 // Guard evaluated as true Create the commit and increment the
1812 // commit sequence number
// -1 is passed as the commit's third argument here, where
// arbitrateFromServer() passes the last committed transaction number.
1813 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1814 localArbitrationSequenceNumber++;
1816 // Update the local changes so we can make the commit
1817 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1818 while (kvit->hasNext()) {
1819 KeyValue *kv = kvit->next();
1820 newCommit->addKV(kv);
1824 // create the commit parts
1825 newCommit->createCommitParts();
1827 // Append all the commit parts to the end of the pending queue
1828 // waiting for sending to the server
1829 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1830 pendingSendArbitrationRounds->add(arbitrationRound);
// After a successful compaction the parts of the merged round are processed;
// otherwise (else on an omitted line, presumably) the new commit's own parts.
1832 if (compactArbitrationData()) {
1833 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1834 Vector<CommitPart *> * parts = newArbitrationRound->getCommit()->getParts();
1835 uint partsSize = parts->size();
1836 for(uint i=0; i<partsSize; i++) {
1837 CommitPart * commitPart = parts->get(i);
1838 processEntry(commitPart);
1841 // Insert the commit so we can process it
1842 Vector<CommitPart *> * parts = newCommit->getParts();
1843 uint partsSize = parts->size();
1844 for(uint i=0; i<partsSize; i++) {
1845 CommitPart * commitPart = parts->get(i);
1846 processEntry(commitPart);
// Only transactions created on this machine carry a status to update.
1850 if (transaction->getMachineId() == localMachineId) {
1851 TransactionStatus *status = transaction->getTransactionStatus();
1852 if (status != NULL) {
1853 status->setStatus(TransactionStatus_StatusCommitted);
1857 updateLiveStateFromLocal();
1858 return Pair<bool, bool>(true, true);
1860 if (transaction->getMachineId() == localMachineId) {
1861 // For locally created messages update the status
1862 // Guard evaluated was false so create abort
1863 TransactionStatus * status = transaction->getTransactionStatus();
1864 if (status != NULL) {
1865 status->setStatus(TransactionStatus_StatusAborted);
1868 Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
// One constructor argument sits on a line omitted from this listing.
1871 Abort *newAbort = new Abort(NULL,
1872 transaction->getClientLocalSequenceNumber(),
1874 transaction->getMachineId(),
1875 transaction->getArbitrator(),
1876 localArbitrationSequenceNumber);
1877 localArbitrationSequenceNumber++;
1878 addAbortSet->add(newAbort);
1880 // Append all the commit parts to the end of the pending queue
1881 // waiting for sending to the server
1882 ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1883 pendingSendArbitrationRounds->add(arbitrationRound);
1885 if (compactArbitrationData()) {
1886 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
// NOTE(review): the round queued above was built with a NULL commit, and
// getCommit() is dereferenced here. arbitrateFromServer() guards the same
// access with a null check; confirm the omitted line before this one does too.
1888 Vector<CommitPart *> * parts = newArbitrationRound->getCommit()->getParts();
1889 uint partsSize = parts->size();
1890 for(uint i=0; i<partsSize; i++) {
1891 CommitPart * commitPart = parts->get(i);
1892 processEntry(commitPart);
1897 updateLiveStateFromLocal();
1898 return Pair<bool, bool>(true, false);
1903 * Compacts the arbitration data by merging commits and aggregating
1904 * aborts so that a single large push of commits can be done instead
1905 * of many small updates
// Tries to fold earlier pending arbitration rounds into the most recent one.
//
// Walks backwards from the second-newest round. A round without a commit
// contributes only its aborts; a round with a commit is merged into a new
// Commit built by Commit_merge(). Merging is limited by
// ArbitrationRound_MAX_PARTS. Rounds that were merged are removed from
// pendingSendArbitrationRounds and the enlarged last round is re-added.
//
// NOTE(review): this listing omits lines (braces, else, break statements, the
// increment of numberToDelete and every return), so the return value cannot
// be documented from here. Callers treat true as "the last queued round's
// commit parts should be processed again".
1907 bool Table::compactArbitrationData() {
1908 if (pendingSendArbitrationRounds->size() < 2) {
1909 // Nothing to compact so do nothing
1913 ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
// A round that has already been partly sent is not altered (the action taken
// is on an omitted line, presumably an early return).
1914 if (lastRound->getDidSendPart()) {
// NOTE(review): hadCommit is true when the last round has NO commit; the
// name suggests the opposite test - confirm which is intended.
1918 bool hadCommit = (lastRound->getCommit() == NULL);
1919 bool gotNewCommit = false;
// Counts the rounds to remove, starting at 1 for lastRound itself.
1921 int numberToDelete = 1;
1922 while (numberToDelete < pendingSendArbitrationRounds->size()) {
1923 ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1925 if (round->isFull() || round->getDidSendPart()) {
1926 // Stop since there is a part that cannot be compacted and we
1927 // need to compact in order
1931 if (round->getCommit() == NULL) {
1932 // Try compacting aborts only
1933 int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
1934 if (newSize > ArbitrationRound_MAX_PARTS) {
1935 // Cant compact since it would be too large
1938 lastRound->addAborts(round->getAborts());
1940 // Create a new larger commit
// The merged commit takes a fresh local arbitration sequence number. The
// counter is advanced before the size check below, so in the visible code it
// is consumed even when the merged commit is then found to be too large.
1941 Commit * newCommit = Commit_merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
1942 localArbitrationSequenceNumber++;
1944 // Create the commit parts so that we can count them
1945 newCommit->createCommitParts();
1947 // Calculate the new size of the parts
1948 int newSize = newCommit->getNumberOfParts();
1949 newSize += lastRound->getAbortsCount();
1950 newSize += round->getAbortsCount();
1952 if (newSize > ArbitrationRound_MAX_PARTS) {
1953 // Cant compact since it would be too large
1957 // Set the new compacted part
1958 lastRound->setCommit(newCommit);
1959 lastRound->addAborts(round->getAborts());
1960 gotNewCommit = true;
1966 if (numberToDelete != 1) {
1967 // If there is a compaction
1968 // Delete the previous pieces that are now in the new compacted piece
1969 if (numberToDelete == pendingSendArbitrationRounds->size()) {
1970 pendingSendArbitrationRounds->clear();
// Otherwise (else on an omitted line, presumably) drop the merged rounds one
// by one from the end of the list.
1972 for (int i = 0; i < numberToDelete; i++) {
1973 pendingSendArbitrationRounds->removeIndex(pendingSendArbitrationRounds->size() - 1);
1977 // Add the new compacted into the pending to send list
1978 pendingSendArbitrationRounds->add(lastRound);
1980 // Should reinsert into the commit processor
1981 if (hadCommit && gotNewCommit) {
1990 * Update all the commits and the committed tables, sets dead the dead
1993 bool Table::updateCommittedTable() {
1995 if (newCommitParts->size() == 0) {
1996 // Nothing new to process
2000 // Iterate through all the machine Ids that we received new parts for
2001 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> * partsit=getKeyIterator(newCommitParts);
2002 while(partsit->hasNext()) {
2003 int64_t machineId = partsit->next();
2004 Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId);
2006 // Iterate through all the parts for that machine Id
2007 SetIterator<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> * pairit=getKeyIterator(parts);
2008 while(pairit->hasNext()) {
2009 Pair<int64_t, int32_t> * partId = pairit->next();
2010 CommitPart *part = parts->get(partId);
2012 // Get the transaction object for that sequence number
2013 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
2015 if (commitForClientTable == NULL) {
2016 // This is the first commit from this device
2017 commitForClientTable = new Hashtable<int64_t, Commit *>();
2018 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
2021 Commit *commit = commitForClientTable->get(part->getSequenceNumber());
2023 if (commit == NULL) {
2024 // This is a new commit that we dont have so make a new one
2025 commit = new Commit();
2027 // Insert this new commit into the live tables
2028 commitForClientTable->put(part->getSequenceNumber(), commit);
2031 // Add that part to the commit
2032 commit->addPartDecode(part);
2038 // Clear all the new commits parts in preparation for the next time
2039 // the server sends slots
2040 newCommitParts->clear();
2042 // If we process a new commit keep track of it for future use
2043 bool didProcessANewCommit = false;
2045 // Process the commits one by one
2046 for (int64_t arbitratorId : liveCommitsTable->keySet()) {
2048 // Get all the commits for a specific arbitrator
2049 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
2051 // Sort the commits in order
2052 Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
2053 qsort(commitSequenceNumbers->expose(), commitSequenceNumbers->size(), sizeof(int64_t), compareInt64);
2055 // Get the last commit seen from this arbitrator
2056 int64_t lastCommitSeenSequenceNumber = -1;
2057 if (lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId) != NULL) {
2058 lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
2061 // Go through each new commit one by one
2062 for (int i = 0; i < commitSequenceNumbers->size(); i++) {
2063 int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
2064 Commit *commit = commitForClientTable->get(commitSequenceNumber);
2066 // Special processing if a commit is not complete
2067 if (!commit->isComplete()) {
2068 if (i == (commitSequenceNumbers->size() - 1)) {
2069 // If there is an incomplete commit and this commit is the
2070 // latest one seen then this commit cannot be processed and
2071 // there are no other commits
2074 // This is a commit that was already dead but parts of it
2075 // are still in the block chain (not flushed out yet)->
2076 // Delete it and move on
2078 commitForClientTable->remove(commit->getSequenceNumber());
2083 // Update the last transaction that was updated if we can
2084 if (commit->getTransactionSequenceNumber() != -1) {
2085 // Update the last transaction sequence number that the arbitrator arbitrated on1
2086 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber())) {
2087 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2091 // Update the last arbitration data that we have seen so far
2092 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(commit->getMachineId())) {
2093 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
2094 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
2096 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2099 // Never seen any data from this arbitrator so record the first one
2100 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2103 // We have already seen this commit before so need to do the
2104 // full processing on this commit
2105 if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2107 // Update the last transaction that was updated if we can
2108 if (commit->getTransactionSequenceNumber() != -1) {
2109 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
2111 // Update the last transaction sequence number that the arbitrator arbitrated on
2112 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
2113 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2120 // If we got here then this is a brand new commit and needs full
2122 // Get what commits should be edited, these are the commits that
2123 // have live values for their keys
2124 Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
2126 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = commit->getKeyValueUpdateSet()->iterator();
2127 while (kvit->hasNext()) {
2128 KeyValue *kv = kvit->next();
2129 commitsToEdit->add(liveCommitsByKeyTable->get(kv->getKey()));
2133 commitsToEdit->remove(NULL); // remove NULL since it could be in this set
2135 // Update each previous commit that needs to be updated
2136 SetIterator<Commit *, Commit *> * commitit = commitsToEdit->iterator();
2137 while(commitit->hasNext()) {
2138 Commit *previousCommit = commitit->next();
2140 // Only bother with live commits (TODO: Maybe remove this check)
2141 if (previousCommit->isLive()) {
2143 // Update which keys in the old commits are still live
2145 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = commit->getKeyValueUpdateSet()->iterator();
2146 while (kvit->hasNext()) {
2147 KeyValue *kv = kvit->next();
2148 previousCommit->invalidateKey(kv->getKey());
2153 // if the commit is now dead then remove it
2154 if (!previousCommit->isLive()) {
2155 commitForClientTable->remove(previousCommit);
2161 // Update the last seen sequence number from this arbitrator
2162 if (lastCommitSeenSequenceNumberByArbitratorTable->contains(commit->getMachineId())) {
2163 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2164 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2167 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2170 // We processed a new commit that we havent seen before
2171 didProcessANewCommit = true;
2173 // Update the committed table of keys and which commit is using which key
2175 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = commit->getKeyValueUpdateSet()->iterator();
2176 while (kvit->hasNext()) {
2177 KeyValue *kv = kvit->next();
2178 committedKeyValueTable->put(kv->getKey(), kv);
2179 liveCommitsByKeyTable->put(kv->getKey(), commit);
2186 return didProcessANewCommit;
2190 * Create the speculative table from transactions that are still live
2191 * and have come from the cloud
2193 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2194 if (liveTransactionBySequenceNumberTable->keySet()->size() == 0) {
2195 // There is nothing to speculate on
2199 // Create a list of the transaction sequence numbers and sort them
2200 // from oldest to newest
2201 Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
2202 qsort(transactionSequenceNumberSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64);
2204 bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2207 if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2208 // If there is a gap in the transaction sequence numbers then
2209 // there was a commit or an abort of a transaction OR there was a
2210 // new commit (Could be from offline commit) so a redo the
2211 // speculation from scratch
2213 // Start from scratch
2214 speculatedKeyValueTable->clear();
2215 lastTransactionSequenceNumberSpeculatedOn = -1;
2216 oldestTransactionSequenceNumberSpeculatedOn = -1;
2220 // Remember the front of the transaction list
2221 oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2223 // Find where to start arbitration from
2224 int startIndex = transactionSequenceNumbersSorted->indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1;
2226 if (startIndex >= transactionSequenceNumbersSorted->size()) {
2227 // Make sure we are not out of bounds
2228 return false; // did not speculate
2231 Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
2232 bool didSkip = true;
2234 for (int i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2235 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2236 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2238 if (!transaction->isComplete()) {
2239 // If there is an incomplete transaction then there is nothing
2240 // we can do add this transactions arbitrator to the list of
2241 // arbitrators we should ignore
2242 incompleteTransactionArbitrator->add(transaction->getArbitrator());
2247 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2251 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2253 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2254 // Guard evaluated to true so update the speculative table
2256 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2257 while (kvit->hasNext()) {
2258 KeyValue *kv = kvit->next();
2259 speculatedKeyValueTable->put(kv->getKey(), kv);
2267 // Since there was a skip we need to redo the speculation next time around
2268 lastTransactionSequenceNumberSpeculatedOn = -1;
2269 oldestTransactionSequenceNumberSpeculatedOn = -1;
2272 // We did some speculation
// Rebuilds pendingTransactionSpeculatedKeyValueTable from the transactions
// held in pendingTransactionQueue.
//
// didProcessNewCommitsOrSpeculate - when true, the pending speculation state
//   (lastPendingTransactionSpeculatedOn, firstPendingTransaction and the
//   pending speculative table) is reset before the queue is walked. The same
//   reset happens when the head of the queue is no longer
//   firstPendingTransaction.
//
// NOTE(review): this listing omits short source lines (closing braces and,
// presumably, the early `return;` after each of the two guard comments) --
// confirm against the full file.
2277 * Create the pending transaction speculative table from transactions
2278 * that are still in the pending transaction buffer
2280 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2281 if (pendingTransactionQueue->size() == 0) {
2282 // There is nothing to speculate on
2286 if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2287 // need to reset on the pending speculation
2288 lastPendingTransactionSpeculatedOn = NULL;
2289 firstPendingTransaction = pendingTransactionQueue->get(0);
2290 pendingTransactionSpeculatedKeyValueTable->clear();
2293 // Find where to start arbitration from
// NOTE(review): the start index is derived from firstPendingTransaction, not
// from lastPendingTransactionSpeculatedOn, so the walk begins one element
// after wherever firstPendingTransaction sits in the queue -- confirm intent.
2294 int startIndex = pendingTransactionQueue->indexOf(firstPendingTransaction) + 1;
2296 if (startIndex >= pendingTransactionQueue->size()) {
2297 // Make sure we are not out of bounds
2301 for (int i = startIndex; i < pendingTransactionQueue->size(); i++) {
2302 Transaction *transaction = pendingTransactionQueue->get(i);
2304 lastPendingTransactionSpeculatedOn = transaction;
// The guard is evaluated against the committed table, the cloud speculative
// table and the pending speculative table built so far.
2306 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2307 // Guard evaluated to true so update the speculative table
2308 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2309 while (kvit->hasNext()) {
2310 KeyValue *kv = kvit->next();
2311 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
// Two passes driven by lastArbitratedTransactionNumberByArbitratorTable:
//  1. every transaction in liveTransactionBySequenceNumberTable whose sequence
//     number is at or below the last number arbitrated by its arbitrator is
//     set dead and removed from liveTransactionByTransactionIdTable;
//  2. every entry of outstandingTransactionStatus that satisfies the same test
//     gets its status set to TransactionStatus_StatusCommitted.
//
// NOTE(review): this listing omits short source lines (closing braces and
// probably the `iter->remove();` calls that drop entries from the table being
// iterated) -- confirm against the full file.
2319 * Set dead and remove from the live transaction tables the
2320 * transactions that are dead
2322 void Table::updateLiveTransactionsAndStatus() {
2324 // Go through each of the transactions
// NOTE(review): `Iterator<Map->Entry<...> >` is not valid C++; it looks like
// Java `Map.Entry` left over from the port and still needs converting.
2325 for (Iterator<Map->Entry<int64_t, Transaction> > *iter = liveTransactionBySequenceNumberTable->entrySet()->iterator(); iter->hasNext();) {
2326 Transaction *transaction = iter->next()->getValue();
2328 // Check if the transaction is dead
// NOTE(review): lastTransactionNumber is an int64_t, so `!= NULL` is a
// comparison against 0, not an "entry is absent" test -- verify what get()
// returns for a missing key.
2329 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator());
2330 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= transaction->getSequenceNumber())) {
2332 // Set dead the transaction
2333 transaction->setDead();
2335 // Remove the transaction from the live table
2337 liveTransactionByTransactionIdTable->remove(transaction->getId());
2341 // Go through each of the transactions
2342 for (Iterator<Map->Entry<int64_t, TransactionStatus *> > *iter = outstandingTransactionStatus->entrySet()->iterator(); iter->hasNext();) {
2343 TransactionStatus *status = iter->next()->getValue();
2345 // Check if the transaction is dead
2346 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator());
2347 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= status->getTransactionSequenceNumber())) {
// The arbitrator has moved past this transaction, so report it as committed.
2350 status->setStatus(TransactionStatus_StatusCommitted);
// Processes one slot: first records the slot itself as the latest message
// from slot->getMachineID() via updateLastMessage, then hands every entry in
// slot->getEntries() to the processEntry overload selected by
// entry->getType().
//
// indexer              - passed on to the RejectedMessage handler
// slot                 - the slot whose entries are processed
// acceptUpdatesToLocal - forwarded to updateLastMessage
// machineSet           - forwarded to updateLastMessage and to the
//                        LastMessage handler
//
// Throws a heap-allocated Error ("Unrecognized type: ").
// NOTE(review): this listing omits short source lines, so the `break;`
// statements and the case labels in front of the Abort and NewKey calls are
// not visible here; the throw presumably sits under `default:` -- confirm
// against the full file.
2359 * Process this slot, entry by entry-> Also update the latest message sent by slot
2361 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2363 // Update the last message seen
2364 updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2366 // Process each entry in the slot
2367 Vector<Entry *> *entries = slot->getEntries();
// The size is read once before the loop.
2368 uint eSize = entries->size();
2369 for(uint ei=0; ei < eSize; ei++) {
2370 Entry * entry = entries->get(ei);
// Each case casts the generic Entry to its concrete type so overload
// resolution picks the matching processEntry.
2371 switch (entry->getType()) {
2372 case TypeCommitPart:
2373 processEntry((CommitPart *)entry);
2376 processEntry((Abort *)entry);
2378 case TypeTransactionPart:
2379 processEntry((TransactionPart *)entry);
2382 processEntry((NewKey *)entry);
2384 case TypeLastMessage:
2385 processEntry((LastMessage *)entry, machineSet);
2387 case TypeRejectedMessage:
2388 processEntry((RejectedMessage *)entry, indexer);
2390 case TypeTableStatus:
// The table status handler also needs the sequence number of the carrying slot.
2391 processEntry((TableStatus *)entry, slot->getSequenceNumber());
2394 throw new Error("Unrecognized type: ");
2400 * Update the last message that was sent for a machine Id
2402 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2403 // Update what the last message received by a machine was
2404 updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2408 * Add the new key to the arbitrators table and update the set of live
2409 * new keys (in case of a rescued new key message)
2411 void Table::processEntry(NewKey *entry) {
2412 // Update the arbitrator table with the new key information
2413 arbitratorTable->put(entry->getKey(), entry->getMachineID());
2415 // Update what the latest live new key is
2416 NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2417 if (oldNewKey != NULL) {
2418 // Delete the old new key messages
2419 oldNewKey->setDead();
2424 * Process new table status entries and set dead the old ones as new
2425 * ones come in-> keeps track of the largest and smallest table status
2426 * seen in this current round of updating the local copy of the block
2429 void Table::processEntry(TableStatus * entry, int64_t seq) {
2430 int newNumSlots = entry->getMaxSlots();
2431 updateCurrMaxSize(newNumSlots);
2432 initExpectedSize(seq, newNumSlots);
2434 if (liveTableStatus != NULL) {
2435 // We have a larger table status so the old table status is no
2437 liveTableStatus->setDead();
2440 // Make this new table status the latest alive table status
2441 liveTableStatus = entry;
// Handles a RejectedMessage entry in two steps:
//  1. for every sequence number in [getOldSeqNum(), getNewSeqNum()] it looks
//     up the local slot through the indexer and throws a heap-allocated Error
//     when the slot's machine ID contradicts the entry (the entry's equal flag
//     must match "slot machine ID == entry machine ID");
//  2. it collects, from lastMessageTable, every other machine whose last
//     message sequence number is below this entry's sequence number, registers
//     the entry for each of them through addWatchVector, and hands the
//     collected set to entry->setWatchSet.
//
// NOTE(review): this listing omits short source lines (closing braces,
// `continue;`, `} else {`, and probably the null check on `slot` and the
// handling of the empty watch set) -- confirm against the full file.
2445 * Check old messages to see if there is a block chain violation->
2448 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
2449 int64_t oldSeqNum = entry->getOldSeqNum();
2450 int64_t newSeqNum = entry->getNewSeqNum();
2451 bool isequal = entry->getEqual();
2452 int64_t machineId = entry->getMachineID();
2453 int64_t seq = entry->getSequenceNumber();
2455 // Check if we have messages that were supposed to be rejected in
2456 // our local block chain
2457 for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2459 Slot *slot = indexer->getSlot(seqNum);
2462 // If we have this slot make sure that it was not supposed to be
2464 int64_t slotMachineId = slot->getMachineID();
// Violation when the flag and the actual machine-ID match disagree.
2465 if (isequal != (slotMachineId == machineId)) {
2466 throw new Error("Server Error: Trying to insert rejected message for slot ");
2471 // Create a list of clients to watch until they see this rejected
// NOTE(review): no `delete deviceWatchSet` is visible for the case where the
// set stays empty -- check the ownership on that path.
2473 Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
// NOTE(review): `Map->Entry<...>` is not valid C++; it looks like Java
// `Map.Entry` left over from the port.
2474 for (Map->Entry<int64_t, Pair<int64_t, Liveness *> > *lastMessageEntry : lastMessageTable->entrySet()) {
2475 // Machine ID for the last message entry
2476 int64_t lastMessageEntryMachineId = lastMessageEntry->getKey();
2478 // We've seen it, don't need to continue to watch-> Our next
2479 // message will implicitly acknowledge it->
2480 if (lastMessageEntryMachineId == localMachineId) {
// NOTE(review): the pair is taken by value here, while updateLastMessage
// stores `new Pair<...>` pointers in lastMessageTable -- verify the type.
2484 Pair<int64_t, Liveness *> lastMessageValue = lastMessageEntry->getValue();
2485 int64_t entrySequenceNumber = lastMessageValue.getFirst();
2487 if (entrySequenceNumber < seq) {
2488 // Add this rejected message to the set of messages that this
2489 // machine ID did not see yet
2490 addWatchVector(lastMessageEntryMachineId, entry);
2491 // This client did not see this rejected message yet so add it
2492 // to the watch set to monitor
2493 deviceWatchSet->add(lastMessageEntryMachineId);
2496 if (deviceWatchSet->isEmpty()) {
2497 // This rejected message has been seen by all the clients so
2500 // We need to watch this rejected message
2501 entry->setWatchSet(deviceWatchSet);
// Handles an Abort entry. In the order of the visible statements:
//  - if the abort names a transaction sequence number (!= -1), the matching
//    entry is removed from outstandingTransactionStatus and marked
//    TransactionStatus_StatusAborted;
//  - the abort is stored in liveAbortTable under its abort ID (a previously
//    stored abort is set dead) and, when the local machine is the arbitrator,
//    in liveAbortsGeneratedByLocal;
//  - if the target machine's last message is at or past the abort's sequence
//    number, the abort is taken out of both tables again;
//  - lastArbitrationDataLocalSequenceNumberSeenFromArbitrator is updated;
//  - the aborted transaction is removed from the live transaction tables;
//  - lastArbitratedTransactionNumberByArbitratorTable is advanced.
//
// NOTE(review): this listing omits short source lines (closing braces,
// `} else {`, `return;` and probably `entry->setDead();`) -- confirm the exact
// control flow against the full file.
2506 * Check if this abort is live, if not then save it so we can kill it
2507 * later-> update the last transaction number that was arbitrated on->
2509 void Table::processEntry(Abort *entry) {
2510 if (entry->getTransactionSequenceNumber() != -1) {
2511 // update the transaction status if it was sent to the server
2512 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2513 if (status != NULL) {
2514 status->setStatus(TransactionStatus_StatusAborted);
2518 // Abort has not been seen by the client it is for yet so we need to
2520 Abort *previouslySeenAbort = liveAbortTable->put(entry->getAbortId(), entry);
2521 if (previouslySeenAbort != NULL) {
2522 previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version
2525 if (entry->getTransactionArbitrator() == localMachineId) {
2526 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
// NOTE(review): get(...) is dereferenced with `.getFirst()` although
// updateLastMessage stores `new Pair<...>` pointers in lastMessageTable, and
// there is no visible guard for a machine with no last message yet.
2529 if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId()).getFirst() >= entry->getSequenceNumber())) {
2530 // The machine already saw this so it is dead
// NOTE(review): the key is passed as `&entry->getAbortId()` here but as
// `entry->getAbortId()` in the put() above -- one of the two is likely wrong.
2532 liveAbortTable->remove(&entry->getAbortId());
2534 if (entry->getTransactionArbitrator() == localMachineId) {
2535 liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2540 // Update the last arbitration data that we have seen so far
2541 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(entry->getTransactionArbitrator())) {
2542 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2543 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2545 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2548 // Never seen any data from this arbitrator so record the first one
2549 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2552 // Set dead a transaction if we can
// The transaction is looked up by (machine ID, client-local sequence number).
2553 Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber()));
2554 if (transactionToSetDead != NULL) {
2555 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2558 // Update the last transaction sequence number that the arbitrator
// NOTE(review): lastTransactionNumber is an int64_t, so `== NULL` tests for 0
// rather than for a missing entry -- verify what get() returns when absent.
2560 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator());
2561 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2563 if (entry->getTransactionSequenceNumber() != -1) {
2564 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
// Handles a TransactionPart entry: parts whose sequence number is at or below
// the last transaction number arbitrated by entry->getArbitratorId() belong to
// a finished transaction; every other part is filed in newTransactionParts
// under the entry's machine ID, keyed by its part ID, and a part previously
// stored under the same key is set dead.
//
// NOTE(review): this listing omits short source lines; the body of the "dead"
// branch (presumably `entry->setDead(); return;`) and the closing braces are
// not visible -- confirm against the full file.
2570 * Set dead the transaction part if that transaction is dead and keep
2571 * track of all new parts
2573 void Table::processEntry(TransactionPart *entry) {
2574 // Check if we have already seen this transaction and set it dead OR
2575 // if it is not alive
// NOTE(review): lastTransactionNumber is an int64_t, so `!= NULL` tests for 0
// rather than for a missing entry -- verify what get() returns when absent.
2576 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId());
2577 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= entry->getSequenceNumber())) {
2578 // This transaction is dead, it was already committed or aborted
2583 // This part is still alive
2584 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId());
2586 if (transactionPart == NULL) {
2587 // Dont have a table for this machine Id yet so make one
2588 transactionPart = new Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2589 newTransactionParts->put(entry->getMachineId(), transactionPart);
2592 // Update the part and set dead ones we have already seen (got a
// NOTE(review): a fresh Pair key is heap-allocated on every call; who frees
// it (especially when the key was already present) is not visible here.
2594 TransactionPart *previouslySeenPart = transactionPart->put(new Pair<int64_t, int32_t>(entry->getPartId()), entry);
2595 if (previouslySeenPart != NULL) {
2596 previouslySeenPart->setDead();
2601 * Process new commit entries and save them for future use-> Delete duplicates
2603 void Table::processEntry(CommitPart *entry) {
2604 // Update the last transaction that was updated if we can
2605 if (entry->getTransactionSequenceNumber() != -1) {
2606 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId() || lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber())) {
2607 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2611 Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *commitPart = newCommitParts->get(entry->getMachineId());
2612 if (commitPart == NULL) {
2613 // Don't have a table for this machine Id yet so make one
2614 commitPart = new Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2615 newCommitParts->put(entry->getMachineId(), commitPart);
2617 // Update the part and set dead ones we have already seen (got a
2619 CommitPart *previouslySeenPart = commitPart->put(new Pair<int64_t, int32_t>(entry->getPartId()), entry);
2620 if (previouslySeenPart != NULL) {
2621 previouslySeenPart->setDead();
// Records (seqNum, liveness) as the latest message seen from machineId and
// performs the bookkeeping that depends on it.
//
// machineId            - machine the message came from
// seqNum               - sequence number of that message
// liveness             - the Slot or LastMessage carrying it; any other
//                        getType() value makes the function throw
// acceptUpdatesToLocal - when true, the sequence-number mismatch checks for
//                        the local machine do not throw
// machineSet           - machineId is removed from this set
//
// Throws a heap-allocated Error on an unrecognized liveness type, on a
// sequence-number mismatch for the local machine, and on a rollback of a
// remote machine's sequence number.
//
// NOTE(review): this listing omits short source lines (closing braces,
// `} else {`, `return;`, and probably the iterator `remove()` / `setDead()`
// calls inside the two loops) -- confirm the exact control flow against the
// full file.
2626 * Update the last message seen table-> Update and set dead the
2627 * appropriate RejectedMessages as clients see them-> Updates the live
2628 * aborts, removes those that are dead and sets them dead-> Check that
2629 * the last message seen is correct and that there is no mismatch of
2630 * our own last message or that other clients have not had a rollback
2631 * on the last message->
2633 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2634 // We have seen this machine ID
2635 machineSet->remove(machineId);
2637 // Get the set of rejected messages that this machine Id is has not seen yet
2638 Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2639 // If there is a rejected message that this machine Id has not seen yet
2640 if (watchset != NULL) {
2641 // Go through each rejected message that this machine Id has not
2644 SetIterator<RejectedMessage *, RejectedMessage *> *rmit = watchset->iterator();
2645 while(rmit->hasNext()) {
2646 RejectedMessage *rm = rmit->next();
2647 // If this machine Id has seen this rejected message->->->
2648 if (rm->getSequenceNumber() <= seqNum) {
2649 // Remove it from our watchlist
2651 // Decrement machines that need to see this notification
2652 rm->removeWatcher(machineId);
2658 // Set dead the abort
// NOTE(review): `i` is declared as a value of the Java-style
// `Iterator<Map->Entry<...> >` type but used with `->`; this line still needs
// to be ported to the C++ iterator API.
2659 for (Iterator<Map->Entry<Pair<int64_t, int64_t>, Abort *> > i = liveAbortTable->entrySet()->iterator(); i->hasNext();) {
2660 Abort *abort = i->next()->getValue();
// An abort addressed to this machine is covered once the machine's messages
// have reached the abort's sequence number.
2661 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2664 if (abort->getTransactionArbitrator() == localMachineId) {
2665 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2669 if (machineId == localMachineId) {
2670 // Our own messages are immediately dead->
2671 char livenessType = liveness->getType();
2672 if (livenessType==TypeLastMessage) {
2673 ((LastMessage *)liveness)->setDead();
2674 } else if (livenessType == TypeSlot) {
2675 ((Slot *)liveness)->setDead();
2677 throw new Error("Unrecognized type");
2680 // Get the old last message for this device
// A new Pair is allocated for the table; the pair it displaces is returned
// and deleted further down once its fields have been read.
2681 Pair<int64_t, Liveness *> * lastMessageEntry = lastMessageTable->put(machineId, new Pair<int64_t, Liveness *>(seqNum, liveness));
2682 if (lastMessageEntry == NULL) {
2683 // If no last message then there is nothing else to process
2687 int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
2688 Liveness *lastEntry = lastMessageEntry->getSecond();
2689 delete lastMessageEntry;
2691 // If it is not our machine Id since we already set ours to dead
2692 if (machineId != localMachineId) {
2693 char lastEntryType = lastEntry->getType();
2695 if (lastEntryType == TypeLastMessage) {
2696 ((LastMessage *)lastEntry)->setDead();
2697 } else if (lastEntryType == TypeSlot) {
2698 ((Slot *)lastEntry)->setDead();
2700 throw new Error("Unrecognized type");
2703 // Make sure the server is not playing any games
2704 if (machineId == localMachineId) {
// After a partial send only a lower sequence number is an error; the strict
// `!=` check below presumably belongs to the omitted else branch.
2705 if (hadPartialSendToServer) {
2706 // We were not making any updates and we had a machine mismatch
2707 if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2708 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: ");
2711 // We were not making any updates and we had a machine mismatch
2712 if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2713 throw new Error("Server Error: Mismatch on local machine sequence number, needed: ");
// This check tests for a sequence number going backwards (rollback).
2717 if (lastMessageSeqNum > seqNum) {
2718 throw new Error("Server Error: Rollback on remote machine sequence number");
2724 * Add a rejected message entry to the watch set to keep track of
2725 * which clients have seen that rejected message entry and which have
2728 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
2729 Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2730 if (entries == NULL) {
2731 // There is no set for this machine ID yet so create one
2732 entries = new Hashset<RejectedMessage *>();
2733 rejectedMessageWatchVectorTable->put(machineId, entries);
2735 entries->add(entry);
// Verifies that each new slot links to its predecessor: for every slot in
// newSlots the slot with the preceding sequence number is looked up through
// the indexer, and if it exists its HMAC must equal the new slot's
// previous-HMAC field.
//
// indexer  - used to look up the predecessor slot by sequence number
// newSlots - the slots to verify, visited in array order
//
// Throws a heap-allocated Error ("Server Error: Invalid HMAC Chain") on the
// first mismatch. A slot whose predecessor is not available (NULL) is not
// checked.
2739 * Check if the HMAC chain is not violated
2741 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
2742 for (int i = 0; i < newSlots->length(); i++) {
2743 Slot *currSlot = newSlots->get(i);
2744 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
2745 if (prevSlot != NULL &&
2746 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2747 throw new Error("Server Error: Invalid HMAC Chain");