3 #include "SlotBuffer.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
14 #include "ByteBuffer.h"
16 #include "CommitPart.h"
19 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
21 cloud(new CloudComm(this, baseurl, password, listeningPort)),
23 liveTableStatus(NULL),
24 pendingTransactionBuilder(NULL),
25 lastPendingTransactionSpeculatedOn(NULL),
26 firstPendingTransaction(NULL),
28 bufferResizeThreshold(0),
30 oldestLiveSlotSequenceNumver(1),
31 localMachineId(_localMachineId),
33 localTransactionSequenceNumber(0),
34 lastTransactionSequenceNumberSpeculatedOn(0),
35 oldestTransactionSequenceNumberSpeculatedOn(0),
36 localArbitrationSequenceNumber(0),
37 hadPartialSendToServer(false),
38 attemptedToSendToServer(false),
40 didFindTableStatus(false),
42 lastSlotAttemptedToSend(NULL),
45 lastTransactionPartsSent(NULL),
46 lastPendingSendArbitrationEntriesToDelete(NULL),
48 committedKeyValueTable(NULL),
49 speculatedKeyValueTable(NULL),
50 pendingTransactionSpeculatedKeyValueTable(NULL),
51 liveNewKeyTable(NULL),
52 lastMessageTable(NULL),
53 rejectedMessageWatchVectorTable(NULL),
54 arbitratorTable(NULL),
56 newTransactionParts(NULL),
58 lastArbitratedTransactionNumberByArbitratorTable(NULL),
59 liveTransactionBySequenceNumberTable(NULL),
60 liveTransactionByTransactionIdTable(NULL),
61 liveCommitsTable(NULL),
62 liveCommitsByKeyTable(NULL),
63 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
64 rejectedSlotVector(NULL),
65 pendingTransactionQueue(NULL),
66 pendingSendArbitrationRounds(NULL),
67 pendingSendArbitrationEntriesToDelete(NULL),
68 transactionPartsSent(NULL),
69 outstandingTransactionStatus(NULL),
70 liveAbortsGeneratedByLocal(NULL),
71 offlineTransactionsCommittedAndAtServer(NULL),
72 localCommunicationTable(NULL),
73 lastTransactionSeenFromMachineFromServer(NULL),
74 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
75 lastInsertedNewKey(false),
81 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
85 liveTableStatus(NULL),
86 pendingTransactionBuilder(NULL),
87 lastPendingTransactionSpeculatedOn(NULL),
88 firstPendingTransaction(NULL),
90 bufferResizeThreshold(0),
92 oldestLiveSlotSequenceNumver(1),
93 localMachineId(_localMachineId),
95 localTransactionSequenceNumber(0),
96 lastTransactionSequenceNumberSpeculatedOn(0),
97 oldestTransactionSequenceNumberSpeculatedOn(0),
98 localArbitrationSequenceNumber(0),
99 hadPartialSendToServer(false),
100 attemptedToSendToServer(false),
102 didFindTableStatus(false),
104 lastSlotAttemptedToSend(NULL),
107 lastTransactionPartsSent(NULL),
108 lastPendingSendArbitrationEntriesToDelete(NULL),
110 committedKeyValueTable(NULL),
111 speculatedKeyValueTable(NULL),
112 pendingTransactionSpeculatedKeyValueTable(NULL),
113 liveNewKeyTable(NULL),
114 lastMessageTable(NULL),
115 rejectedMessageWatchVectorTable(NULL),
116 arbitratorTable(NULL),
117 liveAbortTable(NULL),
118 newTransactionParts(NULL),
119 newCommitParts(NULL),
120 lastArbitratedTransactionNumberByArbitratorTable(NULL),
121 liveTransactionBySequenceNumberTable(NULL),
122 liveTransactionByTransactionIdTable(NULL),
123 liveCommitsTable(NULL),
124 liveCommitsByKeyTable(NULL),
125 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
126 rejectedSlotVector(NULL),
127 pendingTransactionQueue(NULL),
128 pendingSendArbitrationRounds(NULL),
129 pendingSendArbitrationEntriesToDelete(NULL),
130 transactionPartsSent(NULL),
131 outstandingTransactionStatus(NULL),
132 liveAbortsGeneratedByLocal(NULL),
133 offlineTransactionsCommittedAndAtServer(NULL),
134 localCommunicationTable(NULL),
135 lastTransactionSeenFromMachineFromServer(NULL),
136 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
137 lastInsertedNewKey(false),
144 * Init all the stuff needed for for table usage
147 // Init helper objects
148 random = new Random();
149 buffer = new SlotBuffer();
152 committedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
153 speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
154 pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
155 liveNewKeyTable = new Hashtable<IoTString *, NewKey *>();
156 lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> >();
157 rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
158 arbitratorTable = new Hashtable<IoTString *, int64_t>();
159 liveAbortTable = new Hashtable<Pair<int64_t, int64_t>, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>();
160 newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
161 newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
162 lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
163 liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
164 liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t>, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>();
165 liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> >();
166 liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *>();
167 lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
168 rejectedSlotVector = new Vector<int64_t>();
169 pendingTransactionQueue = new Vector<Transaction *>();
170 pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
171 transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
172 outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
173 liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
174 offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t>, uintptr_t, 0, pairHashFunction, pairEquals>();
175 localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> >();
176 lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
177 pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
178 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
181 numberOfSlots = buffer->capacity();
182 setResizeThreshold();
186 * Initialize the table by inserting a table status as the first entry
187 * into the table status also initialize the crypto stuff.
189 void Table::initTable() {
190 cloud->initSecurity();
192 // Create the first insertion into the block chain which is the table status
193 Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
194 localSequenceNumber++;
195 TableStatus *status = new TableStatus(s, numberOfSlots);
197 Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
200 array = new Array<Slot *>(1);
202 // update local block chain
203 validateAndUpdate(array, true);
204 } else if (array->length() == 1) {
205 // in case we did push the slot BUT we failed to init it
206 validateAndUpdate(array, true);
208 throw new Error("Error on initialization");
213 * Rebuild the table from scratch by pulling the latest block chain
216 void Table::rebuild() {
217 // Just pull the latest slots from the server
218 Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
219 validateAndUpdate(newslots, true);
221 updateLiveTransactionsAndStatus();
224 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
225 localCommunicationTable->put(arbitrator, Pair<IoTString *, int32_t>(hostName, portNumber));
228 int64_t Table::getArbitrator(IoTString *key) {
229 return arbitratorTable->get(key);
232 void Table::close() {
236 IoTString *Table::getCommitted(IoTString *key) {
237 KeyValue *kv = committedKeyValueTable->get(key);
240 return kv->getValue();
246 IoTString *Table::getSpeculative(IoTString *key) {
247 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
250 kv = speculatedKeyValueTable->get(key);
254 kv = committedKeyValueTable->get(key);
258 return kv->getValue();
264 IoTString *Table::getCommittedAtomic(IoTString *key) {
265 KeyValue *kv = committedKeyValueTable->get(key);
267 if (!arbitratorTable->contains(key)) {
268 throw new Error("Key not Found.");
271 // Make sure new key value pair matches the current arbitrator
272 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
273 // TODO: Maybe not throw en error
274 throw new Error("Not all Key Values Match Arbitrator.");
278 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
279 return kv->getValue();
281 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
286 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
287 if (!arbitratorTable->contains(key)) {
288 throw new Error("Key not Found.");
291 // Make sure new key value pair matches the current arbitrator
292 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
293 // TODO: Maybe not throw en error
294 throw new Error("Not all Key Values Match Arbitrator.");
297 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
300 kv = speculatedKeyValueTable->get(key);
304 kv = committedKeyValueTable->get(key);
308 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
309 return kv->getValue();
311 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
316 bool Table::update() {
318 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
319 validateAndUpdate(newSlots, false);
321 updateLiveTransactionsAndStatus();
323 } catch (Exception *e) {
324 SetIterator<int64_t> * kit = getKeyIterator(localCommunicationTable);
325 while(kit->hasNext()) {
326 int64_t m = kit->next();
335 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
337 if (!arbitratorTable->contains(keyName)) {
338 // There is already an arbitrator
341 NewKey *newKey = new NewKey(NULL, keyName, machineId);
343 if (sendToServer(newKey)) {
344 // If successfully inserted
350 void Table::startTransaction() {
351 // Create a new transaction, invalidates any old pending transactions.
352 pendingTransactionBuilder = new PendingTransaction(localMachineId);
355 void Table::addKV(IoTString *key, IoTString *value) {
357 // Make sure it is a valid key
358 if (!arbitratorTable->contains(key)) {
359 throw new Error("Key not Found.");
362 // Make sure new key value pair matches the current arbitrator
363 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
364 // TODO: Maybe not throw en error
365 throw new Error("Not all Key Values Match Arbitrator.");
368 // Add the key value to this transaction
369 KeyValue *kv = new KeyValue(key, value);
370 pendingTransactionBuilder->addKV(kv);
373 TransactionStatus *Table::commitTransaction() {
374 if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
375 // transaction with no updates will have no effect on the system
376 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
379 // Set the local transaction sequence number and increment
380 pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
381 localTransactionSequenceNumber++;
383 // Create the transaction status
384 TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
386 // Create the new transaction
387 Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
388 newTransaction->setTransactionStatus(transactionStatus);
390 if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
391 // Add it to the queue and invalidate the builder for safety
392 pendingTransactionQueue->add(newTransaction);
394 arbitrateOnLocalTransaction(newTransaction);
395 updateLiveStateFromLocal();
398 pendingTransactionBuilder = new PendingTransaction(localMachineId);
402 } catch (ServerException *e) {
404 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
405 uint size = pendingTransactionQueue->size();
407 for(int iter = 0; iter < size; iter++) {
408 Transaction *transaction = pendingTransactionQueue->get(iter);
409 pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter));
411 if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
412 // Already contacted this client so ignore all attempts to contact this client
413 // to preserve ordering for arbitrator
417 Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
419 if (sendReturn.getFirst()) {
420 // Failed to contact over local
421 arbitratorTriedAndFailed->add(transaction->getArbitrator());
423 // Successful contact or should not contact
425 if (sendReturn.getSecond()) {
431 pendingTransactionQueue->setSize(oldindex);
434 updateLiveStateFromLocal();
436 return transactionStatus;
440 * Recalculate the new resize threshold
442 void Table::setResizeThreshold() {
443 int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
444 bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
447 int64_t Table::getLocalSequenceNumber() {
448 return localSequenceNumber;
451 bool Table::sendToServer(NewKey *newKey) {
452 bool fromRetry = false;
454 if (hadPartialSendToServer) {
455 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
456 if (newSlots->length() == 0) {
458 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
460 if (sendSlotsReturn.getFirst()) {
461 if (newKey != NULL) {
462 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
467 SetIterator<Transaction *> * trit = getKeyIterator(lastTransactionPartsSent);
468 while(trit->hasNext()) {
469 Transaction *transaction = trit->next();
470 transaction->resetServerFailure();
471 // Update which transactions parts still need to be sent
472 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
473 // Add the transaction status to the outstanding list
474 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
476 // Update the transaction status
477 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
479 // Check if all the transaction parts were successfully
480 // sent and if so then remove it from pending
481 if (transaction->didSendAllParts()) {
482 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
483 pendingTransactionQueue->remove(transaction);
488 newSlots = sendSlotsReturn.getThird();
489 bool isInserted = false;
490 for (uint si = 0; si < newSlots->length(); si++) {
491 Slot *s = newSlots->get(si);
492 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
498 for (uint si = 0; si < newSlots->length(); si++) {
499 Slot *s = newSlots->get(si);
504 // Process each entry in the slot
505 Vector<Entry *> * ventries=s->getEntries();
506 uint vesize = ventries->size();
507 for(uint vei = 0; vei < vesize; vei++) {
508 Entry *entry = ventries->get(vei);
509 if (entry->getType() == TypeLastMessage) {
510 LastMessage *lastMessage = (LastMessage *)entry;
511 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
520 if (newKey != NULL) {
521 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
526 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
527 transaction->resetServerFailure();
529 // Update which transactions parts still need to be sent
530 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
532 // Add the transaction status to the outstanding list
533 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
535 // Update the transaction status
536 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
538 // Check if all the transaction parts were successfully sent and if so then remove it from pending
539 if (transaction->didSendAllParts()) {
540 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
541 pendingTransactionQueue->remove(transaction);
543 transaction->resetServerFailure();
544 // Set the transaction sequence number back to nothing
545 if (!transaction->didSendAPartToServer()) {
546 transaction->setSequenceNumber(-1);
553 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
554 transaction->resetServerFailure();
555 // Set the transaction sequence number back to nothing
556 if (!transaction->didSendAPartToServer()) {
557 transaction->setSequenceNumber(-1);
561 if (sendSlotsReturn.getThird()->length() != 0) {
562 // insert into the local block chain
563 validateAndUpdate(sendSlotsReturn.getThird(), true);
567 bool isInserted = false;
568 for (uint si = 0; si < newSlots->length(); si++) {
569 Slot *s = newSlots->get(si);
570 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
576 for (uint si = 0; si < newSlots->length(); si++) {
577 Slot *s = newSlots->get(si);
582 // Process each entry in the slot
583 for (Entry *entry : s->getEntries()) {
585 if (entry->getType() == TypeLastMessage) {
586 LastMessage *lastMessage = (LastMessage *)entry;
587 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
596 if (newKey != NULL) {
597 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
602 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
603 transaction->resetServerFailure();
605 // Update which transactions parts still need to be sent
606 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
608 // Add the transaction status to the outstanding list
609 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
611 // Update the transaction status
612 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
614 // Check if all the transaction parts were successfully sent and if so then remove it from pending
615 if (transaction->didSendAllParts()) {
616 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
617 pendingTransactionQueue->remove(transaction);
619 transaction->resetServerFailure();
620 // Set the transaction sequence number back to nothing
621 if (!transaction->didSendAPartToServer()) {
622 transaction->setSequenceNumber(-1);
627 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
628 transaction->resetServerFailure();
629 // Set the transaction sequence number back to nothing
630 if (!transaction->didSendAPartToServer()) {
631 transaction->setSequenceNumber(-1);
636 // insert into the local block chain
637 validateAndUpdate(newSlots, true);
640 } catch (ServerException *e) {
647 // While we have stuff that needs inserting into the block chain
648 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
652 if (hadPartialSendToServer) {
653 throw new Error("Should Be error free");
658 // If there is a new key with same name then end
659 if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
664 Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber);
665 localSequenceNumber++;
667 // Try to fill the slot with data
668 ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
669 bool needsResize = fillSlotsReturn.getFirst();
670 int newSize = fillSlotsReturn.getSecond();
671 bool insertedNewKey = fillSlotsReturn.getThird();
674 // Reset which transaction to send
675 for (Transaction *transaction : transactionPartsSent->keySet()) {
676 transaction->resetNextPartToSend();
678 // Set the transaction sequence number back to nothing
679 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
680 transaction->setSequenceNumber(-1);
684 // Clear the sent data since we are trying again
685 pendingSendArbitrationEntriesToDelete->clear();
686 transactionPartsSent->clear();
688 // We needed a resize so try again
689 fillSlot(slot, true, newKey);
692 lastSlotAttemptedToSend = slot;
693 lastIsNewKey = (newKey != NULL);
694 lastInsertedNewKey = insertedNewKey;
695 lastNewSize = newSize;
697 lastTransactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> * >(transactionPartsSent);
698 lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
701 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
703 if (sendSlotsReturn.getFirst()) {
705 // Did insert into the block chain
707 if (insertedNewKey) {
708 // This slot was what was inserted not a previous slot
710 // New Key was successfully inserted into the block chain so dont want to insert it again
714 // Remove the aborts and commit parts that were sent from the pending to send queue
715 uint size = pendingSendArbitrationRounds->size();
717 for (uint i=0; i < size; i++)
718 ArbitrationRound *round = pendingSendArbitrartionRounds->get(i);
719 round->removeParts(pendingSendArbitrationEntriesToDelete);
721 if (!round->isDoneSending()) {
722 // Sent all the parts
723 pendingSendArbitrartionRounds->set(oldcount++,
724 pendingSendArbitrartionRounds->get(i));
727 pendingSendArbitrationRounds->setSize(oldcount);
729 for (Transaction *transaction : transactionPartsSent->keySet()) {
730 transaction->resetServerFailure();
732 // Update which transactions parts still need to be sent
733 transaction->removeSentParts(transactionPartsSent->get(transaction));
735 // Add the transaction status to the outstanding list
736 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
738 // Update the transaction status
739 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
741 // Check if all the transaction parts were successfully sent and if so then remove it from pending
742 if (transaction->didSendAllParts()) {
743 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
744 pendingTransactionQueue->remove(transaction);
748 // Reset which transaction to send
749 for (Transaction *transaction : transactionPartsSent->keySet()) {
750 transaction->resetNextPartToSend();
752 // Set the transaction sequence number back to nothing
753 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
754 transaction->setSequenceNumber(-1);
759 // Clear the sent data in preparation for next send
760 pendingSendArbitrationEntriesToDelete->clear();
761 transactionPartsSent->clear();
763 if (sendSlotsReturn.getThird()->length() != 0) {
764 // insert into the local block chain
765 validateAndUpdate(sendSlotsReturn.getThird(), true);
769 } catch (ServerException *e) {
771 if (e->getType() != ServerException->TypeInputTimeout) {
772 // Nothing was able to be sent to the server so just clear these data structures
773 for (Transaction *transaction : transactionPartsSent->keySet()) {
774 transaction->resetNextPartToSend();
776 // Set the transaction sequence number back to nothing
777 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
778 transaction->setSequenceNumber(-1);
782 // There was a partial send to the server
783 hadPartialSendToServer = true;
785 // Nothing was able to be sent to the server so just clear these data structures
786 for (Transaction *transaction : transactionPartsSent->keySet()) {
787 transaction->resetNextPartToSend();
788 transaction->setServerFailure();
792 pendingSendArbitrationEntriesToDelete->clear();
793 transactionPartsSent->clear();
798 return newKey == NULL;
801 bool Table::updateFromLocal(int64_t machineId) {
802 Pair<IoTString *, int32_t> localCommunicationInformation = localCommunicationTable->get(machineId);
803 if (localCommunicationInformation == NULL) {
804 // Cant talk to that device locally so do nothing
808 // Get the size of the send data
809 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
811 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
812 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId) != NULL) {
813 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
816 Array<char> *sendData = new Array<char>(sendDataSize);
817 ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
820 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
824 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
825 localSequenceNumber++;
827 if (returnData == NULL) {
828 // Could not contact server
833 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
834 int numberOfEntries = bbDecode->getInt();
836 for (int i = 0; i < numberOfEntries; i++) {
837 char type = bbDecode->get();
838 if (type == TypeAbort) {
839 Abort *abort = (Abort*)Abort_decode(NULL, bbDecode);
841 } else if (type == TypeCommitPart) {
842 CommitPart *commitPart = (CommitPart*)CommitPart_decode(NULL, bbDecode);
843 processEntry(commitPart);
847 updateLiveStateFromLocal();
852 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
854 // Get the devices local communications
855 Pair<IoTString *, int32_t> localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
857 if (localCommunicationInformation == NULL) {
858 // Cant talk to that device locally so do nothing
859 return Pair<bool, bool>(true, false);
862 // Get the size of the send data
863 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
864 for (TransactionPart *part : transaction->getParts()->values()) {
865 sendDataSize += part->getSize();
868 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
869 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator()) != NULL) {
870 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
873 // Make the send data size
874 Array<char> *sendData = new Array<char>(sendDataSize);
875 ByteBuffer *bbEncode = ByteBuffer.wrap(sendData);
878 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
879 bbEncode->putInt(transaction->getParts()->size());
880 for (TransactionPart *part : transaction->getParts()->values()) {
881 part->encode(bbEncode);
886 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
887 localSequenceNumber++;
889 if (returnData == NULL) {
890 // Could not contact server
891 return Pair<bool, bool>(true, false);
895 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
896 bool didCommit = bbDecode->get() == 1;
897 bool couldArbitrate = bbDecode->get() == 1;
898 int numberOfEntries = bbDecode->getInt();
899 bool foundAbort = false;
901 for (int i = 0; i < numberOfEntries; i++) {
902 char type = bbDecode->get();
903 if (type == TypeAbort) {
904 Abort *abort = (Abort*)Abort_decode(NULL, bbDecode);
906 if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
911 } else if (type == TypeCommitPart) {
912 CommitPart *commitPart = (CommitPart*)CommitPart_decode(NULL, bbDecode);
913 processEntry(commitPart);
917 updateLiveStateFromLocal();
919 if (couldArbitrate) {
920 TransactionStatus status = transaction->getTransactionStatus();
922 status->setStatus(TransactionStatus_StatusCommitted);
924 status->setStatus(TransactionStatus_StatusAborted);
927 TransactionStatus status = transaction->getTransactionStatus();
929 status->setStatus(TransactionStatus_StatusAborted);
931 status->setStatus(TransactionStatus_StatusCommitted);
935 return Pair<bool, bool>(false, true);
938 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
941 ByteBuffer *bbDecode = ByteBuffer_wrap(data);
942 int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
943 int numberOfParts = bbDecode->getInt();
945 // If we did commit a transaction or not
946 bool didCommit = false;
947 bool couldArbitrate = false;
949 if (numberOfParts != 0) {
951 // decode the transaction
952 Transaction *transaction = new Transaction();
953 for (int i = 0; i < numberOfParts; i++) {
955 TransactionPart *newPart = (TransactionPart)TransactionPart.decode(NULL, bbDecode);
956 transaction->addPartDecode(newPart);
959 // Arbitrate on transaction and pull relevant return data
960 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
961 couldArbitrate = localArbitrateReturn.getFirst();
962 didCommit = localArbitrateReturn.getSecond();
964 updateLiveStateFromLocal();
966 // Transaction was sent to the server so keep track of it to prevent double commit
967 if (transaction->getSequenceNumber() != -1) {
968 offlineTransactionsCommittedAndAtServer->add(transaction->getId());
972 // The data to send back
973 int returnDataSize = 0;
974 Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
976 // Get the aborts to send back
977 Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>(liveAbortsGeneratedByLocal->keySet());
978 Collections->sort(abortLocalSequenceNumbers);
979 for (int64_t localSequenceNumber : abortLocalSequenceNumbers) {
980 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
984 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
985 unseenArbitrations->add(abort);
986 returnDataSize += abort->getSize();
989 // Get the commits to send back
990 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
991 if (commitForClientTable != NULL) {
992 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
993 Collections->sort(commitLocalSequenceNumbers);
995 for (int64_t localSequenceNumber : commitLocalSequenceNumbers) {
996 Commit *commit = commitForClientTable->get(localSequenceNumber);
998 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1002 unseenArbitrations->addAll(commit->getParts()->values());
1004 for (CommitPart *commitPart : commit->getParts()->values()) {
1005 returnDataSize += commitPart->getSize();
1010 // Number of arbitration entries to decode
1011 returnDataSize += 2 * sizeof(int32_t);
1013 // bool of did commit or not
1014 if (numberOfParts != 0) {
1015 returnDataSize += sizeof(char);
1018 // Data to send Back
1019 Array<char> *returnData = new Array<char>(returnDataSize);
1020 ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
1022 if (numberOfParts != 0) {
1024 bbEncode->put((char)1);
1026 bbEncode->put((char)0);
1028 if (couldArbitrate) {
1029 bbEncode->put((char)1);
1031 bbEncode->put((char)0);
1035 bbEncode->putInt(unseenArbitrations->size());
1036 uint size = unseenArbitrations->size();
1037 for(uint i = 0; i< size; i++) {
1038 Entry * entry = unseenArbitrations->get(i);
1039 entry->encode(bbEncode);
1042 localSequenceNumber++;
1046 ThreeTuple<bool, bool, Array<Slot *> *> Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey) {
1047 bool attemptedToSendToServerTmp = attemptedToSendToServer;
1048 attemptedToSendToServer = true;
1050 bool inserted = false;
1051 bool lastTryInserted = false;
1053 Array<Slot *> *array = cloud->putSlot(slot, newSize);
1054 if (array == NULL) {
1055 array = new Array<Slot *>();
1056 array->set(0, slot);
1057 rejectedSlotVector->clear();
1060 if (array->length() == 0) {
1061 throw new Error("Server Error: Did not send any slots");
1064 // if (attemptedToSendToServerTmp) {
1065 if (hadPartialSendToServer) {
1067 bool isInserted = false;
1068 uint size = array->size();
1069 for(uint i=0; i < size; i++) {
1070 Slot *s = array->get(i);
1071 if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1077 for(uint i=0; i < size; i++) {
1078 Slot *s = array->get(i);
1083 // Process each entry in the slot
1084 for (Entry *entry : s->getEntries()) {
1086 if (entry->getType() == TypeLastMessage) {
1087 LastMessage *lastMessage = (LastMessage *)entry;
1089 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) {
1098 rejectedSlotVector->add(slot->getSequenceNumber());
1099 lastTryInserted = false;
1101 lastTryInserted = true;
1104 rejectedSlotVector->add(slot->getSequenceNumber());
1105 lastTryInserted = false;
1109 return ThreeTuple<bool, bool, Array<Slot *> *>(inserted, lastTryInserted, array);
1113 * Returns false if a resize was needed
1115 ThreeTuple<bool, int32_t, bool> Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry) {
1117 if (liveSlotCount > bufferResizeThreshold) {
1118 resize = true; //Resize is forced
1122 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1123 TableStatus *status = new TableStatus(slot, newSize);
1124 slot->addEntry(status);
1127 // Fill with rejected slots first before doing anything else
1128 doRejectedMessages(slot);
1130 // Do mandatory rescue of entries
1131 ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1133 // Extract working variables
1134 bool needsResize = mandatoryRescueReturn.getFirst();
1135 bool seenLiveSlot = mandatoryRescueReturn.getSecond();
1136 int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1138 if (needsResize && !resize) {
1139 // We need to resize but we are not resizing so return false
1140 return ThreeTuple<bool, int32_t, bool>(true, NULL, NULL);
1143 bool inserted = false;
1144 if (newKeyEntry != NULL) {
1145 newKeyEntry->setSlot(slot);
1146 if (slot->hasSpace(newKeyEntry)) {
1147 slot->addEntry(newKeyEntry);
1152 // Clear the transactions, aborts and commits that were sent previously
1153 transactionPartsSent->clear();
1154 pendingSendArbitrationEntriesToDelete->clear();
1155 uint size = pendingSendArbitrationRounds->size();
1156 for (uint i=0; i<size; i++)
1157 ArbitrartionRound *round = pendingSendArbitrationRounds->get(i);
1158 bool isFull = false;
1159 round->generateParts();
1160 Vector<Entry *> *parts = round->getParts();
1162 // Insert pending arbitration data
1163 uint vsize = parts->size();
1164 for (uint vi=0; vi<vsize; vi++) {
1165 Entry *arbitrationData = parts->get(vi);
1167 // If it is an abort then we need to set some information
1168 if (arbitrationData->getType() == TypeAbort) {
1169 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1172 if (!slot->hasSpace(arbitrationData)) {
1173 // No space so cant do anything else with these data entries
1178 // Add to this current slot and add it to entries to delete
1179 slot->addEntry(arbitrationData);
1180 pendingSendArbitrationEntriesToDelete->add(arbitrationData);
1188 if (pendingTransactionQueue->size() > 0) {
1189 Transaction *transaction = pendingTransactionQueue->get(0);
1190 // Set the transaction sequence number if it has yet to be inserted into the block chain
1191 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1192 transaction->setSequenceNumber(slot->getSequenceNumber());
1196 TransactionPart *part = transaction->getNextPartToSend();
1198 // Ran out of parts to send for this transaction so move on
1202 if (slot->hasSpace(part)) {
1203 slot->addEntry(part);
1204 Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1205 if (partsSent == NULL) {
1206 partsSent = new Vector<int32_t>();
1207 transactionPartsSent->put(transaction, partsSent);
1209 partsSent->add(part->getPartNumber());
1210 transactionPartsSent->put(transaction, partsSent);
1217 // Fill the remainder of the slot with rescue data
1218 doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1220 return ThreeTuple<bool, int32_t, bool>(false, newSize, inserted);
1223 void Table::doRejectedMessages(Slot *s) {
1224 if (!rejectedSlotVector->isEmpty()) {
1225 /* TODO: We should avoid generating a rejected message entry if
1226 * there is already a sufficient entry in the queue (e->g->,
1227 * equalsto value of true and same sequence number)-> */
1229 int64_t old_seqn = rejectedSlotVector->firstElement();
1230 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1231 int64_t new_seqn = rejectedSlotVector->lastElement();
1232 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1235 int64_t prev_seqn = -1;
1237 /* Go through list of missing messages */
1238 for (; i < rejectedSlotVector->size(); i++) {
1239 int64_t curr_seqn = rejectedSlotVector->get(i);
1240 Slot *s_msg = buffer->getSlot(curr_seqn);
1243 prev_seqn = curr_seqn;
1245 /* Generate rejected message entry for missing messages */
1246 if (prev_seqn != -1) {
1247 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1250 /* Generate rejected message entries for present messages */
1251 for (; i < rejectedSlotVector->size(); i++) {
1252 int64_t curr_seqn = rejectedSlotVector->get(i);
1253 Slot *s_msg = buffer->getSlot(curr_seqn);
1254 int64_t machineid = s_msg->getMachineID();
1255 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1262 ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize) {
1263 int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1264 int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
1265 if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1266 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1269 int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1270 bool seenLiveSlot = false;
1271 int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots; // smallest seq number in the buffer if it is full
1272 int64_t threshold = firstIfFull + Table_FREE_SLOTS; // we want the buffer to be clear of live entries up to this point
1276 for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1277 Slot previousSlot = buffer->getSlot(currentSequenceNumber);
1278 // Push slot number forward
1279 if (!seenLiveSlot) {
1280 oldestLiveSlotSequenceNumver = currentSequenceNumber;
1283 if (!previousSlot->isLive()) {
1287 // We have seen a live slot
1288 seenLiveSlot = true;
1290 // Get all the live entries for a slot
1291 Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1293 // Iterate over all the live entries and try to rescue them
1294 for (Entry *liveEntry : liveEntries) {
1295 if (slot->hasSpace(liveEntry)) {
1296 // Enough space to rescue the entry
1297 slot->addEntry(liveEntry);
1298 } else if (currentSequenceNumber == firstIfFull) {
1299 //if there's no space but the entry is about to fall off the queue
1300 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
1306 return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
1309 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1310 /* now go through live entries from least to greatest sequence number until
1311 * either all live slots added, or the slot doesn't have enough room
1312 * for SKIP_THRESHOLD consecutive entries*/
1314 int64_t newestseqnum = buffer->getNewestSeqNum();
1316 for (; seqn <= newestseqnum; seqn++) {
1317 Slot *prevslot = buffer->getSlot(seqn);
1318 //Push slot number forward
1320 oldestLiveSlotSequenceNumver = seqn;
1322 if (!prevslot->isLive())
1324 seenliveslot = true;
1325 Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
1326 for (Entry *liveentry : liveentries) {
1327 if (s->hasSpace(liveentry))
1328 s->addEntry(liveentry);
1331 if (skipcount > Table_SKIP_THRESHOLD)
1339 * Checks for malicious activity and updates the local copy of the block chain->
1341 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1342 // The cloud communication layer has checked slot HMACs already
1344 if (newSlots->length() == 0) {
1348 // Make sure all slots are newer than the last largest slot this
1350 int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
1351 if (firstSeqNum <= sequenceNumber) {
1352 throw new Error("Server Error: Sent older slots!");
1355 // Create an object that can access both new slots and slots in our
1356 // local chain without committing slots to our local chain
1357 SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1359 // Check that the HMAC chain is not broken
1360 checkHMACChain(indexer, newSlots);
1362 // Set to keep track of messages from clients
1363 Hashset<int64_t> *machineSet = new Hashset<int64_t>(lastMessageTable->keySet());
1365 // Process each slots data
1366 for (Slot *slot : newSlots) {
1367 processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1369 updateExpectedSize();
1372 // If there is a gap, check to see if the server sent us
1374 if (firstSeqNum != (sequenceNumber + 1)) {
1376 // Check the size of the slots that were sent down by the server->
1377 // Can only check the size if there was a gap
1378 checkNumSlots(newSlots->length);
1380 // Since there was a gap every machine must have pushed a slot or
1381 // must have a last message message-> If not then the server is
1383 if (!machineSet->isEmpty()) {
1384 throw new Error("Missing record for machines: ");
1388 // Update the size of our local block chain->
1391 // Commit new to slots to the local block chain->
1392 for (Slot *slot : newSlots) {
1394 // Insert this slot into our local block chain copy->
1395 buffer->putSlot(slot);
1397 // Keep track of how many slots are currently live (have live data
1402 // Get the sequence number of the latest slot in the system
1403 sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
1404 updateLiveStateFromServer();
1406 // No Need to remember after we pulled from the server
1407 offlineTransactionsCommittedAndAtServer->clear();
1409 // This is invalidated now
1410 hadPartialSendToServer = false;
1413 void Table::updateLiveStateFromServer() {
1414 // Process the new transaction parts
1415 processNewTransactionParts();
1417 // Do arbitration on new transactions that were received
1418 arbitrateFromServer();
1420 // Update all the committed keys
1421 bool didCommitOrSpeculate = updateCommittedTable();
1423 // Delete the transactions that are now dead
1424 updateLiveTransactionsAndStatus();
1427 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1428 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1431 void Table::updateLiveStateFromLocal() {
1432 // Update all the committed keys
1433 bool didCommitOrSpeculate = updateCommittedTable();
1435 // Delete the transactions that are now dead
1436 updateLiveTransactionsAndStatus();
1439 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1440 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1443 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1444 int64_t prevslots = firstSequenceNumber;
1446 if (didFindTableStatus) {
1448 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1451 didFindTableStatus = true;
1452 currMaxSize = numberOfSlots;
1455 void Table::updateExpectedSize() {
1458 if (expectedsize > currMaxSize) {
1459 expectedsize = currMaxSize;
1465 * Check the size of the block chain to make sure there are enough
1466 * slots sent back by the server-> This is only called when we have a
1467 * gap between the slots that we have locally and the slots sent by
1468 * the server therefore in the slots sent by the server there will be
1469 * at least 1 Table status message
1471 void Table::checkNumSlots(int numberOfSlots) {
1472 if (numberOfSlots != expectedsize) {
1473 throw new Error("Server Error: Server did not send all slots-> Expected: ");
1477 void Table::updateCurrMaxSize(int newmaxsize) {
1478 currMaxSize = newmaxsize;
1483 * Update the size of of the local buffer if it is needed->
1485 void Table::commitNewMaxSize() {
1486 didFindTableStatus = false;
1488 // Resize the local slot buffer
1489 if (numberOfSlots != currMaxSize) {
1490 buffer->resize((int32_t)currMaxSize);
1493 // Change the number of local slots to the new size
1494 numberOfSlots = (int32_t)currMaxSize;
1496 // Recalculate the resize threshold since the size of the local
1497 // buffer has changed
1498 setResizeThreshold();
1502 * Process the new transaction parts from this latest round of slots
1503 * received from the server
1505 void Table::processNewTransactionParts() {
1507 if (newTransactionParts->size() == 0) {
1508 // Nothing new to process
1512 // Iterate through all the machine Ids that we received new parts
1514 for (int64_t machineId : newTransactionParts->keySet()) {
1515 Hashtable<Pair<int64_t, int32_t>, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
1517 // Iterate through all the parts for that machine Id
1518 for (Pair<int64_t, int32_t> partId : parts->keySet()) {
1519 TransactionPart *part = parts->get(partId);
1521 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1522 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= part->getSequenceNumber())) {
1523 // Set dead the transaction part
1528 // Get the transaction object for that sequence number
1529 Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1531 if (transaction == NULL) {
1532 // This is a new transaction that we dont have so make a new one
1533 transaction = new Transaction();
1535 // Insert this new transaction into the live tables
1536 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1537 liveTransactionByTransactionIdTable->put(part->getTransactionId(), transaction);
1540 // Add that part to the transaction
1541 transaction->addPartDecode(part);
1545 // Clear all the new transaction parts in preparation for the next
1546 // time the server sends slots
1547 newTransactionParts->clear();
1550 void Table::arbitrateFromServer() {
1552 if (liveTransactionBySequenceNumberTable->size() == 0) {
1553 // Nothing to arbitrate on so move on
1557 // Get the transaction sequence numbers and sort from oldest to newest
1558 Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
1559 Collections->sort(transactionSequenceNumbers);
1561 // Collection of key value pairs that are
1562 Hashtable<IoTString *, KeyValue *> speculativeTableTmp = new Hashtable<IoTString *, KeyValue *>();
1564 // The last transaction arbitrated on
1565 int64_t lastTransactionCommitted = -1;
1566 Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1568 for (int64_t transactionSequenceNumber : transactionSequenceNumbers) {
1569 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1573 // Check if this machine arbitrates for this transaction if not
1574 // then we cant arbitrate this transaction
1575 if (transaction->getArbitrator() != localMachineId) {
1579 if (transactionSequenceNumber < lastSeqNumArbOn) {
1583 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1584 // We have seen this already locally so dont commit again
1589 if (!transaction->isComplete()) {
1590 // Will arbitrate in incorrect order if we continue so just break
1596 // update the largest transaction seen by arbitrator from server
1597 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) == NULL) {
1598 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1600 int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1601 if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1602 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1606 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1607 // Guard evaluated as true
1609 // Update the local changes so we can make the commit
1610 for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
1611 speculativeTableTmp->put(kv->getKey(), kv);
1614 // Update what the last transaction committed was for use in batch commit
1615 lastTransactionCommitted = transactionSequenceNumber;
1617 // Guard evaluated was false so create abort
1619 Abort *newAbort = new Abort(NULL,
1620 transaction->getClientLocalSequenceNumber(),
1621 transaction->getSequenceNumber(),
1622 transaction->getMachineId(),
1623 transaction->getArbitrator(),
1624 localArbitrationSequenceNumber);
1625 localArbitrationSequenceNumber++;
1626 generatedAborts->add(newAbort);
1628 // Insert the abort so we can process
1629 processEntry(newAbort);
1632 lastSeqNumArbOn = transactionSequenceNumber;
1635 Commit *newCommit = NULL;
1637 // If there is something to commit
1638 if (speculativeTableTmp->size() != 0) {
1639 // Create the commit and increment the commit sequence number
1640 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1641 localArbitrationSequenceNumber++;
1643 // Add all the new keys to the commit
1644 for (KeyValue *kv : speculativeTableTmp->values()) {
1645 newCommit->addKV(kv);
1648 // create the commit parts
1649 newCommit->createCommitParts();
1651 // Append all the commit parts to the end of the pending queue
1652 // waiting for sending to the server
1653 // Insert the commit so we can process it
1654 for (CommitPart *commitPart : newCommit->getParts()->values()) {
1655 processEntry(commitPart);
1659 if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1660 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1661 pendingSendArbitrationRounds->add(arbitrationRound);
1663 if (compactArbitrationData()) {
1664 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1665 if (newArbitrationRound->getCommit() != NULL) {
1666 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1667 processEntry(commitPart);
1674 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1676 // Check if this machine arbitrates for this transaction if not then
1677 // we cant arbitrate this transaction
1678 if (transaction->getArbitrator() != localMachineId) {
1679 return Pair<bool, bool>(false, false);
1682 if (!transaction->isComplete()) {
1683 // Will arbitrate in incorrect order if we continue so just break
1685 return Pair<bool, bool>(false, false);
1688 if (transaction->getMachineId() != localMachineId) {
1689 // dont do this check for local transactions
1690 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) != NULL) {
1691 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1692 // We've have already seen this from the server
1693 return Pair<bool, bool>(false, false);
1698 if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1699 // Guard evaluated as true Create the commit and increment the
1700 // commit sequence number
1701 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1702 localArbitrationSequenceNumber++;
1704 // Update the local changes so we can make the commit
1705 for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
1706 newCommit->addKV(kv);
1709 // create the commit parts
1710 newCommit->createCommitParts();
1712 // Append all the commit parts to the end of the pending queue
1713 // waiting for sending to the server
1714 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1715 pendingSendArbitrationRounds->add(arbitrationRound);
1717 if (compactArbitrationData()) {
1718 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1719 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1720 processEntry(commitPart);
1723 // Insert the commit so we can process it
1724 for (CommitPart *commitPart : newCommit->getParts()->values()) {
1725 processEntry(commitPart);
1729 if (transaction->getMachineId() == localMachineId) {
1730 TransactionStatus *status = transaction->getTransactionStatus();
1731 if (status != NULL) {
1732 status->setStatus(TransactionStatus_StatusCommitted);
1736 updateLiveStateFromLocal();
1737 return Pair<bool, bool>(true, true);
1739 if (transaction->getMachineId() == localMachineId) {
1740 // For locally created messages update the status
1741 // Guard evaluated was false so create abort
1742 TransactionStatus status = transaction->getTransactionStatus();
1743 if (status != NULL) {
1744 status->setStatus(TransactionStatus_StatusAborted);
1747 Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
1750 Abort *newAbort = new Abort(NULL,
1751 transaction->getClientLocalSequenceNumber(),
1753 transaction->getMachineId(),
1754 transaction->getArbitrator(),
1755 localArbitrationSequenceNumber);
1756 localArbitrationSequenceNumber++;
1757 addAbortSet->add(newAbort);
1759 // Append all the commit parts to the end of the pending queue
1760 // waiting for sending to the server
1761 ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1762 pendingSendArbitrationRounds->add(arbitrationRound);
1764 if (compactArbitrationData()) {
1765 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1766 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1767 processEntry(commitPart);
1772 updateLiveStateFromLocal();
1773 return Pair<bool, bool>(true, false);
1778 * Compacts the arbitration data my merging commits and aggregating
1779 * aborts so that a single large push of commits can be done instead
1780 * of many small updates
1782 bool Table::compactArbitrationData() {
1783 if (pendingSendArbitrationRounds->size() < 2) {
1784 // Nothing to compact so do nothing
1788 ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1789 if (lastRound->didSendPart()) {
1793 bool hadCommit = (lastRound->getCommit() == NULL);
1794 bool gotNewCommit = false;
1796 int numberToDelete = 1;
1797 while (numberToDelete < pendingSendArbitrationRounds->size()) {
1798 ArbitrationRound *xs round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1800 if (round->isFull() || round->didSendPart()) {
1801 // Stop since there is a part that cannot be compacted and we
1802 // need to compact in order
1806 if (round->getCommit() == NULL) {
1807 // Try compacting aborts only
1808 int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
1809 if (newSize > ArbitrationRound->MAX_PARTS) {
1810 // Cant compact since it would be too large
1813 lastRound->addAborts(round->getAborts());
1815 // Create a new larger commit
1816 Commit newCommit = Commit->merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
1817 localArbitrationSequenceNumber++;
1819 // Create the commit parts so that we can count them
1820 newCommit->createCommitParts();
1822 // Calculate the new size of the parts
1823 int newSize = newCommit->getNumberOfParts();
1824 newSize += lastRound->getAbortsCount();
1825 newSize += round->getAbortsCount();
1827 if (newSize > ArbitrationRound->MAX_PARTS) {
1828 // Cant compact since it would be too large
1832 // Set the new compacted part
1833 lastRound->setCommit(newCommit);
1834 lastRound->addAborts(round->getAborts());
1835 gotNewCommit = true;
1841 if (numberToDelete != 1) {
1842 // If there is a compaction
1843 // Delete the previous pieces that are now in the new compacted piece
1844 if (numberToDelete == pendingSendArbitrationRounds->size()) {
1845 pendingSendArbitrationRounds->clear();
1847 for (int i = 0; i < numberToDelete; i++) {
1848 pendingSendArbitrationRounds->remove(pendingSendArbitrationRounds->size() - 1);
1852 // Add the new compacted into the pending to send list
1853 pendingSendArbitrationRounds->add(lastRound);
1855 // Should reinsert into the commit processor
1856 if (hadCommit && gotNewCommit) {
1865 * Update all the commits and the committed tables, sets dead the dead
1868 bool Table::updateCommittedTable() {
1870 if (newCommitParts->size() == 0) {
1871 // Nothing new to process
1875 // Iterate through all the machine Ids that we received new parts for
1876 for (int64_t machineId : newCommitParts->keySet()) {
1877 Hashtable<Pair<int64_t, int32_t>, CommitPart *> *parts = newCommitParts->get(machineId);
1879 // Iterate through all the parts for that machine Id
1880 for (Pair<int64_t, int32_t> partId : parts->keySet()) {
1881 CommitPart *part = parts->get(partId);
1883 // Get the transaction object for that sequence number
1884 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
1886 if (commitForClientTable == NULL) {
1887 // This is the first commit from this device
1888 commitForClientTable = new Hashtable<int64_t, Commit *>();
1889 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
1892 Commit *commit = commitForClientTable->get(part->getSequenceNumber());
1894 if (commit == NULL) {
1895 // This is a new commit that we dont have so make a new one
1896 commit = new Commit();
1898 // Insert this new commit into the live tables
1899 commitForClientTable->put(part->getSequenceNumber(), commit);
1902 // Add that part to the commit
1903 commit->addPartDecode(part);
1907 // Clear all the new commits parts in preparation for the next time
1908 // the server sends slots
1909 newCommitParts->clear();
1911 // If we process a new commit keep track of it for future use
1912 bool didProcessANewCommit = false;
1914 // Process the commits one by one
1915 for (int64_T arbitratorId : liveCommitsTable->keySet()) {
1917 // Get all the commits for a specific arbitrator
1918 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
1920 // Sort the commits in order
1921 Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
1922 Collections->sort(commitSequenceNumbers);
1924 // Get the last commit seen from this arbitrator
1925 int64_t lastCommitSeenSequenceNumber = -1;
1926 if (lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId) != NULL) {
1927 lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
1930 // Go through each new commit one by one
1931 for (int i = 0; i < commitSequenceNumbers->size(); i++) {
1932 int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
1933 Commit *commit = commitForClientTable->get(commitSequenceNumber);
1935 // Special processing if a commit is not complete
1936 if (!commit->isComplete()) {
1937 if (i == (commitSequenceNumbers->size() - 1)) {
1938 // If there is an incomplete commit and this commit is the
1939 // latest one seen then this commit cannot be processed and
1940 // there are no other commits
1943 // This is a commit that was already dead but parts of it
1944 // are still in the block chain (not flushed out yet)->
1945 // Delete it and move on
1947 commitForClientTable->remove(commit->getSequenceNumber());
1952 // Update the last transaction that was updated if we can
1953 if (commit->getTransactionSequenceNumber() != -1) {
1954 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
1956 // Update the last transaction sequence number that the arbitrator arbitrated on
1957 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
1958 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
1962 // Update the last arbitration data that we have seen so far
1963 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId()) != NULL) {
1965 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
1966 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
1968 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
1971 // Never seen any data from this arbitrator so record the first one
1972 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
1975 // We have already seen this commit before so need to do the
1976 // full processing on this commit
1977 if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
1979 // Update the last transaction that was updated if we can
1980 if (commit->getTransactionSequenceNumber() != -1) {
1981 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
1983 // Update the last transaction sequence number that the arbitrator arbitrated on
1984 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
1985 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
1992 // If we got here then this is a brand new commit and needs full
1994 // Get what commits should be edited, these are the commits that
1995 // have live values for their keys
1996 Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
1997 for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
1998 commitsToEdit->add(liveCommitsByKeyTable->get(kv->getKey()));
2000 commitsToEdit->remove(NULL); // remove NULL since it could be in this set
2002 // Update each previous commit that needs to be updated
2003 for (Commit *previousCommit : commitsToEdit) {
2005 // Only bother with live commits (TODO: Maybe remove this check)
2006 if (previousCommit->isLive()) {
2008 // Update which keys in the old commits are still live
2009 for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
2010 previousCommit->invalidateKey(kv->getKey());
2013 // if the commit is now dead then remove it
2014 if (!previousCommit->isLive()) {
2015 commitForClientTable->remove(previousCommit);
2020 // Update the last seen sequence number from this arbitrator
2021 if (lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId()) != NULL) {
2022 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2023 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2026 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2029 // We processed a new commit that we havent seen before
2030 didProcessANewCommit = true;
2032 // Update the committed table of keys and which commit is using which key
2033 for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
2034 committedKeyValueTable->put(kv->getKey(), kv);
2035 liveCommitsByKeyTable->put(kv->getKey(), commit);
2040 return didProcessANewCommit;
2044 * Create the speculative table from transactions that are still live
2045 * and have come from the cloud
2047 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2048 if (liveTransactionBySequenceNumberTable->keySet()->size() == 0) {
2049 // There is nothing to speculate on
2053 // Create a list of the transaction sequence numbers and sort them
2054 // from oldest to newest
2055 Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
2056 Collections->sort(transactionSequenceNumbersSorted);
2058 bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2061 if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2062 // If there is a gap in the transaction sequence numbers then
2063 // there was a commit or an abort of a transaction OR there was a
2064 // new commit (Could be from offline commit) so a redo the
2065 // speculation from scratch
2067 // Start from scratch
2068 speculatedKeyValueTable->clear();
2069 lastTransactionSequenceNumberSpeculatedOn = -1;
2070 oldestTransactionSequenceNumberSpeculatedOn = -1;
2074 // Remember the front of the transaction list
2075 oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2077 // Find where to start arbitration from
2078 int startIndex = transactionSequenceNumbersSorted->indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1;
2080 if (startIndex >= transactionSequenceNumbersSorted->size()) {
2081 // Make sure we are not out of bounds
2082 return false; // did not speculate
2085 Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
2086 bool didSkip = true;
2088 for (int i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2089 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2090 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2092 if (!transaction->isComplete()) {
2093 // If there is an incomplete transaction then there is nothing
2094 // we can do add this transactions arbitrator to the list of
2095 // arbitrators we should ignore
2096 incompleteTransactionArbitrator->add(transaction->getArbitrator());
2101 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2105 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2107 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2108 // Guard evaluated to true so update the speculative table
2109 for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
2110 speculatedKeyValueTable->put(kv->getKey(), kv);
2116 // Since there was a skip we need to redo the speculation next time around
2117 lastTransactionSequenceNumberSpeculatedOn = -1;
2118 oldestTransactionSequenceNumberSpeculatedOn = -1;
2121 // We did some speculation
2126 * Create the pending transaction speculative table from transactions
2127 * that are still in the pending transaction buffer
2129 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2130 if (pendingTransactionQueue->size() == 0) {
2131 // There is nothing to speculate on
2135 if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2136 // need to reset on the pending speculation
2137 lastPendingTransactionSpeculatedOn = NULL;
2138 firstPendingTransaction = pendingTransactionQueue->get(0);
2139 pendingTransactionSpeculatedKeyValueTable->clear();
2142 // Find where to start arbitration from
2143 int startIndex = pendingTransactionQueue->indexOf(firstPendingTransaction) + 1;
2145 if (startIndex >= pendingTransactionQueue->size()) {
2146 // Make sure we are not out of bounds
2150 for (int i = startIndex; i < pendingTransactionQueue->size(); i++) {
2151 Transaction *transaction = pendingTransactionQueue->get(i);
2153 lastPendingTransactionSpeculatedOn = transaction;
2155 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2156 // Guard evaluated to true so update the speculative table
2157 for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
2158 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2165 * Set dead and remove from the live transaction tables the
2166 * transactions that are dead
2168 void Table::updateLiveTransactionsAndStatus() {
2170 // Go through each of the transactions
2171 for (Iterator<Map->Entry<int64_t, Transaction> > *iter = liveTransactionBySequenceNumberTable->entrySet()->iterator(); iter->hasNext();) {
2172 Transaction *transaction = iter->next()->getValue();
2174 // Check if the transaction is dead
2175 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator());
2176 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= transaction->getSequenceNumber())) {
2178 // Set dead the transaction
2179 transaction->setDead();
2181 // Remove the transaction from the live table
2183 liveTransactionByTransactionIdTable->remove(transaction->getId());
2187 // Go through each of the transactions
2188 for (Iterator<Map->Entry<int64_t, TransactionStatus *> > *iter = outstandingTransactionStatus->entrySet()->iterator(); iter->hasNext();) {
2189 TransactionStatus *status = iter->next()->getValue();
2191 // Check if the transaction is dead
2192 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator());
2193 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= status->getTransactionSequenceNumber())) {
2196 status->setStatus(TransactionStatus_StatusCommitted);
2205 * Process this slot, entry by entry-> Also update the latest message sent by slot
2207 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2209 // Update the last message seen
2210 updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2212 // Process each entry in the slot
2213 for (Entry *entry : slot->getEntries()) {
2214 switch (entry->getType()) {
2215 case TypeCommitPart:
2216 processEntry((CommitPart *)entry);
2219 processEntry((Abort *)entry);
2221 case TypeTransactionPart:
2222 processEntry((TransactionPart *)entry);
2225 processEntry((NewKey *)entry);
2227 case TypeLastMessage:
2228 processEntry((LastMessage *)entry, machineSet);
2230 case TypeRejectedMessage:
2231 processEntry((RejectedMessage *)entry, indexer);
2233 case TypeTableStatus:
2234 processEntry((TableStatus *)entry, slot->getSequenceNumber());
2237 throw new Error("Unrecognized type: ");
2243 * Update the last message that was sent for a machine Id
2245 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2246 // Update what the last message received by a machine was
2247 updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2251 * Add the new key to the arbitrators table and update the set of live
2252 * new keys (in case of a rescued new key message)
2254 void Table::processEntry(NewKey *entry) {
2255 // Update the arbitrator table with the new key information
2256 arbitratorTable->put(entry->getKey(), entry->getMachineID());
2258 // Update what the latest live new key is
2259 NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2260 if (oldNewKey != NULL) {
2261 // Delete the old new key messages
2262 oldNewKey->setDead();
2267 * Process new table status entries and set dead the old ones as new
2268 * ones come in-> keeps track of the largest and smallest table status
2269 * seen in this current round of updating the local copy of the block
2272 void Table::processEntry(TableStatus entry, int64_t seq) {
2273 int newNumSlots = entry->getMaxSlots();
2274 updateCurrMaxSize(newNumSlots);
2275 initExpectedSize(seq, newNumSlots);
2277 if (liveTableStatus != NULL) {
2278 // We have a larger table status so the old table status is no
2280 liveTableStatus->setDead();
2283 // Make this new table status the latest alive table status
2284 liveTableStatus = entry;
2288 * Check old messages to see if there is a block chain violation->
2291 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
2292 int64_t oldSeqNum = entry->getOldSeqNum();
2293 int64_t newSeqNum = entry->getNewSeqNum();
2294 bool isequal = entry->getEqual();
2295 int64_t machineId = entry->getMachineID();
2296 int64_t seq = entry->getSequenceNumber();
2298 // Check if we have messages that were supposed to be rejected in
2299 // our local block chain
2300 for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2302 Slot *slot = indexer->getSlot(seqNum);
2305 // If we have this slot make sure that it was not supposed to be
2307 int64_t slotMachineId = slot->getMachineID();
2308 if (isequal != (slotMachineId == machineId)) {
2309 throw new Error("Server Error: Trying to insert rejected message for slot ");
2314 // Create a list of clients to watch until they see this rejected
2316 Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2317 for (Map->Entry<int64_t, Pair<int64_t, Liveness *> > *lastMessageEntry : lastMessageTable->entrySet()) {
2318 // Machine ID for the last message entry
2319 int64_t lastMessageEntryMachineId = lastMessageEntry->getKey();
2321 // We've seen it, don't need to continue to watch-> Our next
2322 // message will implicitly acknowledge it->
2323 if (lastMessageEntryMachineId == localMachineId) {
2327 Pair<int64_t, Liveness *> lastMessageValue = lastMessageEntry->getValue();
2328 int64_t entrySequenceNumber = lastMessageValue.getFirst();
2330 if (entrySequenceNumber < seq) {
2331 // Add this rejected message to the set of messages that this
2332 // machine ID did not see yet
2333 addWatchVector(lastMessageEntryMachineId, entry);
2334 // This client did not see this rejected message yet so add it
2335 // to the watch set to monitor
2336 deviceWatchSet->add(lastMessageEntryMachineId);
2339 if (deviceWatchSet->isEmpty()) {
2340 // This rejected message has been seen by all the clients so
2343 // We need to watch this rejected message
2344 entry->setWatchSet(deviceWatchSet);
2349 * Check if this abort is live, if not then save it so we can kill it
2350 * later-> update the last transaction number that was arbitrated on->
2352 void Table::processEntry(Abort *entry) {
2353 if (entry->getTransactionSequenceNumber() != -1) {
2354 // update the transaction status if it was sent to the server
2355 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2356 if (status != NULL) {
2357 status->setStatus(TransactionStatus_StatusAborted);
2361 // Abort has not been seen by the client it is for yet so we need to
2363 Abort *previouslySeenAbort = liveAbortTable->put(entry->getAbortId(), entry);
2364 if (previouslySeenAbort != NULL) {
2365 previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version
2368 if (entry->getTransactionArbitrator() == localMachineId) {
2369 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
2372 if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId()).getFirst() >= entry->getSequenceNumber())) {
2373 // The machine already saw this so it is dead
2375 liveAbortTable->remove(entry->getAbortId());
2377 if (entry->getTransactionArbitrator() == localMachineId) {
2378 liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2383 // Update the last arbitration data that we have seen so far
2384 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator()) != NULL) {
2385 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2386 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2388 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2391 // Never seen any data from this arbitrator so record the first one
2392 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2395 // Set dead a transaction if we can
2396 Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber()));
2397 if (transactionToSetDead != NULL) {
2398 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2401 // Update the last transaction sequence number that the arbitrator
2403 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator());
2404 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2406 if (entry->getTransactionSequenceNumber() != -1) {
2407 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2413 * Set dead the transaction part if that transaction is dead and keep
2414 * track of all new parts
2416 void Table::processEntry(TransactionPart *entry) {
2417 // Check if we have already seen this transaction and set it dead OR
2418 // if it is not alive
2419 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId());
2420 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= entry->getSequenceNumber())) {
2421 // This transaction is dead, it was already committed or aborted
2426 // This part is still alive
2427 Hashtable<Pair<int64_t, int32_t>, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId());
2429 if (transactionPart == NULL) {
2430 // Dont have a table for this machine Id yet so make one
2431 transactionPart = new Hashtable<Pair<int64_t, int32_t>, TransactionPart *>();
2432 newTransactionParts->put(entry->getMachineId(), transactionPart);
2435 // Update the part and set dead ones we have already seen (got a
2437 TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry);
2438 if (previouslySeenPart != NULL) {
2439 previouslySeenPart->setDead();
2444 * Process new commit entries and save them for future use-> Delete duplicates
2446 void Table::processEntry(CommitPart *entry) {
2447 // Update the last transaction that was updated if we can
2448 if (entry->getTransactionSequenceNumber() != -1) {
2449 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId());
2450 // Update the last transaction sequence number that the arbitrator
2452 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2453 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2457 Hashtable<Pair<int64_t, int32_t>, CommitPart *> *commitPart = newCommitParts->get(entry->getMachineId());
2458 if (commitPart == NULL) {
2459 // Don't have a table for this machine Id yet so make one
2460 commitPart = new Hashtable<Pair<int64_t, int32_t>, CommitPart *>();
2461 newCommitParts->put(entry->getMachineId(), commitPart);
2463 // Update the part and set dead ones we have already seen (got a
2465 CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry);
2466 if (previouslySeenPart != NULL) {
2467 previouslySeenPart->setDead();
2472 * Update the last message seen table-> Update and set dead the
2473 * appropriate RejectedMessages as clients see them-> Updates the live
2474 * aborts, removes those that are dead and sets them dead-> Check that
2475 * the last message seen is correct and that there is no mismatch of
2476 * our own last message or that other clients have not had a rollback
2477 * on the last message->
2479 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<intr64_t> *machineSet) {
2480 // We have seen this machine ID
2481 machineSet->remove(machineId);
2483 // Get the set of rejected messages that this machine Id is has not seen yet
2484 Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2485 // If there is a rejected message that this machine Id has not seen yet
2486 if (watchset != NULL) {
2487 // Go through each rejected message that this machine Id has not
2489 for (Iterator<RejectedMessage *> *rmit = watchset->iterator(); rmit->hasNext(); ) {
2490 RejectedMessage *rm = rmit->next();
2491 // If this machine Id has seen this rejected message->->->
2492 if (rm->getSequenceNumber() <= seqNum) {
2493 // Remove it from our watchlist
2495 // Decrement machines that need to see this notification
2496 rm->removeWatcher(machineId);
2501 // Set dead the abort
2502 for (Iterator<Map->Entry<Pair<int64_t, int64_t>, Abort *> > i = liveAbortTable->entrySet()->iterator(); i->hasNext();) {
2503 Abort *abort = i->next()->getValue();
2504 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2507 if (abort->getTransactionArbitrator() == localMachineId) {
2508 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2512 if (machineId == localMachineId) {
2513 // Our own messages are immediately dead->
2514 if (liveness instanceof LastMessage) {
2515 ((LastMessage *)liveness)->setDead();
2516 } else if (liveness instanceof Slot) {
2517 ((Slot *)liveness)->setDead();
2519 throw new Error("Unrecognized type");
2522 // Get the old last message for this device
2523 Pair<int64_t, Liveness *> lastMessageEntry = lastMessageTable->put(machineId, Pair<int64_t, Liveness *>(seqNum, liveness));
2524 if (lastMessageEntry == NULL) {
2525 // If no last message then there is nothing else to process
2529 int64_t lastMessageSeqNum = lastMessageEntry.getFirst();
2530 Liveness *lastEntry = lastMessageEntry.getSecond();
2532 // If it is not our machine Id since we already set ours to dead
2533 if (machineId != localMachineId) {
2534 if (lastEntry instanceof LastMessage) {
2535 ((LastMessage *)lastEntry)->setDead();
2536 } else if (lastEntry instanceof Slot) {
2537 ((Slot *)lastEntry)->setDead();
2539 throw new Error("Unrecognized type");
2542 // Make sure the server is not playing any games
2543 if (machineId == localMachineId) {
2544 if (hadPartialSendToServer) {
2545 // We were not making any updates and we had a machine mismatch
2546 if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2547 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: ");
2550 // We were not making any updates and we had a machine mismatch
2551 if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2552 throw new Error("Server Error: Mismatch on local machine sequence number, needed: ");
2556 if (lastMessageSeqNum > seqNum) {
2557 throw new Error("Server Error: Rollback on remote machine sequence number");
2563 * Add a rejected message entry to the watch set to keep track of
2564 * which clients have seen that rejected message entry and which have
2567 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
2568 Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2569 if (entries == NULL) {
2570 // There is no set for this machine ID yet so create one
2571 entries = new Hashset<RejectedMessage *>();
2572 rejectedMessageWatchVectorTable->put(machineId, entries);
2574 entries->add(entry);
2578 * Check if the HMAC chain is not violated
2580 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
2581 for (int i = 0; i < newSlots->length(); i++) {
2582 Slot *currSlot = newSlots->get(i);
2583 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
2584 if (prevSlot != NULL &&
2585 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2586 throw new Error("Server Error: Invalid HMAC Chain");