3 #include "SlotBuffer.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
13 #include "SecureRandom.h"
14 #include "ByteBuffer.h"
16 #include "CommitPart.h"
17 #include "ArbitrationRound.h"
18 #include "TransactionPart.h"
20 #include "RejectedMessage.h"
21 #include "SlotIndexer.h"
24 int compareInt64(const void *a, const void *b) {
25 const int64_t *pa = (const int64_t *) a;
26 const int64_t *pb = (const int64_t *) b;
35 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
37 cloud(new CloudComm(this, baseurl, password, listeningPort)),
39 liveTableStatus(NULL),
40 pendingTransactionBuilder(NULL),
41 lastPendingTransactionSpeculatedOn(NULL),
42 firstPendingTransaction(NULL),
44 bufferResizeThreshold(0),
46 oldestLiveSlotSequenceNumver(1),
47 localMachineId(_localMachineId),
49 localSequenceNumber(0),
50 localTransactionSequenceNumber(0),
51 lastTransactionSequenceNumberSpeculatedOn(0),
52 oldestTransactionSequenceNumberSpeculatedOn(0),
53 localArbitrationSequenceNumber(0),
54 hadPartialSendToServer(false),
55 attemptedToSendToServer(false),
57 didFindTableStatus(false),
59 lastSlotAttemptedToSend(NULL),
62 lastTransactionPartsSent(NULL),
63 lastPendingSendArbitrationEntriesToDelete(NULL),
65 committedKeyValueTable(NULL),
66 speculatedKeyValueTable(NULL),
67 pendingTransactionSpeculatedKeyValueTable(NULL),
68 liveNewKeyTable(NULL),
69 lastMessageTable(NULL),
70 rejectedMessageWatchVectorTable(NULL),
71 arbitratorTable(NULL),
73 newTransactionParts(NULL),
75 lastArbitratedTransactionNumberByArbitratorTable(NULL),
76 liveTransactionBySequenceNumberTable(NULL),
77 liveTransactionByTransactionIdTable(NULL),
78 liveCommitsTable(NULL),
79 liveCommitsByKeyTable(NULL),
80 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
81 rejectedSlotVector(NULL),
82 pendingTransactionQueue(NULL),
83 pendingSendArbitrationRounds(NULL),
84 pendingSendArbitrationEntriesToDelete(NULL),
85 transactionPartsSent(NULL),
86 outstandingTransactionStatus(NULL),
87 liveAbortsGeneratedByLocal(NULL),
88 offlineTransactionsCommittedAndAtServer(NULL),
89 localCommunicationTable(NULL),
90 lastTransactionSeenFromMachineFromServer(NULL),
91 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
92 lastInsertedNewKey(false),
98 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
102 liveTableStatus(NULL),
103 pendingTransactionBuilder(NULL),
104 lastPendingTransactionSpeculatedOn(NULL),
105 firstPendingTransaction(NULL),
107 bufferResizeThreshold(0),
109 oldestLiveSlotSequenceNumver(1),
110 localMachineId(_localMachineId),
112 localSequenceNumber(0),
113 localTransactionSequenceNumber(0),
114 lastTransactionSequenceNumberSpeculatedOn(0),
115 oldestTransactionSequenceNumberSpeculatedOn(0),
116 localArbitrationSequenceNumber(0),
117 hadPartialSendToServer(false),
118 attemptedToSendToServer(false),
120 didFindTableStatus(false),
122 lastSlotAttemptedToSend(NULL),
125 lastTransactionPartsSent(NULL),
126 lastPendingSendArbitrationEntriesToDelete(NULL),
128 committedKeyValueTable(NULL),
129 speculatedKeyValueTable(NULL),
130 pendingTransactionSpeculatedKeyValueTable(NULL),
131 liveNewKeyTable(NULL),
132 lastMessageTable(NULL),
133 rejectedMessageWatchVectorTable(NULL),
134 arbitratorTable(NULL),
135 liveAbortTable(NULL),
136 newTransactionParts(NULL),
137 newCommitParts(NULL),
138 lastArbitratedTransactionNumberByArbitratorTable(NULL),
139 liveTransactionBySequenceNumberTable(NULL),
140 liveTransactionByTransactionIdTable(NULL),
141 liveCommitsTable(NULL),
142 liveCommitsByKeyTable(NULL),
143 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
144 rejectedSlotVector(NULL),
145 pendingTransactionQueue(NULL),
146 pendingSendArbitrationRounds(NULL),
147 pendingSendArbitrationEntriesToDelete(NULL),
148 transactionPartsSent(NULL),
149 outstandingTransactionStatus(NULL),
150 liveAbortsGeneratedByLocal(NULL),
151 offlineTransactionsCommittedAndAtServer(NULL),
152 localCommunicationTable(NULL),
153 lastTransactionSeenFromMachineFromServer(NULL),
154 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
155 lastInsertedNewKey(false),
166 delete committedKeyValueTable;
167 delete speculatedKeyValueTable;
168 delete pendingTransactionSpeculatedKeyValueTable;
169 delete liveNewKeyTable;
171 SetIterator<int64_t, Pair<int64_t, Liveness *> *> *lmit = getKeyIterator(lastMessageTable);
172 while (lmit->hasNext()) {
173 Pair<int64_t, Liveness *> * pair = lastMessageTable->get(lmit->next());
176 delete lastMessageTable;
178 if (pendingTransactionBuilder != NULL)
179 delete pendingTransactionBuilder;
181 SetIterator<int64_t, Hashset<RejectedMessage *> *> *rmit = getKeyIterator(rejectedMessageWatchVectorTable);
182 while(rmit->hasNext()) {
183 int64_t machineid = rmit->next();
184 Hashset<RejectedMessage *> * rmset = rejectedMessageWatchVectorTable->get(machineid);
185 SetIterator<RejectedMessage *, RejectedMessage *> * mit = rmset->iterator();
186 while (mit->hasNext()) {
187 RejectedMessage * rm = mit->next();
194 delete rejectedMessageWatchVectorTable;
196 delete arbitratorTable;
197 delete liveAbortTable;
199 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts);
200 while (partsit->hasNext()) {
201 int64_t machineId = partsit->next();
202 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
206 delete newTransactionParts;
209 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts);
210 while (partsit->hasNext()) {
211 int64_t machineId = partsit->next();
212 Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId);
216 delete newCommitParts;
218 delete lastArbitratedTransactionNumberByArbitratorTable;
219 delete liveTransactionBySequenceNumberTable;
220 delete liveTransactionByTransactionIdTable;
222 SetIterator<int64_t, Hashtable<int64_t, Commit *> *> *liveit = getKeyIterator(liveCommitsTable);
223 while (liveit->hasNext()) {
224 int64_t arbitratorId = liveit->next();
226 // Get all the commits for a specific arbitrator
227 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
229 SetIterator<int64_t, Commit *> *clientit = getKeyIterator(commitForClientTable);
230 while (clientit->hasNext()) {
231 int64_t id = clientit->next();
232 delete commitForClientTable->get(id);
237 delete commitForClientTable;
240 delete liveCommitsTable;
242 delete liveCommitsByKeyTable;
243 delete lastCommitSeenSequenceNumberByArbitratorTable;
244 delete rejectedSlotVector;
246 uint size = pendingTransactionQueue->size();
247 for (uint iter = 0; iter < size; iter++) {
248 delete pendingTransactionQueue->get(iter);
250 delete pendingTransactionQueue;
252 delete pendingSendArbitrationEntriesToDelete;
253 delete transactionPartsSent;
254 delete outstandingTransactionStatus;
255 delete liveAbortsGeneratedByLocal;
256 delete offlineTransactionsCommittedAndAtServer;
257 delete localCommunicationTable;
258 delete lastTransactionSeenFromMachineFromServer;
260 for(uint i = 0; i < pendingSendArbitrationRounds->size(); i++) {
261 delete pendingSendArbitrationRounds->get(i);
263 delete pendingSendArbitrationRounds;
265 if (lastTransactionPartsSent != NULL)
266 delete lastTransactionPartsSent;
267 delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
271 * Init all the stuff needed for for table usage
274 // Init helper objects
275 random = new SecureRandom();
276 buffer = new SlotBuffer();
279 committedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
280 speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
281 pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
282 liveNewKeyTable = new Hashtable<IoTString *, NewKey *, uintptr_t, 0, hashString, StringEquals >();
283 lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> * >();
284 rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
285 arbitratorTable = new Hashtable<IoTString *, int64_t, uintptr_t, 0, hashString, StringEquals>();
286 liveAbortTable = new Hashtable<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>();
287 newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
288 newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
289 lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
290 liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
291 liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t> *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>();
292 liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> * >();
293 liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *, uintptr_t, 0, hashString, StringEquals>();
294 lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
295 rejectedSlotVector = new Vector<int64_t>();
296 pendingTransactionQueue = new Vector<Transaction *>();
297 pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
298 transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
299 outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
300 liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
301 offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> *, uintptr_t, 0, pairHashFunction, pairEquals>();
302 localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> *>();
303 lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
304 pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
305 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
308 numberOfSlots = buffer->capacity();
309 setResizeThreshold();
313 * Initialize the table by inserting a table status as the first entry
314 * into the table status also initialize the crypto stuff.
316 void Table::initTable() {
317 cloud->initSecurity();
319 // Create the first insertion into the block chain which is the table status
320 Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
321 localSequenceNumber++;
322 TableStatus *status = new TableStatus(s, numberOfSlots);
324 Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
327 array = new Array<Slot *>(1);
329 // update local block chain
330 validateAndUpdate(array, true);
332 } else if (array->length() == 1) {
333 // in case we did push the slot BUT we failed to init it
334 validateAndUpdate(array, true);
338 throw new Error("Error on initialization");
343 * Rebuild the table from scratch by pulling the latest block chain
346 void Table::rebuild() {
347 // Just pull the latest slots from the server
348 Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
349 validateAndUpdate(newslots, true);
352 updateLiveTransactionsAndStatus();
355 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
356 localCommunicationTable->put(arbitrator, new Pair<IoTString *, int32_t>(hostName, portNumber));
359 int64_t Table::getArbitrator(IoTString *key) {
360 return arbitratorTable->get(key);
363 void Table::close() {
367 IoTString *Table::getCommitted(IoTString *key) {
368 KeyValue *kv = committedKeyValueTable->get(key);
371 return new IoTString(kv->getValue());
377 IoTString *Table::getSpeculative(IoTString *key) {
378 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
381 kv = speculatedKeyValueTable->get(key);
385 kv = committedKeyValueTable->get(key);
389 return new IoTString(kv->getValue());
395 IoTString *Table::getCommittedAtomic(IoTString *key) {
396 KeyValue *kv = committedKeyValueTable->get(key);
398 if (!arbitratorTable->contains(key)) {
399 throw new Error("Key not Found.");
402 // Make sure new key value pair matches the current arbitrator
403 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
404 // TODO: Maybe not throw en error
405 throw new Error("Not all Key Values Match Arbitrator.");
409 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
410 return new IoTString(kv->getValue());
412 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
417 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
418 if (!arbitratorTable->contains(key)) {
419 throw new Error("Key not Found.");
422 // Make sure new key value pair matches the current arbitrator
423 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
424 // TODO: Maybe not throw en error
425 throw new Error("Not all Key Values Match Arbitrator.");
428 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
431 kv = speculatedKeyValueTable->get(key);
435 kv = committedKeyValueTable->get(key);
439 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
440 return new IoTString(kv->getValue());
442 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
447 bool Table::update() {
449 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
450 validateAndUpdate(newSlots, false);
453 updateLiveTransactionsAndStatus();
455 } catch (Exception *e) {
456 SetIterator<int64_t, Pair<IoTString *, int32_t> *> *kit = getKeyIterator(localCommunicationTable);
457 while (kit->hasNext()) {
458 int64_t m = kit->next();
467 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
469 if (arbitratorTable->contains(keyName)) {
470 // There is already an arbitrator
473 NewKey *newKey = new NewKey(NULL, keyName, machineId);
475 if (sendToServer(newKey)) {
476 // If successfully inserted
482 void Table::startTransaction() {
483 // Create a new transaction, invalidates any old pending transactions.
484 if (pendingTransactionBuilder != NULL)
485 delete pendingTransactionBuilder;
486 pendingTransactionBuilder = new PendingTransaction(localMachineId);
489 void Table::put(IoTString *key, IoTString *value) {
490 // Make sure it is a valid key
491 if (!arbitratorTable->contains(key)) {
492 throw new Error("Key not Found.");
495 // Make sure new key value pair matches the current arbitrator
496 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
497 // TODO: Maybe not throw en error
498 throw new Error("Not all Key Values Match Arbitrator.");
501 // Add the key value to this transaction
502 KeyValue *kv = new KeyValue(new IoTString(key), new IoTString(value));
503 pendingTransactionBuilder->addKV(kv);
506 TransactionStatus *Table::commitTransaction() {
507 if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
508 // transaction with no updates will have no effect on the system
509 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
512 // Set the local transaction sequence number and increment
513 pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
514 localTransactionSequenceNumber++;
516 // Create the transaction status
517 TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
519 // Create the new transaction
520 Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
521 newTransaction->setTransactionStatus(transactionStatus);
523 if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
524 // Add it to the queue and invalidate the builder for safety
525 pendingTransactionQueue->add(newTransaction);
527 arbitrateOnLocalTransaction(newTransaction);
528 delete newTransaction;
529 updateLiveStateFromLocal();
531 if (pendingTransactionBuilder != NULL)
532 delete pendingTransactionBuilder;
534 pendingTransactionBuilder = new PendingTransaction(localMachineId);
538 } catch (ServerException *e) {
540 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
541 uint size = pendingTransactionQueue->size();
543 for (uint iter = 0; iter < size; iter++) {
544 Transaction *transaction = pendingTransactionQueue->get(iter);
545 pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter));
547 if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
548 // Already contacted this client so ignore all attempts to contact this client
549 // to preserve ordering for arbitrator
553 Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
555 if (sendReturn.getFirst()) {
556 // Failed to contact over local
557 arbitratorTriedAndFailed->add(transaction->getArbitrator());
559 // Successful contact or should not contact
561 if (sendReturn.getSecond()) {
568 pendingTransactionQueue->setSize(oldindex);
571 updateLiveStateFromLocal();
573 return transactionStatus;
577 * Recalculate the new resize threshold
579 void Table::setResizeThreshold() {
580 int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
581 bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
584 int64_t Table::getLocalSequenceNumber() {
585 return localSequenceNumber;
588 NewKey * Table::handlePartialSend(NewKey * newKey) {
589 //Didn't receive acknowledgement for last send
590 //See if the server has received a newer slot
592 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
593 if (newSlots->length() == 0) {
594 //Retry sending old slot
595 bool wasInserted = false;
596 bool sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey, &wasInserted, &newSlots);
598 if (sendSlotsReturn) {
599 if (newKey != NULL) {
600 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
605 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
606 while (trit->hasNext()) {
607 Transaction *transaction = trit->next();
608 transaction->resetServerFailure();
609 // Update which transactions parts still need to be sent
610 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
611 // Add the transaction status to the outstanding list
612 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
614 // Update the transaction status
615 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
617 // Check if all the transaction parts were successfully
618 // sent and if so then remove it from pending
619 if (transaction->didSendAllParts()) {
620 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
621 pendingTransactionQueue->remove(transaction);
626 if (checkSend(newSlots, lastSlotAttemptedToSend)) {
627 if (newKey != NULL) {
628 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
633 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
634 while (trit->hasNext()) {
635 Transaction *transaction = trit->next();
636 transaction->resetServerFailure();
638 // Update which transactions parts still need to be sent
639 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
641 // Add the transaction status to the outstanding list
642 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
644 // Update the transaction status
645 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
647 // Check if all the transaction parts were successfully sent and if so then remove it from pending
648 if (transaction->didSendAllParts()) {
649 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
650 pendingTransactionQueue->remove(transaction);
652 transaction->resetServerFailure();
653 // Set the transaction sequence number back to nothing
654 if (!transaction->didSendAPartToServer()) {
655 transaction->setSequenceNumber(-1);
663 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
664 while (trit->hasNext()) {
665 Transaction *transaction = trit->next();
666 transaction->resetServerFailure();
667 // Set the transaction sequence number back to nothing
668 if (!transaction->didSendAPartToServer()) {
669 transaction->setSequenceNumber(-1);
674 if (newSlots->length() != 0) {
675 // insert into the local block chain
676 validateAndUpdate(newSlots, true);
679 if (checkSend(newSlots, lastSlotAttemptedToSend)) {
680 if (newKey != NULL) {
681 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
686 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
687 while (trit->hasNext()) {
688 Transaction *transaction = trit->next();
689 transaction->resetServerFailure();
691 // Update which transactions parts still need to be sent
692 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
694 // Add the transaction status to the outstanding list
695 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
697 // Update the transaction status
698 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
700 // Check if all the transaction parts were successfully sent and if so then remove it from pending
701 if (transaction->didSendAllParts()) {
702 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
703 pendingTransactionQueue->remove(transaction);
705 transaction->resetServerFailure();
706 // Set the transaction sequence number back to nothing
707 if (!transaction->didSendAPartToServer()) {
708 transaction->setSequenceNumber(-1);
714 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
715 while (trit->hasNext()) {
716 Transaction *transaction = trit->next();
717 transaction->resetServerFailure();
718 // Set the transaction sequence number back to nothing
719 if (!transaction->didSendAPartToServer()) {
720 transaction->setSequenceNumber(-1);
726 // insert into the local block chain
727 validateAndUpdate(newSlots, true);
733 bool Table::sendToServer(NewKey *newKey) {
734 if (hadPartialSendToServer) {
735 newKey = handlePartialSend(newKey);
739 // While we have stuff that needs inserting into the block chain
740 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
741 if (hadPartialSendToServer) {
742 throw new Error("Should Be error free");
745 // If there is a new key with same name then end
746 if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
751 Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, new Array<char>(buffer->getSlot(sequenceNumber)->getHMAC()), localSequenceNumber);
752 localSequenceNumber++;
754 // Try to fill the slot with data
756 bool insertedNewKey = false;
757 bool needsResize = fillSlot(slot, false, newKey, newSize, insertedNewKey);
760 // Reset which transaction to send
761 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
762 while (trit->hasNext()) {
763 Transaction *transaction = trit->next();
764 transaction->resetNextPartToSend();
766 // Set the transaction sequence number back to nothing
767 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
768 transaction->setSequenceNumber(-1);
773 // Clear the sent data since we are trying again
774 pendingSendArbitrationEntriesToDelete->clear();
775 transactionPartsSent->clear();
777 // We needed a resize so try again
778 fillSlot(slot, true, newKey, newSize, insertedNewKey);
781 lastSlotAttemptedToSend = slot;
782 lastIsNewKey = (newKey != NULL);
783 lastInsertedNewKey = insertedNewKey;
784 lastNewSize = newSize;
786 if (lastTransactionPartsSent != NULL)
787 delete lastTransactionPartsSent;
788 lastTransactionPartsSent = transactionPartsSent->clone();
789 lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
791 Array<Slot *> * newSlots = NULL;
792 bool wasInserted = false;
793 bool sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL, &wasInserted, &newSlots);
795 if (sendSlotsReturn) {
796 // Did insert into the block chain
797 if (insertedNewKey) {
798 // This slot was what was inserted not a previous slot
799 // New Key was successfully inserted into the block chain so dont want to insert it again
803 // Remove the aborts and commit parts that were sent from the pending to send queue
804 uint size = pendingSendArbitrationRounds->size();
806 for (uint i = 0; i < size; i++) {
807 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
808 round->removeParts(pendingSendArbitrationEntriesToDelete);
810 if (!round->isDoneSending()) {
812 pendingSendArbitrationRounds->set(oldcount++,
813 pendingSendArbitrationRounds->get(i));
815 delete pendingSendArbitrationRounds->get(i);
817 pendingSendArbitrationRounds->setSize(oldcount);
819 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
820 while (trit->hasNext()) {
821 Transaction *transaction = trit->next();
822 transaction->resetServerFailure();
824 // Update which transactions parts still need to be sent
825 transaction->removeSentParts(transactionPartsSent->get(transaction));
827 // Add the transaction status to the outstanding list
828 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
830 // Update the transaction status
831 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
833 // Check if all the transaction parts were successfully sent and if so then remove it from pending
834 if (transaction->didSendAllParts()) {
835 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
836 pendingTransactionQueue->remove(transaction);
841 // Reset which transaction to send
842 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
843 while (trit->hasNext()) {
844 Transaction *transaction = trit->next();
845 transaction->resetNextPartToSend();
847 // Set the transaction sequence number back to nothing
848 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
849 transaction->setSequenceNumber(-1);
855 // Clear the sent data in preparation for next send
856 pendingSendArbitrationEntriesToDelete->clear();
857 transactionPartsSent->clear();
859 if (newSlots->length() != 0) {
860 // insert into the local block chain
861 validateAndUpdate(newSlots, true);
865 } catch (ServerException *e) {
866 if (e->getType() != ServerException_TypeInputTimeout) {
867 // Nothing was able to be sent to the server so just clear these data structures
868 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
869 while (trit->hasNext()) {
870 Transaction *transaction = trit->next();
871 transaction->resetNextPartToSend();
873 // Set the transaction sequence number back to nothing
874 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
875 transaction->setSequenceNumber(-1);
880 // There was a partial send to the server
881 hadPartialSendToServer = true;
883 // Nothing was able to be sent to the server so just clear these data structures
884 SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
885 while (trit->hasNext()) {
886 Transaction *transaction = trit->next();
887 transaction->resetNextPartToSend();
888 transaction->setServerFailure();
893 pendingSendArbitrationEntriesToDelete->clear();
894 transactionPartsSent->clear();
899 return newKey == NULL;
902 bool Table::updateFromLocal(int64_t machineId) {
903 if (!localCommunicationTable->contains(machineId))
906 Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(machineId);
908 // Get the size of the send data
909 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
911 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
912 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(machineId)) {
913 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
916 Array<char> *sendData = new Array<char>(sendDataSize);
917 ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
920 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
924 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
925 localSequenceNumber++;
927 if (returnData == NULL) {
928 // Could not contact server
933 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
934 int numberOfEntries = bbDecode->getInt();
936 for (int i = 0; i < numberOfEntries; i++) {
937 char type = bbDecode->get();
938 if (type == TypeAbort) {
939 Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
941 } else if (type == TypeCommitPart) {
942 CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
943 processEntry(commitPart);
947 updateLiveStateFromLocal();
952 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
954 // Get the devices local communications
955 if (!localCommunicationTable->contains(transaction->getArbitrator()))
956 return Pair<bool, bool>(true, false);
958 Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
960 // Get the size of the send data
961 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
963 Vector<TransactionPart *> *tParts = transaction->getParts();
964 uint tPartsSize = tParts->size();
965 for (uint i = 0; i < tPartsSize; i++) {
966 TransactionPart *part = tParts->get(i);
967 sendDataSize += part->getSize();
971 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
972 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(transaction->getArbitrator())) {
973 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
976 // Make the send data size
977 Array<char> *sendData = new Array<char>(sendDataSize);
978 ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
981 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
982 bbEncode->putInt(transaction->getParts()->size());
984 Vector<TransactionPart *> *tParts = transaction->getParts();
985 uint tPartsSize = tParts->size();
986 for (uint i = 0; i < tPartsSize; i++) {
987 TransactionPart *part = tParts->get(i);
988 part->encode(bbEncode);
993 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
994 localSequenceNumber++;
996 if (returnData == NULL) {
997 // Could not contact server
998 return Pair<bool, bool>(true, false);
1002 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
1003 bool didCommit = bbDecode->get() == 1;
1004 bool couldArbitrate = bbDecode->get() == 1;
1005 int numberOfEntries = bbDecode->getInt();
1006 bool foundAbort = false;
1008 for (int i = 0; i < numberOfEntries; i++) {
1009 char type = bbDecode->get();
1010 if (type == TypeAbort) {
1011 Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
1013 if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
1017 processEntry(abort);
1018 } else if (type == TypeCommitPart) {
1019 CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
1020 processEntry(commitPart);
1024 updateLiveStateFromLocal();
1026 if (couldArbitrate) {
1027 TransactionStatus *status = transaction->getTransactionStatus();
1029 status->setStatus(TransactionStatus_StatusCommitted);
1031 status->setStatus(TransactionStatus_StatusAborted);
1034 TransactionStatus *status = transaction->getTransactionStatus();
1036 status->setStatus(TransactionStatus_StatusAborted);
1038 status->setStatus(TransactionStatus_StatusCommitted);
1042 return Pair<bool, bool>(false, true);
1045 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
1047 ByteBuffer *bbDecode = ByteBuffer_wrap(data);
1048 int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
1049 int numberOfParts = bbDecode->getInt();
1051 // If we did commit a transaction or not
1052 bool didCommit = false;
1053 bool couldArbitrate = false;
1055 if (numberOfParts != 0) {
1057 // decode the transaction
1058 Transaction *transaction = new Transaction();
1059 for (int i = 0; i < numberOfParts; i++) {
1061 TransactionPart *newPart = (TransactionPart *)TransactionPart_decode(NULL, bbDecode);
1062 transaction->addPartDecode(newPart);
1065 // Arbitrate on transaction and pull relevant return data
1066 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
1067 couldArbitrate = localArbitrateReturn.getFirst();
1068 didCommit = localArbitrateReturn.getSecond();
1070 updateLiveStateFromLocal();
1072 // Transaction was sent to the server so keep track of it to prevent double commit
1073 if (transaction->getSequenceNumber() != -1) {
1074 offlineTransactionsCommittedAndAtServer->add(new Pair<int64_t, int64_t>(transaction->getId()));
1078 // The data to send back
1079 int returnDataSize = 0;
1080 Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
1082 // Get the aborts to send back
1083 Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>();
1085 SetIterator<int64_t, Abort *> *abortit = getKeyIterator(liveAbortsGeneratedByLocal);
1086 while (abortit->hasNext())
1087 abortLocalSequenceNumbers->add(abortit->next());
1091 qsort(abortLocalSequenceNumbers->expose(), abortLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1093 uint asize = abortLocalSequenceNumbers->size();
1094 for (uint i = 0; i < asize; i++) {
1095 int64_t localSequenceNumber = abortLocalSequenceNumbers->get(i);
1096 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1100 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
1101 unseenArbitrations->add(abort);
1102 returnDataSize += abort->getSize();
1105 // Get the commits to send back
1106 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
1107 if (commitForClientTable != NULL) {
1108 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>();
1110 SetIterator<int64_t, Commit *> *commitit = getKeyIterator(commitForClientTable);
1111 while (commitit->hasNext())
1112 commitLocalSequenceNumbers->add(commitit->next());
1115 qsort(commitLocalSequenceNumbers->expose(), commitLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1117 uint clsSize = commitLocalSequenceNumbers->size();
1118 for (uint clsi = 0; clsi < clsSize; clsi++) {
1119 int64_t localSequenceNumber = commitLocalSequenceNumbers->get(clsi);
1120 Commit *commit = commitForClientTable->get(localSequenceNumber);
1122 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
1127 Vector<CommitPart *> *parts = commit->getParts();
1128 uint nParts = parts->size();
1129 for (uint i = 0; i < nParts; i++) {
1130 CommitPart *commitPart = parts->get(i);
1131 unseenArbitrations->add(commitPart);
1132 returnDataSize += commitPart->getSize();
1138 // Number of arbitration entries to decode
1139 returnDataSize += 2 * sizeof(int32_t);
1141 // bool of did commit or not
1142 if (numberOfParts != 0) {
1143 returnDataSize += sizeof(char);
1146 // Data to send Back
1147 Array<char> *returnData = new Array<char>(returnDataSize);
1148 ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
1150 if (numberOfParts != 0) {
1152 bbEncode->put((char)1);
1154 bbEncode->put((char)0);
1156 if (couldArbitrate) {
1157 bbEncode->put((char)1);
1159 bbEncode->put((char)0);
1163 bbEncode->putInt(unseenArbitrations->size());
1164 uint size = unseenArbitrations->size();
1165 for (uint i = 0; i < size; i++) {
1166 Entry *entry = unseenArbitrations->get(i);
1167 entry->encode(bbEncode);
1170 localSequenceNumber++;
1174 /** Checks whether a given slot was sent using new slots in
1175 array. Returns true if sent and false otherwise. */
1177 bool Table::checkSend(Array<Slot *> * array, Slot *checkSlot) {
1178 uint size = array->length();
1179 for (uint i = 0; i < size; i++) {
1180 Slot *s = array->get(i);
1181 if ((s->getSequenceNumber() == checkSlot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1186 //Also need to see if other machines acknowledged our message
1187 for (uint i = 0; i < size; i++) {
1188 Slot *s = array->get(i);
1190 // Process each entry in the slot
1191 Vector<Entry *> *entries = s->getEntries();
1192 uint eSize = entries->size();
1193 for (uint ei = 0; ei < eSize; ei++) {
1194 Entry *entry = entries->get(ei);
1196 if (entry->getType() == TypeLastMessage) {
1197 LastMessage *lastMessage = (LastMessage *)entry;
1199 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == checkSlot->getSequenceNumber())) {
1209 /** Method tries to send slot to server. Returns status in tuple.
1210 isInserted returns whether last un-acked send (if any) was
1211 successful. Returns whether send was confirmed.x
1214 bool Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey, bool *isInserted, Array<Slot *> **array) {
1215 attemptedToSendToServer = true;
1217 *array = cloud->putSlot(slot, newSize);
1218 if (*array == NULL) {
1219 *array = new Array<Slot *>(1);
1220 (*array)->set(0, slot);
1221 rejectedSlotVector->clear();
1222 *isInserted = false;
1225 if ((*array)->length() == 0) {
1226 throw new Error("Server Error: Did not send any slots");
1229 if (hadPartialSendToServer) {
1230 *isInserted = checkSend(*array, slot);
1232 if (!(*isInserted)) {
1233 rejectedSlotVector->add(slot->getSequenceNumber());
1238 rejectedSlotVector->add(slot->getSequenceNumber());
1239 *isInserted = false;
1246 * Returns true if a resize was needed but not done.
1248 bool Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry, int & newSize, bool & insertedKey) {
1249 newSize = 0;//special value to indicate no resize
1250 if (liveSlotCount > bufferResizeThreshold) {
1251 resize = true;//Resize is forced
1255 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1256 TableStatus *status = new TableStatus(slot, newSize);
1257 slot->addEntry(status);
1260 // Fill with rejected slots first before doing anything else
1261 doRejectedMessages(slot);
1263 // Do mandatory rescue of entries
1264 ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1266 // Extract working variables
1267 bool needsResize = mandatoryRescueReturn.getFirst();
1268 bool seenLiveSlot = mandatoryRescueReturn.getSecond();
1269 int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1271 if (needsResize && !resize) {
1272 // We need to resize but we are not resizing so return true to force on retry
1276 insertedKey = false;
1277 if (newKeyEntry != NULL) {
1278 newKeyEntry->setSlot(slot);
1279 if (slot->hasSpace(newKeyEntry)) {
1280 slot->addEntry(newKeyEntry);
1285 // Clear the transactions, aborts and commits that were sent previously
1286 transactionPartsSent->clear();
1287 pendingSendArbitrationEntriesToDelete->clear();
1288 uint size = pendingSendArbitrationRounds->size();
1289 for (uint i = 0; i < size; i++) {
1290 ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
1291 bool isFull = false;
1292 round->generateParts();
1293 Vector<Entry *> *parts = round->getParts();
1295 // Insert pending arbitration data
1296 uint vsize = parts->size();
1297 for (uint vi = 0; vi < vsize; vi++) {
1298 Entry *arbitrationData = parts->get(vi);
1300 // If it is an abort then we need to set some information
1301 if (arbitrationData->getType() == TypeAbort) {
1302 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1305 if (!slot->hasSpace(arbitrationData)) {
1306 // No space so cant do anything else with these data entries
1311 // Add to this current slot and add it to entries to delete
1312 slot->addEntry(arbitrationData);
1313 pendingSendArbitrationEntriesToDelete->add(arbitrationData);
1321 if (pendingTransactionQueue->size() > 0) {
1322 Transaction *transaction = pendingTransactionQueue->get(0);
1323 // Set the transaction sequence number if it has yet to be inserted into the block chain
1324 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1325 transaction->setSequenceNumber(slot->getSequenceNumber());
1329 TransactionPart *part = transaction->getNextPartToSend();
1331 // Ran out of parts to send for this transaction so move on
1335 if (slot->hasSpace(part)) {
1336 slot->addEntry(part);
1337 Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1338 if (partsSent == NULL) {
1339 partsSent = new Vector<int32_t>();
1340 transactionPartsSent->put(transaction, partsSent);
1342 partsSent->add(part->getPartNumber());
1343 transactionPartsSent->put(transaction, partsSent);
1350 // Fill the remainder of the slot with rescue data
1351 doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1356 void Table::doRejectedMessages(Slot *s) {
1357 if (!rejectedSlotVector->isEmpty()) {
1358 /* TODO: We should avoid generating a rejected message entry if
1359 * there is already a sufficient entry in the queue (e->g->,
1360 * equalsto value of true and same sequence number)-> */
1362 int64_t old_seqn = rejectedSlotVector->get(0);
1363 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1364 int64_t new_seqn = rejectedSlotVector->lastElement();
1365 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1368 int64_t prev_seqn = -1;
1370 /* Go through list of missing messages */
1371 for (; i < rejectedSlotVector->size(); i++) {
1372 int64_t curr_seqn = rejectedSlotVector->get(i);
1373 Slot *s_msg = buffer->getSlot(curr_seqn);
1376 prev_seqn = curr_seqn;
1378 /* Generate rejected message entry for missing messages */
1379 if (prev_seqn != -1) {
1380 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1383 /* Generate rejected message entries for present messages */
1384 for (; i < rejectedSlotVector->size(); i++) {
1385 int64_t curr_seqn = rejectedSlotVector->get(i);
1386 Slot *s_msg = buffer->getSlot(curr_seqn);
1387 int64_t machineid = s_msg->getMachineID();
1388 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1395 ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize) {
1396 int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1397 int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
1398 if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1399 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1402 int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1403 bool seenLiveSlot = false;
1404 int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots; // smallest seq number in the buffer if it is full
1405 int64_t threshold = firstIfFull + Table_FREE_SLOTS; // we want the buffer to be clear of live entries up to this point
1409 for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1410 Slot *previousSlot = buffer->getSlot(currentSequenceNumber);
1411 // Push slot number forward
1412 if (!seenLiveSlot) {
1413 oldestLiveSlotSequenceNumver = currentSequenceNumber;
1416 if (!previousSlot->isLive()) {
1420 // We have seen a live slot
1421 seenLiveSlot = true;
1423 // Get all the live entries for a slot
1424 Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1426 // Iterate over all the live entries and try to rescue them
1427 uint lESize = liveEntries->size();
1428 for (uint i = 0; i < lESize; i++) {
1429 Entry *liveEntry = liveEntries->get(i);
1430 if (slot->hasSpace(liveEntry)) {
1431 // Enough space to rescue the entry
1432 slot->addEntry(liveEntry);
1433 } else if (currentSequenceNumber == firstIfFull) {
1434 //if there's no space but the entry is about to fall off the queue
1435 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
1441 return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
1444 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1445 /* now go through live entries from least to greatest sequence number until
1446 * either all live slots added, or the slot doesn't have enough room
1447 * for SKIP_THRESHOLD consecutive entries*/
1449 int64_t newestseqnum = buffer->getNewestSeqNum();
1450 for (; seqn <= newestseqnum; seqn++) {
1451 Slot *prevslot = buffer->getSlot(seqn);
1452 //Push slot number forward
1454 oldestLiveSlotSequenceNumver = seqn;
1456 if (!prevslot->isLive())
1458 seenliveslot = true;
1459 Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
1460 uint lESize = liveentries->size();
1461 for (uint i = 0; i < lESize; i++) {
1462 Entry *liveentry = liveentries->get(i);
1463 if (s->hasSpace(liveentry))
1464 s->addEntry(liveentry);
1467 if (skipcount > Table_SKIP_THRESHOLD) {
1480 * Checks for malicious activity and updates the local copy of the block chain->
1482 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1483 // The cloud communication layer has checked slot HMACs already
1485 if (newSlots->length() == 0) {
1489 // Make sure all slots are newer than the last largest slot this
1491 int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
1492 if (firstSeqNum <= sequenceNumber) {
1493 throw new Error("Server Error: Sent older slots!");
1496 // Create an object that can access both new slots and slots in our
1497 // local chain without committing slots to our local chain
1498 SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1500 // Check that the HMAC chain is not broken
1501 checkHMACChain(indexer, newSlots);
1503 // Set to keep track of messages from clients
1504 Hashset<int64_t> *machineSet = new Hashset<int64_t>();
1506 SetIterator<int64_t, Pair<int64_t, Liveness *> *> *lmit = getKeyIterator(lastMessageTable);
1507 while (lmit->hasNext())
1508 machineSet->add(lmit->next());
1512 // Process each slots data
1514 uint numSlots = newSlots->length();
1515 for (uint i = 0; i < numSlots; i++) {
1516 Slot *slot = newSlots->get(i);
1517 processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1518 updateExpectedSize();
1523 // If there is a gap, check to see if the server sent us
1525 if (firstSeqNum != (sequenceNumber + 1)) {
1527 // Check the size of the slots that were sent down by the server->
1528 // Can only check the size if there was a gap
1529 checkNumSlots(newSlots->length());
1531 // Since there was a gap every machine must have pushed a slot or
1532 // must have a last message message-> If not then the server is
1534 if (!machineSet->isEmpty()) {
1536 throw new Error("Missing record for machines: ");
1540 // Update the size of our local block chain->
1543 // Commit new to slots to the local block chain->
1545 uint numSlots = newSlots->length();
1546 for (uint i = 0; i < numSlots; i++) {
1547 Slot *slot = newSlots->get(i);
1549 // Insert this slot into our local block chain copy->
1550 buffer->putSlot(slot);
1552 // Keep track of how many slots are currently live (have live data
1557 // Get the sequence number of the latest slot in the system
1558 sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
1559 updateLiveStateFromServer();
1561 // No Need to remember after we pulled from the server
1562 offlineTransactionsCommittedAndAtServer->clear();
1564 // This is invalidated now
1565 hadPartialSendToServer = false;
1568 void Table::updateLiveStateFromServer() {
1569 // Process the new transaction parts
1570 processNewTransactionParts();
1572 // Do arbitration on new transactions that were received
1573 arbitrateFromServer();
1575 // Update all the committed keys
1576 bool didCommitOrSpeculate = updateCommittedTable();
1578 // Delete the transactions that are now dead
1579 updateLiveTransactionsAndStatus();
1582 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1583 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1586 void Table::updateLiveStateFromLocal() {
1587 // Update all the committed keys
1588 bool didCommitOrSpeculate = updateCommittedTable();
1590 // Delete the transactions that are now dead
1591 updateLiveTransactionsAndStatus();
1594 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1595 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1598 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1599 int64_t prevslots = firstSequenceNumber;
1601 if (didFindTableStatus) {
1603 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1606 didFindTableStatus = true;
1607 currMaxSize = numberOfSlots;
1610 void Table::updateExpectedSize() {
1613 if (expectedsize > currMaxSize) {
1614 expectedsize = currMaxSize;
1620 * Check the size of the block chain to make sure there are enough
1621 * slots sent back by the server-> This is only called when we have a
1622 * gap between the slots that we have locally and the slots sent by
1623 * the server therefore in the slots sent by the server there will be
1624 * at least 1 Table status message
1626 void Table::checkNumSlots(int numberOfSlots) {
1627 if (numberOfSlots != expectedsize) {
1628 throw new Error("Server Error: Server did not send all slots-> Expected: ");
1633 * Update the size of of the local buffer if it is needed->
1635 void Table::commitNewMaxSize() {
1636 didFindTableStatus = false;
1638 // Resize the local slot buffer
1639 if (numberOfSlots != currMaxSize) {
1640 buffer->resize((int32_t)currMaxSize);
1643 // Change the number of local slots to the new size
1644 numberOfSlots = (int32_t)currMaxSize;
1646 // Recalculate the resize threshold since the size of the local
1647 // buffer has changed
1648 setResizeThreshold();
1652 * Process the new transaction parts from this latest round of slots
1653 * received from the server
1655 void Table::processNewTransactionParts() {
1657 if (newTransactionParts->size() == 0) {
1658 // Nothing new to process
1662 // Iterate through all the machine Ids that we received new parts
1664 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *tpit = getKeyIterator(newTransactionParts);
1665 while (tpit->hasNext()) {
1666 int64_t machineId = tpit->next();
1667 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
1669 SetIterator<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *ptit = getKeyIterator(parts);
1670 // Iterate through all the parts for that machine Id
1671 while (ptit->hasNext()) {
1672 Pair<int64_t, int32_t> *partId = ptit->next();
1673 TransactionPart *part = parts->get(partId);
1675 if (lastArbitratedTransactionNumberByArbitratorTable->contains(part->getArbitratorId())) {
1676 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1677 if (lastTransactionNumber >= part->getSequenceNumber()) {
1678 // Set dead the transaction part
1684 // Get the transaction object for that sequence number
1685 Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1687 if (transaction == NULL) {
1688 // This is a new transaction that we dont have so make a new one
1689 transaction = new Transaction();
1691 // Add that part to the transaction
1692 transaction->addPartDecode(part);
1694 // Insert this new transaction into the live tables
1695 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1696 liveTransactionByTransactionIdTable->put(transaction->getId(), transaction);
1702 // Clear all the new transaction parts in preparation for the next
1703 // time the server sends slots
1705 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts);
1706 while (partsit->hasNext()) {
1707 int64_t machineId = partsit->next();
1708 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
1712 newTransactionParts->clear();
1716 void Table::arbitrateFromServer() {
1717 if (liveTransactionBySequenceNumberTable->size() == 0) {
1718 // Nothing to arbitrate on so move on
1722 // Get the transaction sequence numbers and sort from oldest to newest
1723 Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>();
1725 SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
1726 while (trit->hasNext())
1727 transactionSequenceNumbers->add(trit->next());
1730 qsort(transactionSequenceNumbers->expose(), transactionSequenceNumbers->size(), sizeof(int64_t), compareInt64);
1732 // Collection of key value pairs that are
1733 Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *speculativeTableTmp = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
1735 // The last transaction arbitrated on
1736 int64_t lastTransactionCommitted = -1;
1737 Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1738 uint tsnSize = transactionSequenceNumbers->size();
1739 for (uint i = 0; i < tsnSize; i++) {
1740 int64_t transactionSequenceNumber = transactionSequenceNumbers->get(i);
1741 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1743 // Check if this machine arbitrates for this transaction if not
1744 // then we cant arbitrate this transaction
1745 if (transaction->getArbitrator() != localMachineId) {
1749 if (transactionSequenceNumber < lastSeqNumArbOn) {
1753 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1754 // We have seen this already locally so dont commit again
1758 if (!transaction->isComplete()) {
1759 // Will arbitrate in incorrect order if we continue so just break
1764 // update the largest transaction seen by arbitrator from server
1765 if (!lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1766 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1768 int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1769 if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1770 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1774 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1775 // Guard evaluated as true
1776 // Update the local changes so we can make the commit
1777 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1778 while (kvit->hasNext()) {
1779 KeyValue *kv = kvit->next();
1780 speculativeTableTmp->put(kv->getKey(), kv);
1784 // Update what the last transaction committed was for use in batch commit
1785 lastTransactionCommitted = transactionSequenceNumber;
1787 // Guard evaluated was false so create abort
1789 Abort *newAbort = new Abort(NULL,
1790 transaction->getClientLocalSequenceNumber(),
1791 transaction->getSequenceNumber(),
1792 transaction->getMachineId(),
1793 transaction->getArbitrator(),
1794 localArbitrationSequenceNumber);
1795 localArbitrationSequenceNumber++;
1796 generatedAborts->add(newAbort);
1798 // Insert the abort so we can process
1799 processEntry(newAbort);
1802 lastSeqNumArbOn = transactionSequenceNumber;
1805 Commit *newCommit = NULL;
1807 // If there is something to commit
1808 if (speculativeTableTmp->size() != 0) {
1809 // Create the commit and increment the commit sequence number
1810 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1811 localArbitrationSequenceNumber++;
1813 // Add all the new keys to the commit
1814 SetIterator<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *spit = getKeyIterator(speculativeTableTmp);
1815 while (spit->hasNext()) {
1816 IoTString *string = spit->next();
1817 KeyValue *kv = speculativeTableTmp->get(string);
1818 newCommit->addKV(kv);
1822 // create the commit parts
1823 newCommit->createCommitParts();
1825 // Append all the commit parts to the end of the pending queue
1826 // waiting for sending to the server
1827 // Insert the commit so we can process it
1828 Vector<CommitPart *> *parts = newCommit->getParts();
1829 uint partsSize = parts->size();
1830 for (uint i = 0; i < partsSize; i++) {
1831 CommitPart *commitPart = parts->get(i);
1832 processEntry(commitPart);
1835 delete speculativeTableTmp;
1837 if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1838 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1839 pendingSendArbitrationRounds->add(arbitrationRound);
1841 if (compactArbitrationData()) {
1842 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1843 if (newArbitrationRound->getCommit() != NULL) {
1844 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1845 uint partsSize = parts->size();
1846 for (uint i = 0; i < partsSize; i++) {
1847 CommitPart *commitPart = parts->get(i);
1848 processEntry(commitPart);
1855 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1857 // Check if this machine arbitrates for this transaction if not then
1858 // we cant arbitrate this transaction
1859 if (transaction->getArbitrator() != localMachineId) {
1860 return Pair<bool, bool>(false, false);
1863 if (!transaction->isComplete()) {
1864 // Will arbitrate in incorrect order if we continue so just break
1866 return Pair<bool, bool>(false, false);
1869 if (transaction->getMachineId() != localMachineId) {
1870 // dont do this check for local transactions
1871 if (lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
1872 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1873 // We've have already seen this from the server
1874 return Pair<bool, bool>(false, false);
1879 if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1880 // Guard evaluated as true Create the commit and increment the
1881 // commit sequence number
1882 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1883 localArbitrationSequenceNumber++;
1885 // Update the local changes so we can make the commit
1886 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
1887 while (kvit->hasNext()) {
1888 KeyValue *kv = kvit->next();
1889 newCommit->addKV(kv);
1893 // create the commit parts
1894 newCommit->createCommitParts();
1896 // Append all the commit parts to the end of the pending queue
1897 // waiting for sending to the server
1898 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1899 pendingSendArbitrationRounds->add(arbitrationRound);
1901 if (compactArbitrationData()) {
1902 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1903 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1904 uint partsSize = parts->size();
1905 for (uint i = 0; i < partsSize; i++) {
1906 CommitPart *commitPart = parts->get(i);
1907 processEntry(commitPart);
1910 // Insert the commit so we can process it
1911 Vector<CommitPart *> *parts = newCommit->getParts();
1912 uint partsSize = parts->size();
1913 for (uint i = 0; i < partsSize; i++) {
1914 CommitPart *commitPart = parts->get(i);
1915 processEntry(commitPart);
1919 if (transaction->getMachineId() == localMachineId) {
1920 TransactionStatus *status = transaction->getTransactionStatus();
1921 if (status != NULL) {
1922 status->setStatus(TransactionStatus_StatusCommitted);
1926 updateLiveStateFromLocal();
1927 return Pair<bool, bool>(true, true);
1929 if (transaction->getMachineId() == localMachineId) {
1930 // For locally created messages update the status
1931 // Guard evaluated was false so create abort
1932 TransactionStatus *status = transaction->getTransactionStatus();
1933 if (status != NULL) {
1934 status->setStatus(TransactionStatus_StatusAborted);
1937 Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
1940 Abort *newAbort = new Abort(NULL,
1941 transaction->getClientLocalSequenceNumber(),
1943 transaction->getMachineId(),
1944 transaction->getArbitrator(),
1945 localArbitrationSequenceNumber);
1946 localArbitrationSequenceNumber++;
1947 addAbortSet->add(newAbort);
1949 // Append all the commit parts to the end of the pending queue
1950 // waiting for sending to the server
1951 ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1952 pendingSendArbitrationRounds->add(arbitrationRound);
1954 if (compactArbitrationData()) {
1955 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1957 Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
1958 uint partsSize = parts->size();
1959 for (uint i = 0; i < partsSize; i++) {
1960 CommitPart *commitPart = parts->get(i);
1961 processEntry(commitPart);
1966 updateLiveStateFromLocal();
1967 return Pair<bool, bool>(true, false);
1972 * Compacts the arbitration data by merging commits and aggregating
1973 * aborts so that a single large push of commits can be done instead
1974 * of many small updates
1976 bool Table::compactArbitrationData() {
1977 if (pendingSendArbitrationRounds->size() < 2) {
1978 // Nothing to compact so do nothing
1982 ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1983 if (lastRound->getDidSendPart()) {
1987 bool hadCommit = (lastRound->getCommit() == NULL);
1988 bool gotNewCommit = false;
1990 uint numberToDelete = 1;
1991 while (numberToDelete < pendingSendArbitrationRounds->size()) {
1992 ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1994 if (round->isFull() || round->getDidSendPart()) {
1995 // Stop since there is a part that cannot be compacted and we
1996 // need to compact in order
2000 if (round->getCommit() == NULL) {
2001 // Try compacting aborts only
2002 int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
2003 if (newSize > ArbitrationRound_MAX_PARTS) {
2004 // Cant compact since it would be too large
2007 lastRound->addAborts(round->getAborts());
2009 // Create a new larger commit
2010 Commit *newCommit = Commit_merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
2011 localArbitrationSequenceNumber++;
2013 // Create the commit parts so that we can count them
2014 newCommit->createCommitParts();
2016 // Calculate the new size of the parts
2017 int newSize = newCommit->getNumberOfParts();
2018 newSize += lastRound->getAbortsCount();
2019 newSize += round->getAbortsCount();
2021 if (newSize > ArbitrationRound_MAX_PARTS) {
2022 // Cant compact since it would be too large
2026 // Set the new compacted part
2027 if (lastRound->getCommit() == newCommit)
2028 lastRound->setCommit(NULL);
2029 if (round->getCommit() == newCommit)
2030 round->setCommit(NULL);
2032 if (lastRound->getCommit() != NULL) {
2033 Commit * oldcommit = lastRound->getCommit();
2034 lastRound->setCommit(NULL);
2037 lastRound->setCommit(newCommit);
2038 lastRound->addAborts(round->getAborts());
2039 gotNewCommit = true;
2045 if (numberToDelete != 1) {
2046 // If there is a compaction
2047 // Delete the previous pieces that are now in the new compacted piece
2048 for (uint i = 2; i <= numberToDelete; i++) {
2049 delete pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size()-i);
2051 pendingSendArbitrationRounds->setSize(pendingSendArbitrationRounds->size() - numberToDelete);
2053 pendingSendArbitrationRounds->add(lastRound);
2055 // Should reinsert into the commit processor
2056 if (hadCommit && gotNewCommit) {
2065 * Update all the commits and the committed tables, sets dead the dead
2068 bool Table::updateCommittedTable() {
2069 if (newCommitParts->size() == 0) {
2070 // Nothing new to process
2074 // Iterate through all the machine Ids that we received new parts for
2075 SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts);
2076 while (partsit->hasNext()) {
2077 int64_t machineId = partsit->next();
2078 Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId);
2080 // Iterate through all the parts for that machine Id
2081 SetIterator<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pairit = getKeyIterator(parts);
2082 while (pairit->hasNext()) {
2083 Pair<int64_t, int32_t> *partId = pairit->next();
2084 CommitPart *part = parts->get(partId);
2086 // Get the transaction object for that sequence number
2087 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
2089 if (commitForClientTable == NULL) {
2090 // This is the first commit from this device
2091 commitForClientTable = new Hashtable<int64_t, Commit *>();
2092 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
2095 Commit *commit = commitForClientTable->get(part->getSequenceNumber());
2097 if (commit == NULL) {
2098 // This is a new commit that we dont have so make a new one
2099 commit = new Commit();
2101 // Insert this new commit into the live tables
2102 commitForClientTable->put(part->getSequenceNumber(), commit);
2105 // Add that part to the commit
2106 commit->addPartDecode(part);
2113 // Clear all the new commits parts in preparation for the next time
2114 // the server sends slots
2115 newCommitParts->clear();
2117 // If we process a new commit keep track of it for future use
2118 bool didProcessANewCommit = false;
2120 // Process the commits one by one
2121 SetIterator<int64_t, Hashtable<int64_t, Commit *> *> *liveit = getKeyIterator(liveCommitsTable);
2122 while (liveit->hasNext()) {
2123 int64_t arbitratorId = liveit->next();
2125 // Get all the commits for a specific arbitrator
2126 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
2128 // Sort the commits in order
2129 Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>();
2131 SetIterator<int64_t, Commit *> *clientit = getKeyIterator(commitForClientTable);
2132 while (clientit->hasNext())
2133 commitSequenceNumbers->add(clientit->next());
2137 qsort(commitSequenceNumbers->expose(), commitSequenceNumbers->size(), sizeof(int64_t), compareInt64);
2139 // Get the last commit seen from this arbitrator
2140 int64_t lastCommitSeenSequenceNumber = -1;
2141 if (lastCommitSeenSequenceNumberByArbitratorTable->contains(arbitratorId)) {
2142 lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
2145 // Go through each new commit one by one
2146 for (uint i = 0; i < commitSequenceNumbers->size(); i++) {
2147 int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
2148 Commit *commit = commitForClientTable->get(commitSequenceNumber);
2150 // Special processing if a commit is not complete
2151 if (!commit->isComplete()) {
2152 if (i == (commitSequenceNumbers->size() - 1)) {
2153 // If there is an incomplete commit and this commit is the
2154 // latest one seen then this commit cannot be processed and
2155 // there are no other commits
2158 // This is a commit that was already dead but parts of it
2159 // are still in the block chain (not flushed out yet)->
2160 // Delete it and move on
2162 commitForClientTable->remove(commit->getSequenceNumber());
2168 // Update the last transaction that was updated if we can
2169 if (commit->getTransactionSequenceNumber() != -1) {
2170 // Update the last transaction sequence number that the arbitrator arbitrated on1
2171 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2172 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2176 // Update the last arbitration data that we have seen so far
2177 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(commit->getMachineId())) {
2178 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
2179 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
2181 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2184 // Never seen any data from this arbitrator so record the first one
2185 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
2188 // We have already seen this commit before so need to do the
2189 // full processing on this commit
2190 if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
2191 // Update the last transaction that was updated if we can
2192 if (commit->getTransactionSequenceNumber() != -1) {
2193 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
2194 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) ||
2195 lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
2196 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2202 // If we got here then this is a brand new commit and needs full
2204 // Get what commits should be edited, these are the commits that
2205 // have live values for their keys
2206 Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
2208 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2209 while (kvit->hasNext()) {
2210 KeyValue *kv = kvit->next();
2211 Commit *commit = liveCommitsByKeyTable->get(kv->getKey());
2213 commitsToEdit->add(commit);
2218 // Update each previous commit that needs to be updated
2219 SetIterator<Commit *, Commit *> *commitit = commitsToEdit->iterator();
2220 while (commitit->hasNext()) {
2221 Commit *previousCommit = commitit->next();
2223 // Only bother with live commits (TODO: Maybe remove this check)
2224 if (previousCommit->isLive()) {
2226 // Update which keys in the old commits are still live
2228 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2229 while (kvit->hasNext()) {
2230 KeyValue *kv = kvit->next();
2231 previousCommit->invalidateKey(kv->getKey());
2236 // if the commit is now dead then remove it
2237 if (!previousCommit->isLive()) {
2238 commitForClientTable->remove(previousCommit->getSequenceNumber());
2239 delete previousCommit;
2244 delete commitsToEdit;
2246 // Update the last seen sequence number from this arbitrator
2247 if (lastCommitSeenSequenceNumberByArbitratorTable->contains(commit->getMachineId())) {
2248 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2249 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2252 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2255 // We processed a new commit that we havent seen before
2256 didProcessANewCommit = true;
2258 // Update the committed table of keys and which commit is using which key
2260 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
2261 while (kvit->hasNext()) {
2262 KeyValue *kv = kvit->next();
2263 printf("Commited KeyValue Table update for %p\n", this);
2264 kv->getKey()->print();
2266 kv->getValue()->print();
2268 committedKeyValueTable->put(kv->getKey(), kv);
2269 liveCommitsByKeyTable->put(kv->getKey(), commit);
2277 return didProcessANewCommit;
2281 * Create the speculative table from transactions that are still live
2282 * and have come from the cloud
2284 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2285 if (liveTransactionBySequenceNumberTable->size() == 0) {
2286 // There is nothing to speculate on
2290 // Create a list of the transaction sequence numbers and sort them
2291 // from oldest to newest
2292 Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>();
2294 SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
2295 while (trit->hasNext())
2296 transactionSequenceNumbersSorted->add(trit->next());
2300 qsort(transactionSequenceNumbersSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64);
2302 bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2305 if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2306 // If there is a gap in the transaction sequence numbers then
2307 // there was a commit or an abort of a transaction OR there was a
2308 // new commit (Could be from offline commit) so a redo the
2309 // speculation from scratch
2311 // Start from scratch
2312 speculatedKeyValueTable->clear();
2313 lastTransactionSequenceNumberSpeculatedOn = -1;
2314 oldestTransactionSequenceNumberSpeculatedOn = -1;
2317 // Remember the front of the transaction list
2318 oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2320 // Find where to start arbitration from
2321 uint startIndex = 0;
2323 for (; startIndex < transactionSequenceNumbersSorted->size(); startIndex++)
2324 if (transactionSequenceNumbersSorted->get(startIndex) == lastTransactionSequenceNumberSpeculatedOn)
2328 if (startIndex >= transactionSequenceNumbersSorted->size()) {
2329 // Make sure we are not out of bounds
2330 return false; // did not speculate
2333 Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
2334 bool didSkip = true;
2336 for (uint i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2337 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2338 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2340 if (!transaction->isComplete()) {
2341 // If there is an incomplete transaction then there is nothing
2342 // we can do add this transactions arbitrator to the list of
2343 // arbitrators we should ignore
2344 incompleteTransactionArbitrator->add(transaction->getArbitrator());
2349 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2353 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2355 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2356 // Guard evaluated to true so update the speculative table
2358 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2359 while (kvit->hasNext()) {
2360 KeyValue *kv = kvit->next();
2361 speculatedKeyValueTable->put(kv->getKey(), kv);
2369 // Since there was a skip we need to redo the speculation next time around
2370 lastTransactionSequenceNumberSpeculatedOn = -1;
2371 oldestTransactionSequenceNumberSpeculatedOn = -1;
2374 // We did some speculation
2379 * Create the pending transaction speculative table from transactions
2380 * that are still in the pending transaction buffer
2382 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2383 if (pendingTransactionQueue->size() == 0) {
2384 // There is nothing to speculate on
2388 if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2389 // need to reset on the pending speculation
2390 lastPendingTransactionSpeculatedOn = NULL;
2391 firstPendingTransaction = pendingTransactionQueue->get(0);
2392 pendingTransactionSpeculatedKeyValueTable->clear();
2395 // Find where to start arbitration from
2396 uint startIndex = 0;
2398 for (; startIndex < pendingTransactionQueue->size(); startIndex++)
2399 if (pendingTransactionQueue->get(startIndex) == firstPendingTransaction)
2402 if (startIndex >= pendingTransactionQueue->size()) {
2403 // Make sure we are not out of bounds
2407 for (uint i = startIndex; i < pendingTransactionQueue->size(); i++) {
2408 Transaction *transaction = pendingTransactionQueue->get(i);
2410 lastPendingTransactionSpeculatedOn = transaction;
2412 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2413 // Guard evaluated to true so update the speculative table
2414 SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
2415 while (kvit->hasNext()) {
2416 KeyValue *kv = kvit->next();
2417 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2425 * Set dead and remove from the live transaction tables the
2426 * transactions that are dead
2428 void Table::updateLiveTransactionsAndStatus() {
2429 // Go through each of the transactions
2431 SetIterator<int64_t, Transaction *> *iter = getKeyIterator(liveTransactionBySequenceNumberTable);
2432 while (iter->hasNext()) {
2433 int64_t key = iter->next();
2434 Transaction *transaction = liveTransactionBySequenceNumberTable->get(key);
2436 // Check if the transaction is dead
2437 if (lastArbitratedTransactionNumberByArbitratorTable->contains(transaction->getArbitrator())
2438 && lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator()) >= transaction->getSequenceNumber()) {
2439 // Set dead the transaction
2440 transaction->setDead();
2442 // Remove the transaction from the live table
2444 liveTransactionByTransactionIdTable->remove(transaction->getId());
2451 // Go through each of the transactions
2453 SetIterator<int64_t, TransactionStatus *> *iter = getKeyIterator(outstandingTransactionStatus);
2454 while (iter->hasNext()) {
2455 int64_t key = iter->next();
2456 TransactionStatus *status = outstandingTransactionStatus->get(key);
2458 // Check if the transaction is dead
2459 if (lastArbitratedTransactionNumberByArbitratorTable->contains(status->getTransactionArbitrator())
2460 && (lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()) >= status->getTransactionSequenceNumber())) {
2462 status->setStatus(TransactionStatus_StatusCommitted);
2473 * Process this slot, entry by entry-> Also update the latest message sent by slot
2475 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2477 // Update the last message seen
2478 updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2480 // Process each entry in the slot
2481 Vector<Entry *> *entries = slot->getEntries();
2482 uint eSize = entries->size();
2483 for (uint ei = 0; ei < eSize; ei++) {
2484 Entry *entry = entries->get(ei);
2485 switch (entry->getType()) {
2486 case TypeCommitPart:
2487 processEntry((CommitPart *)entry);
2490 processEntry((Abort *)entry);
2492 case TypeTransactionPart:
2493 processEntry((TransactionPart *)entry);
2496 processEntry((NewKey *)entry);
2498 case TypeLastMessage:
2499 processEntry((LastMessage *)entry, machineSet);
2501 case TypeRejectedMessage:
2502 processEntry((RejectedMessage *)entry, indexer);
2504 case TypeTableStatus:
2505 processEntry((TableStatus *)entry, slot->getSequenceNumber());
2508 throw new Error("Unrecognized type: ");
2514 * Update the last message that was sent for a machine Id
2516 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2517 // Update what the last message received by a machine was
2518 updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2522 * Add the new key to the arbitrators table and update the set of live
2523 * new keys (in case of a rescued new key message)
2525 void Table::processEntry(NewKey *entry) {
2526 // Update the arbitrator table with the new key information
2527 arbitratorTable->put(entry->getKey(), entry->getMachineID());
2529 // Update what the latest live new key is
2530 NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2531 if (oldNewKey != NULL) {
2532 // Delete the old new key messages
2533 oldNewKey->setDead();
2538 * Process new table status entries and set dead the old ones as new
2539 * ones come in-> keeps track of the largest and smallest table status
2540 * seen in this current round of updating the local copy of the block
2543 void Table::processEntry(TableStatus *entry, int64_t seq) {
2544 int newNumSlots = entry->getMaxSlots();
2545 updateCurrMaxSize(newNumSlots);
2546 initExpectedSize(seq, newNumSlots);
2548 if (liveTableStatus != NULL) {
2549 // We have a larger table status so the old table status is no
2551 liveTableStatus->setDead();
2554 // Make this new table status the latest alive table status
2555 liveTableStatus = entry;
2559 * Check old messages to see if there is a block chain violation->
2562 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
2563 int64_t oldSeqNum = entry->getOldSeqNum();
2564 int64_t newSeqNum = entry->getNewSeqNum();
2565 bool isequal = entry->getEqual();
2566 int64_t machineId = entry->getMachineID();
2567 int64_t seq = entry->getSequenceNumber();
2569 // Check if we have messages that were supposed to be rejected in
2570 // our local block chain
2571 for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2573 Slot *slot = indexer->getSlot(seqNum);
2576 // If we have this slot make sure that it was not supposed to be
2578 int64_t slotMachineId = slot->getMachineID();
2579 if (isequal != (slotMachineId == machineId)) {
2580 throw new Error("Server Error: Trying to insert rejected message for slot ");
2585 // Create a list of clients to watch until they see this rejected
2587 Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2588 SetIterator<int64_t, Pair<int64_t, Liveness *> *> *iter = getKeyIterator(lastMessageTable);
2589 while (iter->hasNext()) {
2590 // Machine ID for the last message entry
2591 int64_t lastMessageEntryMachineId = iter->next();
2593 // We've seen it, don't need to continue to watch-> Our next
2594 // message will implicitly acknowledge it->
2595 if (lastMessageEntryMachineId == localMachineId) {
2599 Pair<int64_t, Liveness *> *lastMessageValue = lastMessageTable->get(lastMessageEntryMachineId);
2600 int64_t entrySequenceNumber = lastMessageValue->getFirst();
2602 if (entrySequenceNumber < seq) {
2603 // Add this rejected message to the set of messages that this
2604 // machine ID did not see yet
2605 addWatchVector(lastMessageEntryMachineId, entry);
2606 // This client did not see this rejected message yet so add it
2607 // to the watch set to monitor
2608 deviceWatchSet->add(lastMessageEntryMachineId);
2613 if (deviceWatchSet->isEmpty()) {
2614 // This rejected message has been seen by all the clients so
2616 delete deviceWatchSet;
2618 // We need to watch this rejected message
2619 entry->setWatchSet(deviceWatchSet);
2624 * Check if this abort is live, if not then save it so we can kill it
2625 * later-> update the last transaction number that was arbitrated on->
2627 void Table::processEntry(Abort *entry) {
2628 if (entry->getTransactionSequenceNumber() != -1) {
2629 // update the transaction status if it was sent to the server
2630 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2631 if (status != NULL) {
2632 status->setStatus(TransactionStatus_StatusAborted);
2636 // Abort has not been seen by the client it is for yet so we need to
2639 Abort *previouslySeenAbort = liveAbortTable->put(new Pair<int64_t, int64_t>(entry->getAbortId()), entry);
2640 if (previouslySeenAbort != NULL) {
2641 previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version
2644 if (entry->getTransactionArbitrator() == localMachineId) {
2645 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
2648 if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) {
2649 // The machine already saw this so it is dead
2651 Pair<int64_t, int64_t> abortid = entry->getAbortId();
2652 liveAbortTable->remove(&abortid);
2654 if (entry->getTransactionArbitrator() == localMachineId) {
2655 liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2660 // Update the last arbitration data that we have seen so far
2661 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(entry->getTransactionArbitrator())) {
2662 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2663 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2665 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2668 // Never seen any data from this arbitrator so record the first one
2669 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2672 // Set dead a transaction if we can
2673 Pair<int64_t, int64_t> deadPair = Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber());
2675 Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(&deadPair);
2676 if (transactionToSetDead != NULL) {
2677 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2680 // Update the last transaction sequence number that the arbitrator
2682 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getTransactionArbitrator()) ||
2683 (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator()) < entry->getTransactionSequenceNumber())) {
2685 if (entry->getTransactionSequenceNumber() != -1) {
2686 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2692 * Set dead the transaction part if that transaction is dead and keep
2693 * track of all new parts
2695 void Table::processEntry(TransactionPart *entry) {
2696 // Check if we have already seen this transaction and set it dead OR
2697 // if it is not alive
2698 if (lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getArbitratorId()) && (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId()) >= entry->getSequenceNumber())) {
2699 // This transaction is dead, it was already committed or aborted
2704 // This part is still alive
2705 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId());
2707 if (transactionPart == NULL) {
2708 // Dont have a table for this machine Id yet so make one
2709 transactionPart = new Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2710 newTransactionParts->put(entry->getMachineId(), transactionPart);
2713 // Update the part and set dead ones we have already seen (got a
2715 TransactionPart *previouslySeenPart = transactionPart->put(new Pair<int64_t, int32_t>(entry->getPartId()), entry);
2716 if (previouslySeenPart != NULL) {
2717 previouslySeenPart->setDead();
2722 * Process new commit entries and save them for future use-> Delete duplicates
2724 void Table::processEntry(CommitPart *entry) {
2725 // Update the last transaction that was updated if we can
2726 if (entry->getTransactionSequenceNumber() != -1) {
2727 if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId()) ||
2728 lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber()) {
2729 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2733 Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *commitPart = newCommitParts->get(entry->getMachineId());
2734 if (commitPart == NULL) {
2735 // Don't have a table for this machine Id yet so make one
2736 commitPart = new Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
2737 newCommitParts->put(entry->getMachineId(), commitPart);
2739 // Update the part and set dead ones we have already seen (got a
2741 CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry);
2742 if (previouslySeenPart != NULL) {
2743 previouslySeenPart->setDead();
2748 * Update the last message seen table-> Update and set dead the
2749 * appropriate RejectedMessages as clients see them-> Updates the live
2750 * aborts, removes those that are dead and sets them dead-> Check that
2751 * the last message seen is correct and that there is no mismatch of
2752 * our own last message or that other clients have not had a rollback
2753 * on the last message->
2755 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2756 // We have seen this machine ID
2757 machineSet->remove(machineId);
2759 // Get the set of rejected messages that this machine Id is has not seen yet
2760 Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2761 // If there is a rejected message that this machine Id has not seen yet
2762 if (watchset != NULL) {
2763 // Go through each rejected message that this machine Id has not
2766 SetIterator<RejectedMessage *, RejectedMessage *> *rmit = watchset->iterator();
2767 while (rmit->hasNext()) {
2768 RejectedMessage *rm = rmit->next();
2769 // If this machine Id has seen this rejected message->->->
2770 if (rm->getSequenceNumber() <= seqNum) {
2771 // Remove it from our watchlist
2773 // Decrement machines that need to see this notification
2774 rm->removeWatcher(machineId);
2780 // Set dead the abort
2781 SetIterator<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals> *abortit = getKeyIterator(liveAbortTable);
2783 while (abortit->hasNext()) {
2784 Pair<int64_t, int64_t> *key = abortit->next();
2785 Abort *abort = liveAbortTable->get(key);
2786 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2789 if (abort->getTransactionArbitrator() == localMachineId) {
2790 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2795 if (machineId == localMachineId) {
2796 // Our own messages are immediately dead->
2797 char livenessType = liveness->getType();
2798 if (livenessType == TypeLastMessage) {
2799 ((LastMessage *)liveness)->setDead();
2800 } else if (livenessType == TypeSlot) {
2801 ((Slot *)liveness)->setDead();
2803 throw new Error("Unrecognized type");
2806 // Get the old last message for this device
2807 Pair<int64_t, Liveness *> *lastMessageEntry = lastMessageTable->put(machineId, new Pair<int64_t, Liveness *>(seqNum, liveness));
2808 if (lastMessageEntry == NULL) {
2809 // If no last message then there is nothing else to process
2813 int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
2814 Liveness *lastEntry = lastMessageEntry->getSecond();
2815 delete lastMessageEntry;
2817 // If it is not our machine Id since we already set ours to dead
2818 if (machineId != localMachineId) {
2819 char lastEntryType = lastEntry->getType();
2821 if (lastEntryType == TypeLastMessage) {
2822 ((LastMessage *)lastEntry)->setDead();
2823 } else if (lastEntryType == TypeSlot) {
2824 ((Slot *)lastEntry)->setDead();
2826 throw new Error("Unrecognized type");
2829 // Make sure the server is not playing any games
2830 if (machineId == localMachineId) {
2831 if (hadPartialSendToServer) {
2832 // We were not making any updates and we had a machine mismatch
2833 if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2834 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: ");
2837 // We were not making any updates and we had a machine mismatch
2838 if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2839 throw new Error("Server Error: Mismatch on local machine sequence number, needed: ");
2843 if (lastMessageSeqNum > seqNum) {
2844 throw new Error("Server Error: Rollback on remote machine sequence number");
2850 * Add a rejected message entry to the watch set to keep track of
2851 * which clients have seen that rejected message entry and which have
2854 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
2855 Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2856 if (entries == NULL) {
2857 // There is no set for this machine ID yet so create one
2858 entries = new Hashset<RejectedMessage *>();
2859 rejectedMessageWatchVectorTable->put(machineId, entries);
2861 entries->add(entry);
2865 * Check if the HMAC chain is not violated
2867 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
2868 for (uint i = 0; i < newSlots->length(); i++) {
2869 Slot *currSlot = newSlots->get(i);
2870 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
2871 if (prevSlot != NULL &&
2872 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2873 throw new Error("Server Error: Invalid HMAC Chain");