3 #include "SlotBuffer.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
14 #include "ByteBuffer.h"
16 #include "CommitPart.h"
19 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
21 cloud(new CloudComm(this, baseurl, password, listeningPort)),
23 liveTableStatus(NULL),
24 pendingTransactionBuilder(NULL),
25 lastPendingTransactionSpeculatedOn(NULL),
26 firstPendingTransaction(NULL),
28 bufferResizeThreshold(0),
30 oldestLiveSlotSequenceNumver(1),
31 localMachineId(_localMachineId),
33 localTransactionSequenceNumber(0),
34 lastTransactionSequenceNumberSpeculatedOn(0),
35 oldestTransactionSequenceNumberSpeculatedOn(0),
36 localArbitrationSequenceNumber(0),
37 hadPartialSendToServer(false),
38 attemptedToSendToServer(false),
40 didFindTableStatus(false),
42 lastSlotAttemptedToSend(NULL),
45 lastTransactionPartsSent(NULL),
46 lastPendingSendArbitrationEntriesToDelete(NULL),
48 committedKeyValueTable(NULL),
49 speculatedKeyValueTable(NULL),
50 pendingTransactionSpeculatedKeyValueTable(NULL),
51 liveNewKeyTable(NULL),
52 lastMessageTable(NULL),
53 rejectedMessageWatchVectorTable(NULL),
54 arbitratorTable(NULL),
56 newTransactionParts(NULL),
58 lastArbitratedTransactionNumberByArbitratorTable(NULL),
59 liveTransactionBySequenceNumberTable(NULL),
60 liveTransactionByTransactionIdTable(NULL),
61 liveCommitsTable(NULL),
62 liveCommitsByKeyTable(NULL),
63 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
64 rejectedSlotVector(NULL),
65 pendingTransactionQueue(NULL),
66 pendingSendArbitrationRounds(NULL),
67 pendingSendArbitrationEntriesToDelete(NULL),
68 transactionPartsSent(NULL),
69 outstandingTransactionStatus(NULL),
70 liveAbortsGeneratedByLocal(NULL),
71 offlineTransactionsCommittedAndAtServer(NULL),
72 localCommunicationTable(NULL),
73 lastTransactionSeenFromMachineFromServer(NULL),
74 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
75 lastInsertedNewKey(false),
81 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
85 liveTableStatus(NULL),
86 pendingTransactionBuilder(NULL),
87 lastPendingTransactionSpeculatedOn(NULL),
88 firstPendingTransaction(NULL),
90 bufferResizeThreshold(0),
92 oldestLiveSlotSequenceNumver(1),
93 localMachineId(_localMachineId),
95 localTransactionSequenceNumber(0),
96 lastTransactionSequenceNumberSpeculatedOn(0),
97 oldestTransactionSequenceNumberSpeculatedOn(0),
98 localArbitrationSequenceNumber(0),
99 hadPartialSendToServer(false),
100 attemptedToSendToServer(false),
102 didFindTableStatus(false),
104 lastSlotAttemptedToSend(NULL),
107 lastTransactionPartsSent(NULL),
108 lastPendingSendArbitrationEntriesToDelete(NULL),
110 committedKeyValueTable(NULL),
111 speculatedKeyValueTable(NULL),
112 pendingTransactionSpeculatedKeyValueTable(NULL),
113 liveNewKeyTable(NULL),
114 lastMessageTable(NULL),
115 rejectedMessageWatchVectorTable(NULL),
116 arbitratorTable(NULL),
117 liveAbortTable(NULL),
118 newTransactionParts(NULL),
119 newCommitParts(NULL),
120 lastArbitratedTransactionNumberByArbitratorTable(NULL),
121 liveTransactionBySequenceNumberTable(NULL),
122 liveTransactionByTransactionIdTable(NULL),
123 liveCommitsTable(NULL),
124 liveCommitsByKeyTable(NULL),
125 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
126 rejectedSlotVector(NULL),
127 pendingTransactionQueue(NULL),
128 pendingSendArbitrationRounds(NULL),
129 pendingSendArbitrationEntriesToDelete(NULL),
130 transactionPartsSent(NULL),
131 outstandingTransactionStatus(NULL),
132 liveAbortsGeneratedByLocal(NULL),
133 offlineTransactionsCommittedAndAtServer(NULL),
134 localCommunicationTable(NULL),
135 lastTransactionSeenFromMachineFromServer(NULL),
136 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
137 lastInsertedNewKey(false),
144 * Init all the stuff needed for for table usage
147 // Init helper objects
148 random = new Random();
149 buffer = new SlotBuffer();
152 committedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
153 speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
154 pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
155 liveNewKeyTable = new Hashtable<IoTString *, NewKey *>();
156 lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> >();
157 rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
158 arbitratorTable = new Hashtable<IoTString *, int64_t>();
159 liveAbortTable = new Hashtable<Pair<int64_t, int64_t>, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>();
160 newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
161 newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
162 lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
163 liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
164 liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t>, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>();
165 liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> >();
166 liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *>();
167 lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
168 rejectedSlotVector = new Vector<int64_t>();
169 pendingTransactionQueue = new Vector<Transaction *>();
170 pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
171 transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
172 outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
173 liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
174 offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t>, uintptr_t, 0, pairHashFunction, pairEquals>();
175 localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> >();
176 lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
177 pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
178 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
181 numberOfSlots = buffer->capacity();
182 setResizeThreshold();
186 * Initialize the table by inserting a table status as the first entry
187 * into the table status also initialize the crypto stuff.
189 void Table::initTable() {
190 cloud->initSecurity();
192 // Create the first insertion into the block chain which is the table status
193 Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
194 localSequenceNumber++;
195 TableStatus *status = new TableStatus(s, numberOfSlots);
197 Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
200 array = new Array<Slot *>(1);
202 // update local block chain
203 validateAndUpdate(array, true);
204 } else if (array->length() == 1) {
205 // in case we did push the slot BUT we failed to init it
206 validateAndUpdate(array, true);
208 throw new Error("Error on initialization");
213 * Rebuild the table from scratch by pulling the latest block chain
216 void Table::rebuild() {
217 // Just pull the latest slots from the server
218 Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
219 validateAndUpdate(newslots, true);
221 updateLiveTransactionsAndStatus();
224 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
225 localCommunicationTable->put(arbitrator, Pair<IoTString *, int32_t>(hostName, portNumber));
228 int64_t Table::getArbitrator(IoTString *key) {
229 return arbitratorTable->get(key);
232 void Table::close() {
236 IoTString *Table::getCommitted(IoTString *key) {
237 KeyValue *kv = committedKeyValueTable->get(key);
240 return kv->getValue();
246 IoTString *Table::getSpeculative(IoTString *key) {
247 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
250 kv = speculatedKeyValueTable->get(key);
254 kv = committedKeyValueTable->get(key);
258 return kv->getValue();
264 IoTString *Table::getCommittedAtomic(IoTString *key) {
265 KeyValue *kv = committedKeyValueTable->get(key);
267 if (!arbitratorTable->contains(key)) {
268 throw new Error("Key not Found.");
271 // Make sure new key value pair matches the current arbitrator
272 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
273 // TODO: Maybe not throw en error
274 throw new Error("Not all Key Values Match Arbitrator.");
278 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
279 return kv->getValue();
281 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
286 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
287 if (!arbitratorTable->contains(key)) {
288 throw new Error("Key not Found.");
291 // Make sure new key value pair matches the current arbitrator
292 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
293 // TODO: Maybe not throw en error
294 throw new Error("Not all Key Values Match Arbitrator.");
297 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
300 kv = speculatedKeyValueTable->get(key);
304 kv = committedKeyValueTable->get(key);
308 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
309 return kv->getValue();
311 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
316 bool Table::update() {
318 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
319 validateAndUpdate(newSlots, false);
321 updateLiveTransactionsAndStatus();
323 } catch (Exception *e) {
324 for (int64_t m : localCommunicationTable->keySet()) {
332 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
334 if (!arbitratorTable->contains(keyName)) {
335 // There is already an arbitrator
338 NewKey *newKey = new NewKey(NULL, keyName, machineId);
340 if (sendToServer(newKey)) {
341 // If successfully inserted
347 void Table::startTransaction() {
348 // Create a new transaction, invalidates any old pending transactions.
349 pendingTransactionBuilder = new PendingTransaction(localMachineId);
352 void Table::addKV(IoTString *key, IoTString *value) {
354 // Make sure it is a valid key
355 if (!arbitratorTable->contains(key)) {
356 throw new Error("Key not Found.");
359 // Make sure new key value pair matches the current arbitrator
360 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
361 // TODO: Maybe not throw en error
362 throw new Error("Not all Key Values Match Arbitrator.");
365 // Add the key value to this transaction
366 KeyValue *kv = new KeyValue(key, value);
367 pendingTransactionBuilder->addKV(kv);
370 TransactionStatus *Table::commitTransaction() {
371 if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
372 // transaction with no updates will have no effect on the system
373 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
376 // Set the local transaction sequence number and increment
377 pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
378 localTransactionSequenceNumber++;
380 // Create the transaction status
381 TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
383 // Create the new transaction
384 Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
385 newTransaction->setTransactionStatus(transactionStatus);
387 if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
388 // Add it to the queue and invalidate the builder for safety
389 pendingTransactionQueue->add(newTransaction);
391 arbitrateOnLocalTransaction(newTransaction);
392 updateLiveStateFromLocal();
395 pendingTransactionBuilder = new PendingTransaction(localMachineId);
399 } catch (ServerException *e) {
401 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
402 uint size = pendingTransactionQueue->size();
404 for(int iter = 0; iter < size; iter++) {
405 Transaction *transaction = pendingTransactionQueue->get(iter);
406 pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter));
408 if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
409 // Already contacted this client so ignore all attempts to contact this client
410 // to preserve ordering for arbitrator
414 Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
416 if (sendReturn.getFirst()) {
417 // Failed to contact over local
418 arbitratorTriedAndFailed->add(transaction->getArbitrator());
420 // Successful contact or should not contact
422 if (sendReturn.getSecond()) {
429 pendingTransactionQueue->setSize(oldindex);
431 updateLiveStateFromLocal();
433 return transactionStatus;
437 * Recalculate the new resize threshold
439 void Table::setResizeThreshold() {
440 int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
441 bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
444 int64_t Table::getLocalSequenceNumber() {
445 return localSequenceNumber;
448 bool Table::sendToServer(NewKey *newKey) {
449 bool fromRetry = false;
451 if (hadPartialSendToServer) {
452 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
453 if (newSlots->length() == 0) {
455 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
457 if (sendSlotsReturn.getFirst()) {
458 if (newKey != NULL) {
459 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
464 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
465 transaction->resetServerFailure();
466 // Update which transactions parts still need to be sent
467 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
468 // Add the transaction status to the outstanding list
469 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
471 // Update the transaction status
472 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
474 // Check if all the transaction parts were successfully
475 // sent and if so then remove it from pending
476 if (transaction->didSendAllParts()) {
477 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
478 pendingTransactionQueue->remove(transaction);
482 newSlots = sendSlotsReturn.getThird();
483 bool isInserted = false;
484 for (uint si = 0; si < newSlots->length(); si++) {
485 Slot *s = newSlots->get(si);
486 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
492 for (uint si = 0; si < newSlots->length(); si++) {
493 Slot *s = newSlots->get(si);
498 // Process each entry in the slot
499 for (Entry *entry : s->getEntries()) {
500 if (entry->getType() == TypeLastMessage) {
501 LastMessage *lastMessage = (LastMessage *)entry;
502 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
511 if (newKey != NULL) {
512 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
517 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
518 transaction->resetServerFailure();
520 // Update which transactions parts still need to be sent
521 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
523 // Add the transaction status to the outstanding list
524 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
526 // Update the transaction status
527 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
529 // Check if all the transaction parts were successfully sent and if so then remove it from pending
530 if (transaction->didSendAllParts()) {
531 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
532 pendingTransactionQueue->remove(transaction);
534 transaction->resetServerFailure();
535 // Set the transaction sequence number back to nothing
536 if (!transaction->didSendAPartToServer()) {
537 transaction->setSequenceNumber(-1);
544 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
545 transaction->resetServerFailure();
546 // Set the transaction sequence number back to nothing
547 if (!transaction->didSendAPartToServer()) {
548 transaction->setSequenceNumber(-1);
552 if (sendSlotsReturn.getThird()->length() != 0) {
553 // insert into the local block chain
554 validateAndUpdate(sendSlotsReturn.getThird(), true);
558 bool isInserted = false;
559 for (uint si = 0; si < newSlots->length(); si++) {
560 Slot *s = newSlots->get(si);
561 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
567 for (uint si = 0; si < newSlots->length(); si++) {
568 Slot *s = newSlots->get(si);
573 // Process each entry in the slot
574 for (Entry *entry : s->getEntries()) {
576 if (entry->getType() == TypeLastMessage) {
577 LastMessage *lastMessage = (LastMessage *)entry;
578 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
587 if (newKey != NULL) {
588 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
593 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
594 transaction->resetServerFailure();
596 // Update which transactions parts still need to be sent
597 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
599 // Add the transaction status to the outstanding list
600 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
602 // Update the transaction status
603 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
605 // Check if all the transaction parts were successfully sent and if so then remove it from pending
606 if (transaction->didSendAllParts()) {
607 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
608 pendingTransactionQueue->remove(transaction);
610 transaction->resetServerFailure();
611 // Set the transaction sequence number back to nothing
612 if (!transaction->didSendAPartToServer()) {
613 transaction->setSequenceNumber(-1);
618 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
619 transaction->resetServerFailure();
620 // Set the transaction sequence number back to nothing
621 if (!transaction->didSendAPartToServer()) {
622 transaction->setSequenceNumber(-1);
627 // insert into the local block chain
628 validateAndUpdate(newSlots, true);
631 } catch (ServerException *e) {
638 // While we have stuff that needs inserting into the block chain
639 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
643 if (hadPartialSendToServer) {
644 throw new Error("Should Be error free");
649 // If there is a new key with same name then end
650 if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
655 Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber);
656 localSequenceNumber++;
658 // Try to fill the slot with data
659 ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
660 bool needsResize = fillSlotsReturn.getFirst();
661 int newSize = fillSlotsReturn.getSecond();
662 bool insertedNewKey = fillSlotsReturn.getThird();
665 // Reset which transaction to send
666 for (Transaction *transaction : transactionPartsSent->keySet()) {
667 transaction->resetNextPartToSend();
669 // Set the transaction sequence number back to nothing
670 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
671 transaction->setSequenceNumber(-1);
675 // Clear the sent data since we are trying again
676 pendingSendArbitrationEntriesToDelete->clear();
677 transactionPartsSent->clear();
679 // We needed a resize so try again
680 fillSlot(slot, true, newKey);
683 lastSlotAttemptedToSend = slot;
684 lastIsNewKey = (newKey != NULL);
685 lastInsertedNewKey = insertedNewKey;
686 lastNewSize = newSize;
688 lastTransactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> * >(transactionPartsSent);
689 lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
692 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
694 if (sendSlotsReturn.getFirst()) {
696 // Did insert into the block chain
698 if (insertedNewKey) {
699 // This slot was what was inserted not a previous slot
701 // New Key was successfully inserted into the block chain so dont want to insert it again
705 // Remove the aborts and commit parts that were sent from the pending to send queue
706 for (Iterator<ArbitrationRound *> *iter = pendingSendArbitrationRounds->iterator(); iter->hasNext(); ) {
707 ArbitrationRound *round = iter->next();
708 round->removeParts(pendingSendArbitrationEntriesToDelete);
710 if (round->isDoneSending()) {
711 // Sent all the parts
716 for (Transaction *transaction : transactionPartsSent->keySet()) {
717 transaction->resetServerFailure();
719 // Update which transactions parts still need to be sent
720 transaction->removeSentParts(transactionPartsSent->get(transaction));
722 // Add the transaction status to the outstanding list
723 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
725 // Update the transaction status
726 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
728 // Check if all the transaction parts were successfully sent and if so then remove it from pending
729 if (transaction->didSendAllParts()) {
730 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
731 pendingTransactionQueue->remove(transaction);
735 // Reset which transaction to send
736 for (Transaction *transaction : transactionPartsSent->keySet()) {
737 transaction->resetNextPartToSend();
739 // Set the transaction sequence number back to nothing
740 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
741 transaction->setSequenceNumber(-1);
746 // Clear the sent data in preparation for next send
747 pendingSendArbitrationEntriesToDelete->clear();
748 transactionPartsSent->clear();
750 if (sendSlotsReturn.getThird()->length() != 0) {
751 // insert into the local block chain
752 validateAndUpdate(sendSlotsReturn.getThird(), true);
756 } catch (ServerException *e) {
758 if (e->getType() != ServerException->TypeInputTimeout) {
759 // Nothing was able to be sent to the server so just clear these data structures
760 for (Transaction *transaction : transactionPartsSent->keySet()) {
761 transaction->resetNextPartToSend();
763 // Set the transaction sequence number back to nothing
764 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
765 transaction->setSequenceNumber(-1);
769 // There was a partial send to the server
770 hadPartialSendToServer = true;
772 // Nothing was able to be sent to the server so just clear these data structures
773 for (Transaction *transaction : transactionPartsSent->keySet()) {
774 transaction->resetNextPartToSend();
775 transaction->setServerFailure();
779 pendingSendArbitrationEntriesToDelete->clear();
780 transactionPartsSent->clear();
785 return newKey == NULL;
788 bool Table::updateFromLocal(int64_t machineId) {
789 Pair<IoTString *, int32_t> localCommunicationInformation = localCommunicationTable->get(machineId);
790 if (localCommunicationInformation == NULL) {
791 // Cant talk to that device locally so do nothing
795 // Get the size of the send data
796 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
798 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
799 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId) != NULL) {
800 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
803 Array<char> *sendData = new Array<char>(sendDataSize);
804 ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
807 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
811 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
812 localSequenceNumber++;
814 if (returnData == NULL) {
815 // Could not contact server
820 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
821 int numberOfEntries = bbDecode->getInt();
823 for (int i = 0; i < numberOfEntries; i++) {
824 char type = bbDecode->get();
825 if (type == TypeAbort) {
826 Abort *abort = (Abort*)Abort_decode(NULL, bbDecode);
828 } else if (type == TypeCommitPart) {
829 CommitPart *commitPart = (CommitPart*)CommitPart_decode(NULL, bbDecode);
830 processEntry(commitPart);
834 updateLiveStateFromLocal();
839 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
841 // Get the devices local communications
842 Pair<IoTString *, int32_t> localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
844 if (localCommunicationInformation == NULL) {
845 // Cant talk to that device locally so do nothing
846 return Pair<bool, bool>(true, false);
849 // Get the size of the send data
850 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
851 for (TransactionPart *part : transaction->getParts()->values()) {
852 sendDataSize += part->getSize();
855 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
856 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator()) != NULL) {
857 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
860 // Make the send data size
861 Array<char> *sendData = new Array<char>(sendDataSize);
862 ByteBuffer *bbEncode = ByteBuffer.wrap(sendData);
865 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
866 bbEncode->putInt(transaction->getParts()->size());
867 for (TransactionPart *part : transaction->getParts()->values()) {
868 part->encode(bbEncode);
873 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
874 localSequenceNumber++;
876 if (returnData == NULL) {
877 // Could not contact server
878 return Pair<bool, bool>(true, false);
882 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
883 bool didCommit = bbDecode->get() == 1;
884 bool couldArbitrate = bbDecode->get() == 1;
885 int numberOfEntries = bbDecode->getInt();
886 bool foundAbort = false;
888 for (int i = 0; i < numberOfEntries; i++) {
889 char type = bbDecode->get();
890 if (type == TypeAbort) {
891 Abort *abort = (Abort*)Abort_decode(NULL, bbDecode);
893 if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
898 } else if (type == TypeCommitPart) {
899 CommitPart *commitPart = (CommitPart*)CommitPart_decode(NULL, bbDecode);
900 processEntry(commitPart);
904 updateLiveStateFromLocal();
906 if (couldArbitrate) {
907 TransactionStatus status = transaction->getTransactionStatus();
909 status->setStatus(TransactionStatus_StatusCommitted);
911 status->setStatus(TransactionStatus_StatusAborted);
914 TransactionStatus status = transaction->getTransactionStatus();
916 status->setStatus(TransactionStatus_StatusAborted);
918 status->setStatus(TransactionStatus_StatusCommitted);
922 return Pair<bool, bool>(false, true);
925 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
928 ByteBuffer *bbDecode = ByteBuffer_wrap(data);
929 int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
930 int numberOfParts = bbDecode->getInt();
932 // If we did commit a transaction or not
933 bool didCommit = false;
934 bool couldArbitrate = false;
936 if (numberOfParts != 0) {
938 // decode the transaction
939 Transaction *transaction = new Transaction();
940 for (int i = 0; i < numberOfParts; i++) {
942 TransactionPart *newPart = (TransactionPart)TransactionPart.decode(NULL, bbDecode);
943 transaction->addPartDecode(newPart);
946 // Arbitrate on transaction and pull relevant return data
947 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
948 couldArbitrate = localArbitrateReturn.getFirst();
949 didCommit = localArbitrateReturn.getSecond();
951 updateLiveStateFromLocal();
953 // Transaction was sent to the server so keep track of it to prevent double commit
954 if (transaction->getSequenceNumber() != -1) {
955 offlineTransactionsCommittedAndAtServer->add(transaction->getId());
959 // The data to send back
960 int returnDataSize = 0;
961 Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
963 // Get the aborts to send back
964 Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>(liveAbortsGeneratedByLocal->keySet());
965 Collections->sort(abortLocalSequenceNumbers);
966 for (int64_t localSequenceNumber : abortLocalSequenceNumbers) {
967 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
971 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
972 unseenArbitrations->add(abort);
973 returnDataSize += abort->getSize();
976 // Get the commits to send back
977 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
978 if (commitForClientTable != NULL) {
979 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
980 Collections->sort(commitLocalSequenceNumbers);
982 for (int64_t localSequenceNumber : commitLocalSequenceNumbers) {
983 Commit *commit = commitForClientTable->get(localSequenceNumber);
985 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
989 unseenArbitrations->addAll(commit->getParts()->values());
991 for (CommitPart *commitPart : commit->getParts()->values()) {
992 returnDataSize += commitPart->getSize();
997 // Number of arbitration entries to decode
998 returnDataSize += 2 * sizeof(int32_t);
1000 // bool of did commit or not
1001 if (numberOfParts != 0) {
1002 returnDataSize += sizeof(char);
1005 // Data to send Back
1006 Array<char> *returnData = new Array<char>(returnDataSize);
1007 ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
1009 if (numberOfParts != 0) {
1011 bbEncode->put((char)1);
1013 bbEncode->put((char)0);
1015 if (couldArbitrate) {
1016 bbEncode->put((char)1);
1018 bbEncode->put((char)0);
1022 bbEncode->putInt(unseenArbitrations->size());
1023 uint size = unseenArbitrations->size();
1024 for(uint i = 0; i< size; i++) {
1025 Entry * entry = unseenArbitrations->get(i);
1026 entry->encode(bbEncode);
1029 localSequenceNumber++;
1033 ThreeTuple<bool, bool, Array<Slot *> *> Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey) {
1034 bool attemptedToSendToServerTmp = attemptedToSendToServer;
1035 attemptedToSendToServer = true;
1037 bool inserted = false;
1038 bool lastTryInserted = false;
1040 Array<Slot *> *array = cloud->putSlot(slot, newSize);
1041 if (array == NULL) {
1042 array = new Array<Slot *>();
1043 array->set(0, slot);
1044 rejectedSlotVector->clear();
1047 if (array->length() == 0) {
1048 throw new Error("Server Error: Did not send any slots");
1051 // if (attemptedToSendToServerTmp) {
1052 if (hadPartialSendToServer) {
1054 bool isInserted = false;
1055 uint size = s->size();
1056 for(uint i=0; i < size; i++) {
1057 Slot *s = array->get(i);
1058 if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1064 for(uint i=0; i < size; i++) {
1065 Slot *s = array->get(i);
1070 // Process each entry in the slot
1071 for (Entry *entry : s->getEntries()) {
1073 if (entry->getType() == TypeLastMessage) {
1074 LastMessage *lastMessage = (LastMessage *)entry;
1076 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) {
1085 rejectedSlotVector->add(slot->getSequenceNumber());
1086 lastTryInserted = false;
1088 lastTryInserted = true;
1091 rejectedSlotVector->add(slot->getSequenceNumber());
1092 lastTryInserted = false;
1096 return ThreeTuple<bool, bool, Array<Slot *> *>(inserted, lastTryInserted, array);
1100 * Returns false if a resize was needed
1102 ThreeTuple<bool, int32_t, bool> Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry) {
1104 if (liveSlotCount > bufferResizeThreshold) {
1105 resize = true; //Resize is forced
1109 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1110 TableStatus *status = new TableStatus(slot, newSize);
1111 slot->addEntry(status);
1114 // Fill with rejected slots first before doing anything else
1115 doRejectedMessages(slot);
1117 // Do mandatory rescue of entries
1118 ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1120 // Extract working variables
1121 bool needsResize = mandatoryRescueReturn.getFirst();
1122 bool seenLiveSlot = mandatoryRescueReturn.getSecond();
1123 int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1125 if (needsResize && !resize) {
1126 // We need to resize but we are not resizing so return false
1127 return ThreeTuple<bool, int32_t, bool>(true, NULL, NULL);
1130 bool inserted = false;
1131 if (newKeyEntry != NULL) {
1132 newKeyEntry->setSlot(slot);
1133 if (slot->hasSpace(newKeyEntry)) {
1134 slot->addEntry(newKeyEntry);
1139 // Clear the transactions, aborts and commits that were sent previously
1140 transactionPartsSent->clear();
1141 pendingSendArbitrationEntriesToDelete->clear();
1143 for (ArbitrationRound *round : pendingSendArbitrationRounds) {
1144 bool isFull = false;
1145 round->generateParts();
1146 Vector<Entry *> *parts = round->getParts();
1148 // Insert pending arbitration data
1149 for (Entry *arbitrationData : parts) {
1151 // If it is an abort then we need to set some information
1152 if (arbitrationData instanceof Abort) {
1153 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1156 if (!slot->hasSpace(arbitrationData)) {
1157 // No space so cant do anything else with these data entries
1162 // Add to this current slot and add it to entries to delete
1163 slot->addEntry(arbitrationData);
1164 pendingSendArbitrationEntriesToDelete->add(arbitrationData);
1172 if (pendingTransactionQueue->size() > 0) {
1173 Transaction *transaction = pendingTransactionQueue->get(0);
1174 // Set the transaction sequence number if it has yet to be inserted into the block chain
1175 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1176 transaction->setSequenceNumber(slot->getSequenceNumber());
1180 TransactionPart *part = transaction->getNextPartToSend();
1182 // Ran out of parts to send for this transaction so move on
1186 if (slot->hasSpace(part)) {
1187 slot->addEntry(part);
1188 Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1189 if (partsSent == NULL) {
1190 partsSent = new Vector<int32_t>();
1191 transactionPartsSent->put(transaction, partsSent);
1193 partsSent->add(part->getPartNumber());
1194 transactionPartsSent->put(transaction, partsSent);
1201 // Fill the remainder of the slot with rescue data
1202 doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1204 return ThreeTuple<bool, int32_t, bool>(false, newSize, inserted);
1207 void Table::doRejectedMessages(Slot *s) {
1208 if (!rejectedSlotVector->isEmpty()) {
1209 /* TODO: We should avoid generating a rejected message entry if
1210 * there is already a sufficient entry in the queue (e->g->,
1211 * equalsto value of true and same sequence number)-> */
1213 int64_t old_seqn = rejectedSlotVector->firstElement();
1214 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1215 int64_t new_seqn = rejectedSlotVector->lastElement();
1216 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1219 int64_t prev_seqn = -1;
1221 /* Go through list of missing messages */
1222 for (; i < rejectedSlotVector->size(); i++) {
1223 int64_t curr_seqn = rejectedSlotVector->get(i);
1224 Slot *s_msg = buffer->getSlot(curr_seqn);
1227 prev_seqn = curr_seqn;
1229 /* Generate rejected message entry for missing messages */
1230 if (prev_seqn != -1) {
1231 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1234 /* Generate rejected message entries for present messages */
1235 for (; i < rejectedSlotVector->size(); i++) {
1236 int64_t curr_seqn = rejectedSlotVector->get(i);
1237 Slot *s_msg = buffer->getSlot(curr_seqn);
1238 int64_t machineid = s_msg->getMachineID();
1239 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1246 ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize) {
1247 int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1248 int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
1249 if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1250 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1253 int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1254 bool seenLiveSlot = false;
1255 int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots; // smallest seq number in the buffer if it is full
1256 int64_t threshold = firstIfFull + Table_FREE_SLOTS; // we want the buffer to be clear of live entries up to this point
1260 for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1261 Slot previousSlot = buffer->getSlot(currentSequenceNumber);
1262 // Push slot number forward
1263 if (!seenLiveSlot) {
1264 oldestLiveSlotSequenceNumver = currentSequenceNumber;
1267 if (!previousSlot->isLive()) {
1271 // We have seen a live slot
1272 seenLiveSlot = true;
1274 // Get all the live entries for a slot
1275 Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1277 // Iterate over all the live entries and try to rescue them
1278 for (Entry *liveEntry : liveEntries) {
1279 if (slot->hasSpace(liveEntry)) {
1280 // Enough space to rescue the entry
1281 slot->addEntry(liveEntry);
1282 } else if (currentSequenceNumber == firstIfFull) {
1283 //if there's no space but the entry is about to fall off the queue
1284 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
1290 return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
1293 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1294 /* now go through live entries from least to greatest sequence number until
1295 * either all live slots added, or the slot doesn't have enough room
1296 * for SKIP_THRESHOLD consecutive entries*/
1298 int64_t newestseqnum = buffer->getNewestSeqNum();
1300 for (; seqn <= newestseqnum; seqn++) {
1301 Slot *prevslot = buffer->getSlot(seqn);
1302 //Push slot number forward
1304 oldestLiveSlotSequenceNumver = seqn;
1306 if (!prevslot->isLive())
1308 seenliveslot = true;
1309 Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
1310 for (Entry *liveentry : liveentries) {
1311 if (s->hasSpace(liveentry))
1312 s->addEntry(liveentry);
1315 if (skipcount > Table_SKIP_THRESHOLD)
1323 * Checks for malicious activity and updates the local copy of the block chain->
1325 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1326 // The cloud communication layer has checked slot HMACs already
1328 if (newSlots->length() == 0) {
1332 // Make sure all slots are newer than the last largest slot this
1334 int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
1335 if (firstSeqNum <= sequenceNumber) {
1336 throw new Error("Server Error: Sent older slots!");
1339 // Create an object that can access both new slots and slots in our
1340 // local chain without committing slots to our local chain
1341 SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1343 // Check that the HMAC chain is not broken
1344 checkHMACChain(indexer, newSlots);
1346 // Set to keep track of messages from clients
1347 Hashset<int64_t> *machineSet = new Hashset<int64_t>(lastMessageTable->keySet());
1349 // Process each slots data
1350 for (Slot *slot : newSlots) {
1351 processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1353 updateExpectedSize();
1356 // If there is a gap, check to see if the server sent us
1358 if (firstSeqNum != (sequenceNumber + 1)) {
1360 // Check the size of the slots that were sent down by the server->
1361 // Can only check the size if there was a gap
1362 checkNumSlots(newSlots->length);
1364 // Since there was a gap every machine must have pushed a slot or
1365 // must have a last message message-> If not then the server is
1367 if (!machineSet->isEmpty()) {
1368 throw new Error("Missing record for machines: ");
1372 // Update the size of our local block chain->
1375 // Commit new to slots to the local block chain->
1376 for (Slot *slot : newSlots) {
1378 // Insert this slot into our local block chain copy->
1379 buffer->putSlot(slot);
1381 // Keep track of how many slots are currently live (have live data
1386 // Get the sequence number of the latest slot in the system
1387 sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
1388 updateLiveStateFromServer();
1390 // No Need to remember after we pulled from the server
1391 offlineTransactionsCommittedAndAtServer->clear();
1393 // This is invalidated now
1394 hadPartialSendToServer = false;
1397 void Table::updateLiveStateFromServer() {
1398 // Process the new transaction parts
1399 processNewTransactionParts();
1401 // Do arbitration on new transactions that were received
1402 arbitrateFromServer();
1404 // Update all the committed keys
1405 bool didCommitOrSpeculate = updateCommittedTable();
1407 // Delete the transactions that are now dead
1408 updateLiveTransactionsAndStatus();
1411 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1412 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1415 void Table::updateLiveStateFromLocal() {
1416 // Update all the committed keys
1417 bool didCommitOrSpeculate = updateCommittedTable();
1419 // Delete the transactions that are now dead
1420 updateLiveTransactionsAndStatus();
1423 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1424 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1427 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1428 int64_t prevslots = firstSequenceNumber;
1430 if (didFindTableStatus) {
1432 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1435 didFindTableStatus = true;
1436 currMaxSize = numberOfSlots;
1439 void Table::updateExpectedSize() {
1442 if (expectedsize > currMaxSize) {
1443 expectedsize = currMaxSize;
1449 * Check the size of the block chain to make sure there are enough
1450 * slots sent back by the server-> This is only called when we have a
1451 * gap between the slots that we have locally and the slots sent by
1452 * the server therefore in the slots sent by the server there will be
1453 * at least 1 Table status message
1455 void Table::checkNumSlots(int numberOfSlots) {
1456 if (numberOfSlots != expectedsize) {
1457 throw new Error("Server Error: Server did not send all slots-> Expected: ");
1461 void Table::updateCurrMaxSize(int newmaxsize) {
1462 currMaxSize = newmaxsize;
1467 * Update the size of of the local buffer if it is needed->
1469 void Table::commitNewMaxSize() {
1470 didFindTableStatus = false;
1472 // Resize the local slot buffer
1473 if (numberOfSlots != currMaxSize) {
1474 buffer->resize((int32_t)currMaxSize);
1477 // Change the number of local slots to the new size
1478 numberOfSlots = (int32_t)currMaxSize;
1480 // Recalculate the resize threshold since the size of the local
1481 // buffer has changed
1482 setResizeThreshold();
1486 * Process the new transaction parts from this latest round of slots
1487 * received from the server
1489 void Table::processNewTransactionParts() {
1491 if (newTransactionParts->size() == 0) {
1492 // Nothing new to process
1496 // Iterate through all the machine Ids that we received new parts
1498 for (int64_t machineId : newTransactionParts->keySet()) {
1499 Hashtable<Pair<int64_t int32_t>, TransactionPart *> *parts = newTransactionParts->get(machineId);
1501 // Iterate through all the parts for that machine Id
1502 for (Pair<int64_t, int32_t> partId : parts->keySet()) {
1503 TransactionPart *part = parts->get(partId);
1505 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1506 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= part->getSequenceNumber())) {
1507 // Set dead the transaction part
1512 // Get the transaction object for that sequence number
1513 Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1515 if (transaction == NULL) {
1516 // This is a new transaction that we dont have so make a new one
1517 transaction = new Transaction();
1519 // Insert this new transaction into the live tables
1520 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1521 liveTransactionByTransactionIdTable->put(part->getTransactionId(), transaction);
1524 // Add that part to the transaction
1525 transaction->addPartDecode(part);
1529 // Clear all the new transaction parts in preparation for the next
1530 // time the server sends slots
1531 newTransactionParts->clear();
1534 void Table::arbitrateFromServer() {
1536 if (liveTransactionBySequenceNumberTable->size() == 0) {
1537 // Nothing to arbitrate on so move on
1541 // Get the transaction sequence numbers and sort from oldest to newest
1542 Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
1543 Collections->sort(transactionSequenceNumbers);
1545 // Collection of key value pairs that are
1546 Hashtable<IoTString *, KeyValue *> speculativeTableTmp = new Hashtable<IoTString *, KeyValue *>();
1548 // The last transaction arbitrated on
1549 int64_t lastTransactionCommitted = -1;
1550 Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1552 for (int64_t transactionSequenceNumber : transactionSequenceNumbers) {
1553 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1557 // Check if this machine arbitrates for this transaction if not
1558 // then we cant arbitrate this transaction
1559 if (transaction->getArbitrator() != localMachineId) {
1563 if (transactionSequenceNumber < lastSeqNumArbOn) {
1567 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1568 // We have seen this already locally so dont commit again
1573 if (!transaction->isComplete()) {
1574 // Will arbitrate in incorrect order if we continue so just break
1580 // update the largest transaction seen by arbitrator from server
1581 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) == NULL) {
1582 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1584 int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1585 if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1586 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1590 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1591 // Guard evaluated as true
1593 // Update the local changes so we can make the commit
1594 for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
1595 speculativeTableTmp->put(kv->getKey(), kv);
1598 // Update what the last transaction committed was for use in batch commit
1599 lastTransactionCommitted = transactionSequenceNumber;
1601 // Guard evaluated was false so create abort
1603 Abort *newAbort = new Abort(NULL,
1604 transaction->getClientLocalSequenceNumber(),
1605 transaction->getSequenceNumber(),
1606 transaction->getMachineId(),
1607 transaction->getArbitrator(),
1608 localArbitrationSequenceNumber);
1609 localArbitrationSequenceNumber++;
1610 generatedAborts->add(newAbort);
1612 // Insert the abort so we can process
1613 processEntry(newAbort);
1616 lastSeqNumArbOn = transactionSequenceNumber;
1619 Commit *newCommit = NULL;
1621 // If there is something to commit
1622 if (speculativeTableTmp->size() != 0) {
1623 // Create the commit and increment the commit sequence number
1624 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1625 localArbitrationSequenceNumber++;
1627 // Add all the new keys to the commit
1628 for (KeyValue *kv : speculativeTableTmp->values()) {
1629 newCommit->addKV(kv);
1632 // create the commit parts
1633 newCommit->createCommitParts();
1635 // Append all the commit parts to the end of the pending queue
1636 // waiting for sending to the server
1637 // Insert the commit so we can process it
1638 for (CommitPart *commitPart : newCommit->getParts()->values()) {
1639 processEntry(commitPart);
1643 if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1644 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1645 pendingSendArbitrationRounds->add(arbitrationRound);
1647 if (compactArbitrationData()) {
1648 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1649 if (newArbitrationRound->getCommit() != NULL) {
1650 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1651 processEntry(commitPart);
1658 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1660 // Check if this machine arbitrates for this transaction if not then
1661 // we cant arbitrate this transaction
1662 if (transaction->getArbitrator() != localMachineId) {
1663 return Pair<bool, bool>(false, false);
1666 if (!transaction->isComplete()) {
1667 // Will arbitrate in incorrect order if we continue so just break
1669 return Pair<bool, bool>(false, false);
1672 if (transaction->getMachineId() != localMachineId) {
1673 // dont do this check for local transactions
1674 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) != NULL) {
1675 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1676 // We've have already seen this from the server
1677 return Pair<bool, bool>(false, false);
1682 if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1683 // Guard evaluated as true Create the commit and increment the
1684 // commit sequence number
1685 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1686 localArbitrationSequenceNumber++;
1688 // Update the local changes so we can make the commit
1689 for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
1690 newCommit->addKV(kv);
1693 // create the commit parts
1694 newCommit->createCommitParts();
1696 // Append all the commit parts to the end of the pending queue
1697 // waiting for sending to the server
1698 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1699 pendingSendArbitrationRounds->add(arbitrationRound);
1701 if (compactArbitrationData()) {
1702 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1703 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1704 processEntry(commitPart);
1707 // Insert the commit so we can process it
1708 for (CommitPart *commitPart : newCommit->getParts()->values()) {
1709 processEntry(commitPart);
1713 if (transaction->getMachineId() == localMachineId) {
1714 TransactionStatus *status = transaction->getTransactionStatus();
1715 if (status != NULL) {
1716 status->setStatus(TransactionStatus_StatusCommitted);
1720 updateLiveStateFromLocal();
1721 return Pair<bool, bool>(true, true);
1723 if (transaction->getMachineId() == localMachineId) {
1724 // For locally created messages update the status
1725 // Guard evaluated was false so create abort
1726 TransactionStatus status = transaction->getTransactionStatus();
1727 if (status != NULL) {
1728 status->setStatus(TransactionStatus_StatusAborted);
1731 Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
1734 Abort *newAbort = new Abort(NULL,
1735 transaction->getClientLocalSequenceNumber(),
1737 transaction->getMachineId(),
1738 transaction->getArbitrator(),
1739 localArbitrationSequenceNumber);
1740 localArbitrationSequenceNumber++;
1741 addAbortSet->add(newAbort);
1743 // Append all the commit parts to the end of the pending queue
1744 // waiting for sending to the server
1745 ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1746 pendingSendArbitrationRounds->add(arbitrationRound);
1748 if (compactArbitrationData()) {
1749 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1750 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1751 processEntry(commitPart);
1756 updateLiveStateFromLocal();
1757 return Pair<bool, bool>(true, false);
1762 * Compacts the arbitration data my merging commits and aggregating
1763 * aborts so that a single large push of commits can be done instead
1764 * of many small updates
1766 bool Table::compactArbitrationData() {
1767 if (pendingSendArbitrationRounds->size() < 2) {
1768 // Nothing to compact so do nothing
1772 ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1773 if (lastRound->didSendPart()) {
1777 bool hadCommit = (lastRound->getCommit() == NULL);
1778 bool gotNewCommit = false;
1780 int numberToDelete = 1;
1781 while (numberToDelete < pendingSendArbitrationRounds->size()) {
1782 ArbitrationRound *xs round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1784 if (round->isFull() || round->didSendPart()) {
1785 // Stop since there is a part that cannot be compacted and we
1786 // need to compact in order
1790 if (round->getCommit() == NULL) {
1791 // Try compacting aborts only
1792 int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
1793 if (newSize > ArbitrationRound->MAX_PARTS) {
1794 // Cant compact since it would be too large
1797 lastRound->addAborts(round->getAborts());
1799 // Create a new larger commit
1800 Commit newCommit = Commit->merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
1801 localArbitrationSequenceNumber++;
1803 // Create the commit parts so that we can count them
1804 newCommit->createCommitParts();
1806 // Calculate the new size of the parts
1807 int newSize = newCommit->getNumberOfParts();
1808 newSize += lastRound->getAbortsCount();
1809 newSize += round->getAbortsCount();
1811 if (newSize > ArbitrationRound->MAX_PARTS) {
1812 // Cant compact since it would be too large
1816 // Set the new compacted part
1817 lastRound->setCommit(newCommit);
1818 lastRound->addAborts(round->getAborts());
1819 gotNewCommit = true;
1825 if (numberToDelete != 1) {
1826 // If there is a compaction
1827 // Delete the previous pieces that are now in the new compacted piece
1828 if (numberToDelete == pendingSendArbitrationRounds->size()) {
1829 pendingSendArbitrationRounds->clear();
1831 for (int i = 0; i < numberToDelete; i++) {
1832 pendingSendArbitrationRounds->remove(pendingSendArbitrationRounds->size() - 1);
1836 // Add the new compacted into the pending to send list
1837 pendingSendArbitrationRounds->add(lastRound);
1839 // Should reinsert into the commit processor
1840 if (hadCommit && gotNewCommit) {
1849 * Update all the commits and the committed tables, sets dead the dead
1852 bool Table::updateCommittedTable() {
1854 if (newCommitParts->size() == 0) {
1855 // Nothing new to process
1859 // Iterate through all the machine Ids that we received new parts for
1860 for (int64_t machineId : newCommitParts->keySet()) {
1861 Hashtable<Pair<int64_t, int32_t>, CommitPart *> *parts = newCommitParts->get(machineId);
1863 // Iterate through all the parts for that machine Id
1864 for (Pair<int64_t, int32_t> partId : parts->keySet()) {
1865 CommitPart *part = parts->get(partId);
1867 // Get the transaction object for that sequence number
1868 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
1870 if (commitForClientTable == NULL) {
1871 // This is the first commit from this device
1872 commitForClientTable = new Hashtable<int64_t, Commit *>();
1873 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
1876 Commit *commit = commitForClientTable->get(part->getSequenceNumber());
1878 if (commit == NULL) {
1879 // This is a new commit that we dont have so make a new one
1880 commit = new Commit();
1882 // Insert this new commit into the live tables
1883 commitForClientTable->put(part->getSequenceNumber(), commit);
1886 // Add that part to the commit
1887 commit->addPartDecode(part);
1891 // Clear all the new commits parts in preparation for the next time
1892 // the server sends slots
1893 newCommitParts->clear();
1895 // If we process a new commit keep track of it for future use
1896 bool didProcessANewCommit = false;
1898 // Process the commits one by one
1899 for (int64_T arbitratorId : liveCommitsTable->keySet()) {
1901 // Get all the commits for a specific arbitrator
1902 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
1904 // Sort the commits in order
1905 Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
1906 Collections->sort(commitSequenceNumbers);
1908 // Get the last commit seen from this arbitrator
1909 int64_t lastCommitSeenSequenceNumber = -1;
1910 if (lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId) != NULL) {
1911 lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
1914 // Go through each new commit one by one
1915 for (int i = 0; i < commitSequenceNumbers->size(); i++) {
1916 int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
1917 Commit *commit = commitForClientTable->get(commitSequenceNumber);
1919 // Special processing if a commit is not complete
1920 if (!commit->isComplete()) {
1921 if (i == (commitSequenceNumbers->size() - 1)) {
1922 // If there is an incomplete commit and this commit is the
1923 // latest one seen then this commit cannot be processed and
1924 // there are no other commits
1927 // This is a commit that was already dead but parts of it
1928 // are still in the block chain (not flushed out yet)->
1929 // Delete it and move on
1931 commitForClientTable->remove(commit->getSequenceNumber());
1936 // Update the last transaction that was updated if we can
1937 if (commit->getTransactionSequenceNumber() != -1) {
1938 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
1940 // Update the last transaction sequence number that the arbitrator arbitrated on
1941 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
1942 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
1946 // Update the last arbitration data that we have seen so far
1947 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId()) != NULL) {
1949 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
1950 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
1952 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
1955 // Never seen any data from this arbitrator so record the first one
1956 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
1959 // We have already seen this commit before so need to do the
1960 // full processing on this commit
1961 if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
1963 // Update the last transaction that was updated if we can
1964 if (commit->getTransactionSequenceNumber() != -1) {
1965 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
1967 // Update the last transaction sequence number that the arbitrator arbitrated on
1968 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
1969 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
1976 // If we got here then this is a brand new commit and needs full
1978 // Get what commits should be edited, these are the commits that
1979 // have live values for their keys
1980 Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
1981 for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
1982 commitsToEdit->add(liveCommitsByKeyTable->get(kv->getKey()));
1984 commitsToEdit->remove(NULL); // remove NULL since it could be in this set
1986 // Update each previous commit that needs to be updated
1987 for (Commit *previousCommit : commitsToEdit) {
1989 // Only bother with live commits (TODO: Maybe remove this check)
1990 if (previousCommit->isLive()) {
1992 // Update which keys in the old commits are still live
1993 for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
1994 previousCommit->invalidateKey(kv->getKey());
1997 // if the commit is now dead then remove it
1998 if (!previousCommit->isLive()) {
1999 commitForClientTable->remove(previousCommit);
2004 // Update the last seen sequence number from this arbitrator
2005 if (lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId()) != NULL) {
2006 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2007 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2010 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2013 // We processed a new commit that we havent seen before
2014 didProcessANewCommit = true;
2016 // Update the committed table of keys and which commit is using which key
2017 for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
2018 committedKeyValueTable->put(kv->getKey(), kv);
2019 liveCommitsByKeyTable->put(kv->getKey(), commit);
2024 return didProcessANewCommit;
2028 * Create the speculative table from transactions that are still live
2029 * and have come from the cloud
2031 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2032 if (liveTransactionBySequenceNumberTable->keySet()->size() == 0) {
2033 // There is nothing to speculate on
2037 // Create a list of the transaction sequence numbers and sort them
2038 // from oldest to newest
2039 Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
2040 Collections->sort(transactionSequenceNumbersSorted);
2042 bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2045 if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2046 // If there is a gap in the transaction sequence numbers then
2047 // there was a commit or an abort of a transaction OR there was a
2048 // new commit (Could be from offline commit) so a redo the
2049 // speculation from scratch
2051 // Start from scratch
2052 speculatedKeyValueTable->clear();
2053 lastTransactionSequenceNumberSpeculatedOn = -1;
2054 oldestTransactionSequenceNumberSpeculatedOn = -1;
2058 // Remember the front of the transaction list
2059 oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2061 // Find where to start arbitration from
2062 int startIndex = transactionSequenceNumbersSorted->indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1;
2064 if (startIndex >= transactionSequenceNumbersSorted->size()) {
2065 // Make sure we are not out of bounds
2066 return false; // did not speculate
2069 Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
2070 bool didSkip = true;
2072 for (int i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2073 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2074 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2076 if (!transaction->isComplete()) {
2077 // If there is an incomplete transaction then there is nothing
2078 // we can do add this transactions arbitrator to the list of
2079 // arbitrators we should ignore
2080 incompleteTransactionArbitrator->add(transaction->getArbitrator());
2085 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2089 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2091 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2092 // Guard evaluated to true so update the speculative table
2093 for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
2094 speculatedKeyValueTable->put(kv->getKey(), kv);
2100 // Since there was a skip we need to redo the speculation next time around
2101 lastTransactionSequenceNumberSpeculatedOn = -1;
2102 oldestTransactionSequenceNumberSpeculatedOn = -1;
2105 // We did some speculation
2110 * Create the pending transaction speculative table from transactions
2111 * that are still in the pending transaction buffer
2113 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2114 if (pendingTransactionQueue->size() == 0) {
2115 // There is nothing to speculate on
2119 if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2120 // need to reset on the pending speculation
2121 lastPendingTransactionSpeculatedOn = NULL;
2122 firstPendingTransaction = pendingTransactionQueue->get(0);
2123 pendingTransactionSpeculatedKeyValueTable->clear();
2126 // Find where to start arbitration from
2127 int startIndex = pendingTransactionQueue->indexOf(firstPendingTransaction) + 1;
2129 if (startIndex >= pendingTransactionQueue->size()) {
2130 // Make sure we are not out of bounds
2134 for (int i = startIndex; i < pendingTransactionQueue->size(); i++) {
2135 Transaction *transaction = pendingTransactionQueue->get(i);
2137 lastPendingTransactionSpeculatedOn = transaction;
2139 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2140 // Guard evaluated to true so update the speculative table
2141 for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
2142 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2149 * Set dead and remove from the live transaction tables the
2150 * transactions that are dead
2152 void Table::updateLiveTransactionsAndStatus() {
2154 // Go through each of the transactions
2155 for (Iterator<Map->Entry<int64_t, Transaction> > *iter = liveTransactionBySequenceNumberTable->entrySet()->iterator(); iter->hasNext();) {
2156 Transaction *transaction = iter->next()->getValue();
2158 // Check if the transaction is dead
2159 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator());
2160 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= transaction->getSequenceNumber())) {
2162 // Set dead the transaction
2163 transaction->setDead();
2165 // Remove the transaction from the live table
2167 liveTransactionByTransactionIdTable->remove(transaction->getId());
2171 // Go through each of the transactions
2172 for (Iterator<Map->Entry<int64_t, TransactionStatus *> > *iter = outstandingTransactionStatus->entrySet()->iterator(); iter->hasNext();) {
2173 TransactionStatus *status = iter->next()->getValue();
2175 // Check if the transaction is dead
2176 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator());
2177 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= status->getTransactionSequenceNumber())) {
2180 status->setStatus(TransactionStatus_StatusCommitted);
2189 * Process this slot, entry by entry-> Also update the latest message sent by slot
2191 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2193 // Update the last message seen
2194 updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2196 // Process each entry in the slot
2197 for (Entry *entry : slot->getEntries()) {
2198 switch (entry->getType()) {
2199 case TypeCommitPart:
2200 processEntry((CommitPart *)entry);
2203 processEntry((Abort *)entry);
2205 case TypeTransactionPart:
2206 processEntry((TransactionPart *)entry);
2209 processEntry((NewKey *)entry);
2211 case TypeLastMessage:
2212 processEntry((LastMessage *)entry, machineSet);
2214 case TypeRejectedMessage:
2215 processEntry((RejectedMessage *)entry, indexer);
2217 case TypeTableStatus:
2218 processEntry((TableStatus *)entry, slot->getSequenceNumber());
2221 throw new Error("Unrecognized type: ");
2227 * Update the last message that was sent for a machine Id
2229 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2230 // Update what the last message received by a machine was
2231 updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2235 * Add the new key to the arbitrators table and update the set of live
2236 * new keys (in case of a rescued new key message)
2238 void Table::processEntry(NewKey *entry) {
2239 // Update the arbitrator table with the new key information
2240 arbitratorTable->put(entry->getKey(), entry->getMachineID());
2242 // Update what the latest live new key is
2243 NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2244 if (oldNewKey != NULL) {
2245 // Delete the old new key messages
2246 oldNewKey->setDead();
2251 * Process new table status entries and set dead the old ones as new
2252 * ones come in-> keeps track of the largest and smallest table status
2253 * seen in this current round of updating the local copy of the block
2256 void Table::processEntry(TableStatus entry, int64_t seq) {
2257 int newNumSlots = entry->getMaxSlots();
2258 updateCurrMaxSize(newNumSlots);
2259 initExpectedSize(seq, newNumSlots);
2261 if (liveTableStatus != NULL) {
2262 // We have a larger table status so the old table status is no
2264 liveTableStatus->setDead();
2267 // Make this new table status the latest alive table status
2268 liveTableStatus = entry;
2272 * Check old messages to see if there is a block chain violation->
2275 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
2276 int64_t oldSeqNum = entry->getOldSeqNum();
2277 int64_t newSeqNum = entry->getNewSeqNum();
2278 bool isequal = entry->getEqual();
2279 int64_t machineId = entry->getMachineID();
2280 int64_t seq = entry->getSequenceNumber();
2282 // Check if we have messages that were supposed to be rejected in
2283 // our local block chain
2284 for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2286 Slot *slot = indexer->getSlot(seqNum);
2289 // If we have this slot make sure that it was not supposed to be
2291 int64_t slotMachineId = slot->getMachineID();
2292 if (isequal != (slotMachineId == machineId)) {
2293 throw new Error("Server Error: Trying to insert rejected message for slot ");
2298 // Create a list of clients to watch until they see this rejected
2300 Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2301 for (Map->Entry<int64_t, Pair<int64_t, Liveness *> > *lastMessageEntry : lastMessageTable->entrySet()) {
2302 // Machine ID for the last message entry
2303 int64_t lastMessageEntryMachineId = lastMessageEntry->getKey();
2305 // We've seen it, don't need to continue to watch-> Our next
2306 // message will implicitly acknowledge it->
2307 if (lastMessageEntryMachineId == localMachineId) {
2311 Pair<int64_t, Liveness *> lastMessageValue = lastMessageEntry->getValue();
2312 int64_t entrySequenceNumber = lastMessageValue.getFirst();
2314 if (entrySequenceNumber < seq) {
2315 // Add this rejected message to the set of messages that this
2316 // machine ID did not see yet
2317 addWatchVector(lastMessageEntryMachineId, entry);
2318 // This client did not see this rejected message yet so add it
2319 // to the watch set to monitor
2320 deviceWatchSet->add(lastMessageEntryMachineId);
2323 if (deviceWatchSet->isEmpty()) {
2324 // This rejected message has been seen by all the clients so
2327 // We need to watch this rejected message
2328 entry->setWatchSet(deviceWatchSet);
2333 * Check if this abort is live, if not then save it so we can kill it
2334 * later-> update the last transaction number that was arbitrated on->
2336 void Table::processEntry(Abort *entry) {
2337 if (entry->getTransactionSequenceNumber() != -1) {
2338 // update the transaction status if it was sent to the server
2339 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2340 if (status != NULL) {
2341 status->setStatus(TransactionStatus_StatusAborted);
2345 // Abort has not been seen by the client it is for yet so we need to
2347 Abort *previouslySeenAbort = liveAbortTable->put(entry->getAbortId(), entry);
2348 if (previouslySeenAbort != NULL) {
2349 previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version
2352 if (entry->getTransactionArbitrator() == localMachineId) {
2353 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
2356 if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId()).getFirst() >= entry->getSequenceNumber())) {
2357 // The machine already saw this so it is dead
2359 liveAbortTable->remove(entry->getAbortId());
2361 if (entry->getTransactionArbitrator() == localMachineId) {
2362 liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2367 // Update the last arbitration data that we have seen so far
2368 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator()) != NULL) {
2369 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2370 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2372 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2375 // Never seen any data from this arbitrator so record the first one
2376 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2379 // Set dead a transaction if we can
2380 Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber()));
2381 if (transactionToSetDead != NULL) {
2382 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2385 // Update the last transaction sequence number that the arbitrator
2387 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator());
2388 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2390 if (entry->getTransactionSequenceNumber() != -1) {
2391 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2397 * Set dead the transaction part if that transaction is dead and keep
2398 * track of all new parts
2400 void Table::processEntry(TransactionPart *entry) {
2401 // Check if we have already seen this transaction and set it dead OR
2402 // if it is not alive
2403 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId());
2404 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= entry->getSequenceNumber())) {
2405 // This transaction is dead, it was already committed or aborted
2410 // This part is still alive
2411 Hashtable<Pair<int64_t, int32_t>, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId());
2413 if (transactionPart == NULL) {
2414 // Dont have a table for this machine Id yet so make one
2415 transactionPart = new Hashtable<Pair<int64_t, int32_t>, TransactionPart *>();
2416 newTransactionParts->put(entry->getMachineId(), transactionPart);
2419 // Update the part and set dead ones we have already seen (got a
2421 TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry);
2422 if (previouslySeenPart != NULL) {
2423 previouslySeenPart->setDead();
2428 * Process new commit entries and save them for future use-> Delete duplicates
2430 void Table::processEntry(CommitPart *entry) {
2431 // Update the last transaction that was updated if we can
2432 if (entry->getTransactionSequenceNumber() != -1) {
2433 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId());
2434 // Update the last transaction sequence number that the arbitrator
2436 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2437 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2441 Hashtable<Pair<int64_t, int32_t>, CommitPart *> *commitPart = newCommitParts->get(entry->getMachineId());
2442 if (commitPart == NULL) {
2443 // Don't have a table for this machine Id yet so make one
2444 commitPart = new Hashtable<Pair<int64_t, int32_t>, CommitPart *>();
2445 newCommitParts->put(entry->getMachineId(), commitPart);
2447 // Update the part and set dead ones we have already seen (got a
2449 CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry);
2450 if (previouslySeenPart != NULL) {
2451 previouslySeenPart->setDead();
2456 * Update the last message seen table-> Update and set dead the
2457 * appropriate RejectedMessages as clients see them-> Updates the live
2458 * aborts, removes those that are dead and sets them dead-> Check that
2459 * the last message seen is correct and that there is no mismatch of
2460 * our own last message or that other clients have not had a rollback
2461 * on the last message->
2463 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<intr64_t> *machineSet) {
2464 // We have seen this machine ID
2465 machineSet->remove(machineId);
2467 // Get the set of rejected messages that this machine Id is has not seen yet
2468 Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2469 // If there is a rejected message that this machine Id has not seen yet
2470 if (watchset != NULL) {
2471 // Go through each rejected message that this machine Id has not
2473 for (Iterator<RejectedMessage *> *rmit = watchset->iterator(); rmit->hasNext(); ) {
2474 RejectedMessage *rm = rmit->next();
2475 // If this machine Id has seen this rejected message->->->
2476 if (rm->getSequenceNumber() <= seqNum) {
2477 // Remove it from our watchlist
2479 // Decrement machines that need to see this notification
2480 rm->removeWatcher(machineId);
2485 // Set dead the abort
2486 for (Iterator<Map->Entry<Pair<int64_t, int64_t>, Abort *> > i = liveAbortTable->entrySet()->iterator(); i->hasNext();) {
2487 Abort *abort = i->next()->getValue();
2488 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2491 if (abort->getTransactionArbitrator() == localMachineId) {
2492 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2496 if (machineId == localMachineId) {
2497 // Our own messages are immediately dead->
2498 if (liveness instanceof LastMessage) {
2499 ((LastMessage *)liveness)->setDead();
2500 } else if (liveness instanceof Slot) {
2501 ((Slot *)liveness)->setDead();
2503 throw new Error("Unrecognized type");
2506 // Get the old last message for this device
2507 Pair<int64_t, Liveness *> lastMessageEntry = lastMessageTable->put(machineId, Pair<int64_t, Liveness *>(seqNum, liveness));
2508 if (lastMessageEntry == NULL) {
2509 // If no last message then there is nothing else to process
2513 int64_t lastMessageSeqNum = lastMessageEntry.getFirst();
2514 Liveness *lastEntry = lastMessageEntry.getSecond();
2516 // If it is not our machine Id since we already set ours to dead
2517 if (machineId != localMachineId) {
2518 if (lastEntry instanceof LastMessage) {
2519 ((LastMessage *)lastEntry)->setDead();
2520 } else if (lastEntry instanceof Slot) {
2521 ((Slot *)lastEntry)->setDead();
2523 throw new Error("Unrecognized type");
2526 // Make sure the server is not playing any games
2527 if (machineId == localMachineId) {
2528 if (hadPartialSendToServer) {
2529 // We were not making any updates and we had a machine mismatch
2530 if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2531 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: ");
2534 // We were not making any updates and we had a machine mismatch
2535 if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2536 throw new Error("Server Error: Mismatch on local machine sequence number, needed: ");
2540 if (lastMessageSeqNum > seqNum) {
2541 throw new Error("Server Error: Rollback on remote machine sequence number");
2547 * Add a rejected message entry to the watch set to keep track of
2548 * which clients have seen that rejected message entry and which have
2551 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
2552 Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2553 if (entries == NULL) {
2554 // There is no set for this machine ID yet so create one
2555 entries = new Hashset<RejectedMessage *>();
2556 rejectedMessageWatchVectorTable->put(machineId, entries);
2558 entries->add(entry);
2562 * Check if the HMAC chain is not violated
2564 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
2565 for (int i = 0; i < newSlots->length(); i++) {
2566 Slot *currSlot = newSlots->get(i);
2567 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
2568 if (prevSlot != NULL &&
2569 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2570 throw new Error("Server Error: Invalid HMAC Chain");