-#include "Table.h"
-#include "CloudComm.h"
-#include "SlotBuffer.h"
-#include "NewKey.h"
-#include "Slot.h"
-#include "KeyValue.h"
-#include "Error.h"
-#include "PendingTransaction.h"
-#include "TableStatus.h"
-#include "TransactionStatus.h"
-#include "Transaction.h"
-#include "LastMessage.h"
-#include "SecureRandom.h"
-#include "ByteBuffer.h"
-#include "Abort.h"
-#include "CommitPart.h"
-#include "ArbitrationRound.h"
-#include "TransactionPart.h"
-#include "Commit.h"
-#include "RejectedMessage.h"
-#include "SlotIndexer.h"
-#include <stdlib.h>
-
-int compareInt64(const void *a, const void *b) {
- const int64_t *pa = (const int64_t *) a;
- const int64_t *pb = (const int64_t *) b;
- if (*pa < *pb)
- return -1;
- else if (*pa > *pb)
- return 1;
- else
- return 0;
-}
-
-Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
- buffer(NULL),
- cloud(new CloudComm(this, baseurl, password, listeningPort)),
- random(NULL),
- liveTableStatus(NULL),
- pendingTransactionBuilder(NULL),
- lastPendingTransactionSpeculatedOn(NULL),
- firstPendingTransaction(NULL),
- numberOfSlots(0),
- bufferResizeThreshold(0),
- liveSlotCount(0),
- oldestLiveSlotSequenceNumver(1),
- localMachineId(_localMachineId),
- sequenceNumber(0),
- localSequenceNumber(0),
- localTransactionSequenceNumber(0),
- lastTransactionSequenceNumberSpeculatedOn(0),
- oldestTransactionSequenceNumberSpeculatedOn(0),
- localArbitrationSequenceNumber(0),
- hadPartialSendToServer(false),
- attemptedToSendToServer(false),
- expectedsize(0),
- didFindTableStatus(false),
- currMaxSize(0),
- lastSlotAttemptedToSend(NULL),
- lastIsNewKey(false),
- lastNewSize(0),
- lastTransactionPartsSent(NULL),
- lastNewKey(NULL),
- committedKeyValueTable(NULL),
- speculatedKeyValueTable(NULL),
- pendingTransactionSpeculatedKeyValueTable(NULL),
- liveNewKeyTable(NULL),
- lastMessageTable(NULL),
- rejectedMessageWatchVectorTable(NULL),
- arbitratorTable(NULL),
- liveAbortTable(NULL),
- newTransactionParts(NULL),
- newCommitParts(NULL),
- lastArbitratedTransactionNumberByArbitratorTable(NULL),
- liveTransactionBySequenceNumberTable(NULL),
- liveTransactionByTransactionIdTable(NULL),
- liveCommitsTable(NULL),
- liveCommitsByKeyTable(NULL),
- lastCommitSeenSequenceNumberByArbitratorTable(NULL),
- rejectedSlotVector(NULL),
- pendingTransactionQueue(NULL),
- pendingSendArbitrationRounds(NULL),
- pendingSendArbitrationEntriesToDelete(NULL),
- transactionPartsSent(NULL),
- outstandingTransactionStatus(NULL),
- liveAbortsGeneratedByLocal(NULL),
- offlineTransactionsCommittedAndAtServer(NULL),
- localCommunicationTable(NULL),
- lastTransactionSeenFromMachineFromServer(NULL),
- lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
- lastInsertedNewKey(false),
- lastSeqNumArbOn(0)
-{
- init();
-}
-
-Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
- buffer(NULL),
- cloud(_cloud),
- random(NULL),
- liveTableStatus(NULL),
- pendingTransactionBuilder(NULL),
- lastPendingTransactionSpeculatedOn(NULL),
- firstPendingTransaction(NULL),
- numberOfSlots(0),
- bufferResizeThreshold(0),
- liveSlotCount(0),
- oldestLiveSlotSequenceNumver(1),
- localMachineId(_localMachineId),
- sequenceNumber(0),
- localSequenceNumber(0),
- localTransactionSequenceNumber(0),
- lastTransactionSequenceNumberSpeculatedOn(0),
- oldestTransactionSequenceNumberSpeculatedOn(0),
- localArbitrationSequenceNumber(0),
- hadPartialSendToServer(false),
- attemptedToSendToServer(false),
- expectedsize(0),
- didFindTableStatus(false),
- currMaxSize(0),
- lastSlotAttemptedToSend(NULL),
- lastIsNewKey(false),
- lastNewSize(0),
- lastTransactionPartsSent(NULL),
- lastNewKey(NULL),
- committedKeyValueTable(NULL),
- speculatedKeyValueTable(NULL),
- pendingTransactionSpeculatedKeyValueTable(NULL),
- liveNewKeyTable(NULL),
- lastMessageTable(NULL),
- rejectedMessageWatchVectorTable(NULL),
- arbitratorTable(NULL),
- liveAbortTable(NULL),
- newTransactionParts(NULL),
- newCommitParts(NULL),
- lastArbitratedTransactionNumberByArbitratorTable(NULL),
- liveTransactionBySequenceNumberTable(NULL),
- liveTransactionByTransactionIdTable(NULL),
- liveCommitsTable(NULL),
- liveCommitsByKeyTable(NULL),
- lastCommitSeenSequenceNumberByArbitratorTable(NULL),
- rejectedSlotVector(NULL),
- pendingTransactionQueue(NULL),
- pendingSendArbitrationRounds(NULL),
- pendingSendArbitrationEntriesToDelete(NULL),
- transactionPartsSent(NULL),
- outstandingTransactionStatus(NULL),
- liveAbortsGeneratedByLocal(NULL),
- offlineTransactionsCommittedAndAtServer(NULL),
- localCommunicationTable(NULL),
- lastTransactionSeenFromMachineFromServer(NULL),
- lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
- lastInsertedNewKey(false),
- lastSeqNumArbOn(0)
-{
- init();
-}
-
-Table::~Table() {
- delete cloud;
- delete random;
- delete buffer;
- // init data structs
- delete committedKeyValueTable;
- delete speculatedKeyValueTable;
- delete pendingTransactionSpeculatedKeyValueTable;
- delete liveNewKeyTable;
- {
- SetIterator<int64_t, Pair<int64_t, Liveness *> *> *lmit = getKeyIterator(lastMessageTable);
- while (lmit->hasNext()) {
- Pair<int64_t, Liveness *> * pair = lastMessageTable->get(lmit->next());
- delete pair;
- }
- delete lmit;
- delete lastMessageTable;
- }
- if (pendingTransactionBuilder != NULL)
- delete pendingTransactionBuilder;
- {
- SetIterator<int64_t, Hashset<RejectedMessage *> *> *rmit = getKeyIterator(rejectedMessageWatchVectorTable);
- while(rmit->hasNext()) {
- int64_t machineid = rmit->next();
- Hashset<RejectedMessage *> * rmset = rejectedMessageWatchVectorTable->get(machineid);
- SetIterator<RejectedMessage *, RejectedMessage *> * mit = rmset->iterator();
- while (mit->hasNext()) {
- RejectedMessage * rm = mit->next();
- delete rm;
- }
- delete mit;
- delete rmset;
- }
- delete rmit;
- delete rejectedMessageWatchVectorTable;
- }
- delete arbitratorTable;
- delete liveAbortTable;
- {
- SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts);
- while (partsit->hasNext()) {
- int64_t machineId = partsit->next();
- Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = partsit->currVal();
- SetIterator<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pit = getKeyIterator(parts);
- while(pit->hasNext()) {
- Pair<int64_t, int32_t> * pair=pit->next();
- pit->currVal()->releaseRef();
- }
- delete pit;
-
- delete parts;
- }
- delete partsit;
- delete newTransactionParts;
- }
- {
- SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts);
- while (partsit->hasNext()) {
- int64_t machineId = partsit->next();
- Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = partsit->currVal();
- SetIterator<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pit = getKeyIterator(parts);
- while(pit->hasNext()) {
- Pair<int64_t, int32_t> * pair=pit->next();
- pit->currVal()->releaseRef();
- }
- delete pit;
- delete parts;
- }
- delete partsit;
- delete newCommitParts;
- }
- delete lastArbitratedTransactionNumberByArbitratorTable;
- delete liveTransactionBySequenceNumberTable;
- delete liveTransactionByTransactionIdTable;
- {
- SetIterator<int64_t, Hashtable<int64_t, Commit *> *> *liveit = getKeyIterator(liveCommitsTable);
- while (liveit->hasNext()) {
- int64_t arbitratorId = liveit->next();
-
- // Get all the commits for a specific arbitrator
- Hashtable<int64_t, Commit *> *commitForClientTable = liveit->currVal();
- {
- SetIterator<int64_t, Commit *> *clientit = getKeyIterator(commitForClientTable);
- while (clientit->hasNext()) {
- int64_t id = clientit->next();
- delete commitForClientTable->get(id);
- }
- delete clientit;
- }
-
- delete commitForClientTable;
- }
- delete liveit;
- delete liveCommitsTable;
- }
- delete liveCommitsByKeyTable;
- delete lastCommitSeenSequenceNumberByArbitratorTable;
- delete rejectedSlotVector;
- {
- uint size = pendingTransactionQueue->size();
- for (uint iter = 0; iter < size; iter++) {
- delete pendingTransactionQueue->get(iter);
- }
- delete pendingTransactionQueue;
- }
- delete pendingSendArbitrationEntriesToDelete;
- {
- SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
- while (trit->hasNext()) {
- Transaction *transaction = trit->next();
- delete trit->currVal();
- }
- delete trit;
- delete transactionPartsSent;
- }
- delete outstandingTransactionStatus;
- delete liveAbortsGeneratedByLocal;
- delete offlineTransactionsCommittedAndAtServer;
- delete localCommunicationTable;
- delete lastTransactionSeenFromMachineFromServer;
- {
- for(uint i = 0; i < pendingSendArbitrationRounds->size(); i++) {
- delete pendingSendArbitrationRounds->get(i);
- }
- delete pendingSendArbitrationRounds;
- }
- if (lastTransactionPartsSent != NULL)
- delete lastTransactionPartsSent;
- delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
- if (lastNewKey)
- delete lastNewKey;
-}
-
-/**
- * Init all the stuff needed for for table usage
- */
-void Table::init() {
- // Init helper objects
- random = new SecureRandom();
- buffer = new SlotBuffer();
-
- // init data structs
- committedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
- speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
- pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
- liveNewKeyTable = new Hashtable<IoTString *, NewKey *, uintptr_t, 0, hashString, StringEquals >();
- lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> * >();
- rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
- arbitratorTable = new Hashtable<IoTString *, int64_t, uintptr_t, 0, hashString, StringEquals>();
- liveAbortTable = new Hashtable<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>();
- newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
- newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
- lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
- liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
- liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t> *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>();
- liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> * >();
- liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *, uintptr_t, 0, hashString, StringEquals>();
- lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
- rejectedSlotVector = new Vector<int64_t>();
- pendingTransactionQueue = new Vector<Transaction *>();
- pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
- transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
- outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
- liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
- offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> *, uintptr_t, 0, pairHashFunction, pairEquals>();
- localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> *>();
- lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
- pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
- lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
-
- // Other init stuff
- numberOfSlots = buffer->capacity();
- setResizeThreshold();
-}
-
-/**
- * Initialize the table by inserting a table status as the first entry
- * into the table status also initialize the crypto stuff.
- */
-void Table::initTable() {
- cloud->initSecurity();
-
- // Create the first insertion into the block chain which is the table status
- Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
- localSequenceNumber++;
- TableStatus *status = new TableStatus(s, numberOfSlots);
- s->addShallowEntry(status);
- Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
-
- if (array == NULL) {
- array = new Array<Slot *>(1);
- array->set(0, s);
- // update local block chain
- validateAndUpdate(array, true);
- delete array;
- } else if (array->length() == 1) {
- // in case we did push the slot BUT we failed to init it
- validateAndUpdate(array, true);
- delete s;
- delete array;
- } else {
- delete s;
- delete array;
- throw new Error("Error on initialization");
- }
-}
-
-/**
- * Rebuild the table from scratch by pulling the latest block chain
- * from the server.
- */
-void Table::rebuild() {
- // Just pull the latest slots from the server
- Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
- validateAndUpdate(newslots, true);
- delete newslots;
- sendToServer(NULL);
- updateLiveTransactionsAndStatus();
-}
-
-void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
- localCommunicationTable->put(arbitrator, new Pair<IoTString *, int32_t>(hostName, portNumber));
-}
-
-int64_t Table::getArbitrator(IoTString *key) {
- return arbitratorTable->get(key);
-}
-
-void Table::close() {
- cloud->closeCloud();
-}
-
-IoTString *Table::getCommitted(IoTString *key) {
- KeyValue *kv = committedKeyValueTable->get(key);
-
- if (kv != NULL) {
- return new IoTString(kv->getValue());
- } else {
- return NULL;
- }
-}
-
-IoTString *Table::getSpeculative(IoTString *key) {
- KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
-
- if (kv == NULL) {
- kv = speculatedKeyValueTable->get(key);
- }
-
- if (kv == NULL) {
- kv = committedKeyValueTable->get(key);
- }
-
- if (kv != NULL) {
- return new IoTString(kv->getValue());
- } else {
- return NULL;
- }
-}
-
-IoTString *Table::getCommittedAtomic(IoTString *key) {
- KeyValue *kv = committedKeyValueTable->get(key);
-
- if (!arbitratorTable->contains(key)) {
- throw new Error("Key not Found.");
- }
-
- // Make sure new key value pair matches the current arbitrator
- if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
- // TODO: Maybe not throw en error
- throw new Error("Not all Key Values Match Arbitrator.");
- }
-
- if (kv != NULL) {
- pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
- return new IoTString(kv->getValue());
- } else {
- pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
- return NULL;
- }
-}
-
-IoTString *Table::getSpeculativeAtomic(IoTString *key) {
- if (!arbitratorTable->contains(key)) {
- throw new Error("Key not Found.");
- }
-
- // Make sure new key value pair matches the current arbitrator
- if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
- // TODO: Maybe not throw en error
- throw new Error("Not all Key Values Match Arbitrator.");
- }
-
- KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
-
- if (kv == NULL) {
- kv = speculatedKeyValueTable->get(key);
- }
-
- if (kv == NULL) {
- kv = committedKeyValueTable->get(key);
- }
-
- if (kv != NULL) {
- pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
- return new IoTString(kv->getValue());
- } else {
- pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
- return NULL;
- }
-}
-
-bool Table::update() {
- try {
- Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
- validateAndUpdate(newSlots, false);
- delete newSlots;
- sendToServer(NULL);
- updateLiveTransactionsAndStatus();
- return true;
- } catch (Exception *e) {
- SetIterator<int64_t, Pair<IoTString *, int32_t> *> *kit = getKeyIterator(localCommunicationTable);
- while (kit->hasNext()) {
- int64_t m = kit->next();
- updateFromLocal(m);
- }
- delete kit;
- }
-
- return false;
-}
-
-bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
- while (true) {
- if (arbitratorTable->contains(keyName)) {
- // There is already an arbitrator
- return false;
- }
- NewKey *newKey = new NewKey(NULL, keyName, machineId);
-
- if (sendToServer(newKey)) {
- // If successfully inserted
- return true;
- }
- }
-}
-
-void Table::startTransaction() {
- // Create a new transaction, invalidates any old pending transactions.
- if (pendingTransactionBuilder != NULL)
- delete pendingTransactionBuilder;
- pendingTransactionBuilder = new PendingTransaction(localMachineId);
-}
-
-void Table::put(IoTString *key, IoTString *value) {
- // Make sure it is a valid key
- if (!arbitratorTable->contains(key)) {
- throw new Error("Key not Found.");
- }
-
- // Make sure new key value pair matches the current arbitrator
- if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
- // TODO: Maybe not throw en error
- throw new Error("Not all Key Values Match Arbitrator.");
- }
-
- // Add the key value to this transaction
- KeyValue *kv = new KeyValue(new IoTString(key), new IoTString(value));
- pendingTransactionBuilder->addKV(kv);
-}
-
-TransactionStatus *Table::commitTransaction() {
- if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
- // transaction with no updates will have no effect on the system
- return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
- }
-
- // Set the local transaction sequence number and increment
- pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
- localTransactionSequenceNumber++;
-
- // Create the transaction status
- TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
-
- // Create the new transaction
- Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
- newTransaction->setTransactionStatus(transactionStatus);
-
- if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
- // Add it to the queue and invalidate the builder for safety
- pendingTransactionQueue->add(newTransaction);
- } else {
- arbitrateOnLocalTransaction(newTransaction);
- delete newTransaction;
- updateLiveStateFromLocal();
- }
- if (pendingTransactionBuilder != NULL)
- delete pendingTransactionBuilder;
-
- pendingTransactionBuilder = new PendingTransaction(localMachineId);
-
- try {
- sendToServer(NULL);
- } catch (ServerException *e) {
-
- Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
- uint size = pendingTransactionQueue->size();
- uint oldindex = 0;
- for (uint iter = 0; iter < size; iter++) {
- Transaction *transaction = pendingTransactionQueue->get(iter);
- pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter));
-
- if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
- // Already contacted this client so ignore all attempts to contact this client
- // to preserve ordering for arbitrator
- continue;
- }
-
- Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
-
- if (sendReturn.getFirst()) {
- // Failed to contact over local
- arbitratorTriedAndFailed->add(transaction->getArbitrator());
- } else {
- // Successful contact or should not contact
-
- if (sendReturn.getSecond()) {
- // did arbitrate
- delete transaction;
- oldindex--;
- }
- }
- }
- pendingTransactionQueue->setSize(oldindex);
- }
-
- updateLiveStateFromLocal();
-
- return transactionStatus;
-}
-
-/**
- * Recalculate the new resize threshold
- */
-void Table::setResizeThreshold() {
- int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
- bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
-}
-
-int64_t Table::getLocalSequenceNumber() {
- return localSequenceNumber;
-}
-
-void Table::processTransactionList(bool handlePartial) {
- SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
- while (trit->hasNext()) {
- Transaction *transaction = trit->next();
- transaction->resetServerFailure();
- // Update which transactions parts still need to be sent
- transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
- // Add the transaction status to the outstanding list
- outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
-
- // Update the transaction status
- transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
-
- // Check if all the transaction parts were successfully
- // sent and if so then remove it from pending
- if (transaction->didSendAllParts()) {
- transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
- pendingTransactionQueue->remove(transaction);
- delete transaction;
- } else if (handlePartial) {
- transaction->resetServerFailure();
- // Set the transaction sequence number back to nothing
- if (!transaction->didSendAPartToServer()) {
- transaction->setSequenceNumber(-1);
- }
- }
- }
- delete trit;
-}
-
-NewKey * Table::handlePartialSend(NewKey * newKey) {
- //Didn't receive acknowledgement for last send
- //See if the server has received a newer slot
-
- Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
- if (newSlots->length() == 0) {
- //Retry sending old slot
- bool wasInserted = false;
- bool sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey, &wasInserted, &newSlots);
-
- if (sendSlotsReturn) {
- lastSlotAttemptedToSend = NULL;
- if (newKey != NULL) {
- if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
- delete newKey;
- newKey = NULL;
- }
- }
- processTransactionList(false);
- } else {
- if (checkSend(newSlots, lastSlotAttemptedToSend)) {
- if (newKey != NULL) {
- if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
- delete newKey;
- newKey = NULL;
- }
- }
- processTransactionList(true);
- }
- }
-
- SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
- while (trit->hasNext()) {
- Transaction *transaction = trit->next();
- transaction->resetServerFailure();
- // Set the transaction sequence number back to nothing
- if (!transaction->didSendAPartToServer()) {
- transaction->setSequenceNumber(-1);
- }
- }
- delete trit;
-
- if (newSlots->length() != 0) {
- // insert into the local block chain
- validateAndUpdate(newSlots, true);
- }
- } else {
- if (checkSend(newSlots, lastSlotAttemptedToSend)) {
- if (newKey != NULL) {
- if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
- delete newKey;
- newKey = NULL;
- }
- }
-
- processTransactionList(true);
- } else {
- SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
- while (trit->hasNext()) {
- Transaction *transaction = trit->next();
- transaction->resetServerFailure();
- // Set the transaction sequence number back to nothing
- if (!transaction->didSendAPartToServer()) {
- transaction->setSequenceNumber(-1);
- }
- }
- delete trit;
- }
-
- // insert into the local block chain
- validateAndUpdate(newSlots, true);
- }
- delete newSlots;
- return newKey;
-}
-
-void Table::clearSentParts() {
- // Clear the sent data since we are trying again
- pendingSendArbitrationEntriesToDelete->clear();
- SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
- while (trit->hasNext()) {
- Transaction *transaction = trit->next();
- delete trit->currVal();
- }
- delete trit;
- transactionPartsSent->clear();
-}
-
-bool Table::sendToServer(NewKey *newKey) {
- if (hadPartialSendToServer) {
- newKey = handlePartialSend(newKey);
- }
-
- try {
- // While we have stuff that needs inserting into the block chain
- while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
- if (hadPartialSendToServer) {
- throw new Error("Should Be error free");
- }
-
- // If there is a new key with same name then end
- if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
- delete newKey;
- return false;
- }
-
- // Create the slot
- Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, new Array<char>(buffer->getSlot(sequenceNumber)->getHMAC()), localSequenceNumber);
- localSequenceNumber++;
-
- // Try to fill the slot with data
- int newSize = 0;
- bool insertedNewKey = false;
- bool needsResize = fillSlot(slot, false, newKey, newSize, insertedNewKey);
-
- if (needsResize) {
- // Reset which transaction to send
- SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
- while (trit->hasNext()) {
- Transaction *transaction = trit->next();
- transaction->resetNextPartToSend();
-
- // Set the transaction sequence number back to nothing
- if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
- transaction->setSequenceNumber(-1);
- }
- }
- delete trit;
-
- // Clear the sent data since we are trying again
- clearSentParts();
-
- // We needed a resize so try again
- fillSlot(slot, true, newKey, newSize, insertedNewKey);
- }
- if (lastSlotAttemptedToSend != NULL)
- delete lastSlotAttemptedToSend;
-
- lastSlotAttemptedToSend = slot;
- lastIsNewKey = (newKey != NULL);
- lastInsertedNewKey = insertedNewKey;
- lastNewSize = newSize;
- if (( newKey != lastNewKey) && (lastNewKey != NULL))
- delete lastNewKey;
- lastNewKey = newKey;
- if (lastTransactionPartsSent != NULL)
- delete lastTransactionPartsSent;
- lastTransactionPartsSent = transactionPartsSent->clone();
-
- Array<Slot *> * newSlots = NULL;
- bool wasInserted = false;
- bool sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL, &wasInserted, &newSlots);
-
- if (sendSlotsReturn) {
- lastSlotAttemptedToSend = NULL;
- // Did insert into the block chain
- if (insertedNewKey) {
- // This slot was what was inserted not a previous slot
- // New Key was successfully inserted into the block chain so dont want to insert it again
- newKey = NULL;
- }
-
- // Remove the aborts and commit parts that were sent from the pending to send queue
- uint size = pendingSendArbitrationRounds->size();
- uint oldcount = 0;
- for (uint i = 0; i < size; i++) {
- ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
- round->removeParts(pendingSendArbitrationEntriesToDelete);
-
- if (!round->isDoneSending()) {
- //Add part back in
- pendingSendArbitrationRounds->set(oldcount++,
- pendingSendArbitrationRounds->get(i));
- } else
- delete pendingSendArbitrationRounds->get(i);
- }
- pendingSendArbitrationRounds->setSize(oldcount);
- processTransactionList(false);
- } else {
- // Reset which transaction to send
- SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
- while (trit->hasNext()) {
- Transaction *transaction = trit->next();
- transaction->resetNextPartToSend();
-
- // Set the transaction sequence number back to nothing
- if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
- transaction->setSequenceNumber(-1);
- }
- }
- delete trit;
- }
-
- // Clear the sent data in preparation for next send
- clearSentParts();
-
- if (newSlots->length() != 0) {
- // insert into the local block chain
- validateAndUpdate(newSlots, true);
- }
- delete newSlots;
- }
- } catch (ServerException *e) {
- if (e->getType() != ServerException_TypeInputTimeout) {
- // Nothing was able to be sent to the server so just clear these data structures
- SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
- while (trit->hasNext()) {
- Transaction *transaction = trit->next();
- transaction->resetNextPartToSend();
-
- // Set the transaction sequence number back to nothing
- if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
- transaction->setSequenceNumber(-1);
- }
- }
- delete trit;
- } else {
- // There was a partial send to the server
- hadPartialSendToServer = true;
-
- // Nothing was able to be sent to the server so just clear these data structures
- SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
- while (trit->hasNext()) {
- Transaction *transaction = trit->next();
- transaction->resetNextPartToSend();
- transaction->setServerFailure();
- }
- delete trit;
- }
-
- clearSentParts();
-
- throw e;
- }
-
- return newKey == NULL;
-}
-
-bool Table::updateFromLocal(int64_t machineId) {
- if (!localCommunicationTable->contains(machineId))
- return false;
-
- Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(machineId);
-
- // Get the size of the send data
- int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
-
- int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
- if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(machineId)) {
- lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
- }
-
- Array<char> *sendData = new Array<char>(sendDataSize);
- ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
-
- // Encode the data
- bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
- bbEncode->putInt(0);
-
- // Send by local
- Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
- localSequenceNumber++;
-
- if (returnData == NULL) {
- // Could not contact server
- return false;
- }
-
- // Decode the data
- ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
- int numberOfEntries = bbDecode->getInt();
-
- for (int i = 0; i < numberOfEntries; i++) {
- char type = bbDecode->get();
- if (type == TypeAbort) {
- Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
- processEntry(abort);
- } else if (type == TypeCommitPart) {
- CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
- processEntry(commitPart);
- }
- }
-
- updateLiveStateFromLocal();
-
- return true;
-}
-
-Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
-
- // Get the devices local communications
- if (!localCommunicationTable->contains(transaction->getArbitrator()))
- return Pair<bool, bool>(true, false);
-
- Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
-
- // Get the size of the send data
- int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
- {
- Vector<TransactionPart *> *tParts = transaction->getParts();
- uint tPartsSize = tParts->size();
- for (uint i = 0; i < tPartsSize; i++) {
- TransactionPart *part = tParts->get(i);
- sendDataSize += part->getSize();
- }
- }
-
- int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
- if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(transaction->getArbitrator())) {
- lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
- }
-
- // Make the send data size
- Array<char> *sendData = new Array<char>(sendDataSize);
- ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
-
- // Encode the data
- bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
- bbEncode->putInt(transaction->getParts()->size());
- {
- Vector<TransactionPart *> *tParts = transaction->getParts();
- uint tPartsSize = tParts->size();
- for (uint i = 0; i < tPartsSize; i++) {
- TransactionPart *part = tParts->get(i);
- part->encode(bbEncode);
- }
- }
-
- // Send by local
- Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
- localSequenceNumber++;
-
- if (returnData == NULL) {
- // Could not contact server
- return Pair<bool, bool>(true, false);
- }
-
- // Decode the data
- ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
- bool didCommit = bbDecode->get() == 1;
- bool couldArbitrate = bbDecode->get() == 1;
- int numberOfEntries = bbDecode->getInt();
- bool foundAbort = false;
-
- for (int i = 0; i < numberOfEntries; i++) {
- char type = bbDecode->get();
- if (type == TypeAbort) {
- Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
-
- if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
- foundAbort = true;
- }
-
- processEntry(abort);
- } else if (type == TypeCommitPart) {
- CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
- processEntry(commitPart);
- }
- }
-
- updateLiveStateFromLocal();
-
- if (couldArbitrate) {
- TransactionStatus *status = transaction->getTransactionStatus();
- if (didCommit) {
- status->setStatus(TransactionStatus_StatusCommitted);
- } else {
- status->setStatus(TransactionStatus_StatusAborted);
- }
- } else {
- TransactionStatus *status = transaction->getTransactionStatus();
- if (foundAbort) {
- status->setStatus(TransactionStatus_StatusAborted);
- } else {
- status->setStatus(TransactionStatus_StatusCommitted);
- }
- }
-
- return Pair<bool, bool>(false, true);
-}
-
-Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
- // Decode the data
- ByteBuffer *bbDecode = ByteBuffer_wrap(data);
- int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
- int numberOfParts = bbDecode->getInt();
-
- // If we did commit a transaction or not
- bool didCommit = false;
- bool couldArbitrate = false;
-
- if (numberOfParts != 0) {
-
- // decode the transaction
- Transaction *transaction = new Transaction();
- for (int i = 0; i < numberOfParts; i++) {
- bbDecode->get();
- TransactionPart *newPart = (TransactionPart *)TransactionPart_decode(NULL, bbDecode);
- transaction->addPartDecode(newPart);
- }
-
- // Arbitrate on transaction and pull relevant return data
- Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
- couldArbitrate = localArbitrateReturn.getFirst();
- didCommit = localArbitrateReturn.getSecond();
-
- updateLiveStateFromLocal();
-
- // Transaction was sent to the server so keep track of it to prevent double commit
- if (transaction->getSequenceNumber() != -1) {
- offlineTransactionsCommittedAndAtServer->add(new Pair<int64_t, int64_t>(transaction->getId()));
- }
- }
-
- // The data to send back
- int returnDataSize = 0;
- Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
-
- // Get the aborts to send back
- Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>();
- {
- SetIterator<int64_t, Abort *> *abortit = getKeyIterator(liveAbortsGeneratedByLocal);
- while (abortit->hasNext())
- abortLocalSequenceNumbers->add(abortit->next());
- delete abortit;
- }
-
- qsort(abortLocalSequenceNumbers->expose(), abortLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
-
- uint asize = abortLocalSequenceNumbers->size();
- for (uint i = 0; i < asize; i++) {
- int64_t localSequenceNumber = abortLocalSequenceNumbers->get(i);
- if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
- continue;
- }
-
- Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
- unseenArbitrations->add(abort);
- returnDataSize += abort->getSize();
- }
-
- // Get the commits to send back
- Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
- if (commitForClientTable != NULL) {
- Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>();
- {
- SetIterator<int64_t, Commit *> *commitit = getKeyIterator(commitForClientTable);
- while (commitit->hasNext())
- commitLocalSequenceNumbers->add(commitit->next());
- delete commitit;
- }
- qsort(commitLocalSequenceNumbers->expose(), commitLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
-
- uint clsSize = commitLocalSequenceNumbers->size();
- for (uint clsi = 0; clsi < clsSize; clsi++) {
- int64_t localSequenceNumber = commitLocalSequenceNumbers->get(clsi);
- Commit *commit = commitForClientTable->get(localSequenceNumber);
-
- if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
- continue;
- }
-
- {
- Vector<CommitPart *> *parts = commit->getParts();
- uint nParts = parts->size();
- for (uint i = 0; i < nParts; i++) {
- CommitPart *commitPart = parts->get(i);
- unseenArbitrations->add(commitPart);
- returnDataSize += commitPart->getSize();
- }
- }
- }
- }
-
- // Number of arbitration entries to decode
- returnDataSize += 2 * sizeof(int32_t);
-
- // bool of did commit or not
- if (numberOfParts != 0) {
- returnDataSize += sizeof(char);
- }
-
- // Data to send Back
- Array<char> *returnData = new Array<char>(returnDataSize);
- ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
-
- if (numberOfParts != 0) {
- if (didCommit) {
- bbEncode->put((char)1);
- } else {
- bbEncode->put((char)0);
- }
- if (couldArbitrate) {
- bbEncode->put((char)1);
- } else {
- bbEncode->put((char)0);
- }
- }
-
- bbEncode->putInt(unseenArbitrations->size());
- uint size = unseenArbitrations->size();
- for (uint i = 0; i < size; i++) {
- Entry *entry = unseenArbitrations->get(i);
- entry->encode(bbEncode);
- }
-
- localSequenceNumber++;
- return returnData;
-}
-
-/** Checks whether a given slot was sent using new slots in
- array. Returns true if sent and false otherwise. */
-
-bool Table::checkSend(Array<Slot *> * array, Slot *checkSlot) {
- uint size = array->length();
- for (uint i = 0; i < size; i++) {
- Slot *s = array->get(i);
- if ((s->getSequenceNumber() == checkSlot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
- return true;
- }
- }
-
- //Also need to see if other machines acknowledged our message
- for (uint i = 0; i < size; i++) {
- Slot *s = array->get(i);
-
- // Process each entry in the slot
- Vector<Entry *> *entries = s->getEntries();
- uint eSize = entries->size();
- for (uint ei = 0; ei < eSize; ei++) {
- Entry *entry = entries->get(ei);
-
- if (entry->getType() == TypeLastMessage) {
- LastMessage *lastMessage = (LastMessage *)entry;
-
- if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == checkSlot->getSequenceNumber())) {
- return true;
- }
- }
- }
- }
- //Not found
- return false;
-}
-
-/** Method tries to send slot to server. Returns status in tuple.
- isInserted returns whether last un-acked send (if any) was
- successful. Returns whether send was confirmed.x
- */
-
-bool Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey, bool *isInserted, Array<Slot *> **array) {
- attemptedToSendToServer = true;
-
- *array = cloud->putSlot(slot, newSize);
- if (*array == NULL) {
- *array = new Array<Slot *>(1);
- (*array)->set(0, slot);
- rejectedSlotVector->clear();
- *isInserted = false;
- return true;
- } else {
- if ((*array)->length() == 0) {
- throw new Error("Server Error: Did not send any slots");
- }
-
- if (hadPartialSendToServer) {
- *isInserted = checkSend(*array, slot);
-
- if (!(*isInserted)) {
- rejectedSlotVector->add(slot->getSequenceNumber());
- }
-
- return false;
- } else {
- rejectedSlotVector->add(slot->getSequenceNumber());
- *isInserted = false;
- return false;
- }
- }
-}
-
-/**
- * Returns true if a resize was needed but not done.
- */
-bool Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry, int & newSize, bool & insertedKey) {
- newSize = 0;//special value to indicate no resize
- if (liveSlotCount > bufferResizeThreshold) {
- resize = true;//Resize is forced
- }
-
- if (resize) {
- newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
- TableStatus *status = new TableStatus(slot, newSize);
- slot->addShallowEntry(status);
- }
-
- // Fill with rejected slots first before doing anything else
- doRejectedMessages(slot);
-
- // Do mandatory rescue of entries
- ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryRescue(slot, resize);
-
- // Extract working variables
- bool needsResize = mandatoryRescueReturn.getFirst();
- bool seenLiveSlot = mandatoryRescueReturn.getSecond();
- int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
-
- if (needsResize && !resize) {
- // We need to resize but we are not resizing so return true to force on retry
- return true;
- }
-
- insertedKey = false;
- if (newKeyEntry != NULL) {
- newKeyEntry->setSlot(slot);
- if (slot->hasSpace(newKeyEntry)) {
- slot->addEntry(newKeyEntry);
- insertedKey = true;
- }
- }
-
- // Clear the transactions, aborts and commits that were sent previously
- clearSentParts();
- uint size = pendingSendArbitrationRounds->size();
- for (uint i = 0; i < size; i++) {
- ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
- bool isFull = false;
- round->generateParts();
- Vector<Entry *> *parts = round->getParts();
-
- // Insert pending arbitration data
- uint vsize = parts->size();
- for (uint vi = 0; vi < vsize; vi++) {
- Entry *arbitrationData = parts->get(vi);
-
- // If it is an abort then we need to set some information
- if (arbitrationData->getType() == TypeAbort) {
- ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
- }
-
- if (!slot->hasSpace(arbitrationData)) {
- // No space so cant do anything else with these data entries
- isFull = true;
- break;
- }
-
- // Add to this current slot and add it to entries to delete
- slot->addEntry(arbitrationData);
- pendingSendArbitrationEntriesToDelete->add(arbitrationData);
- }
-
- if (isFull) {
- break;
- }
- }
-
- if (pendingTransactionQueue->size() > 0) {
- Transaction *transaction = pendingTransactionQueue->get(0);
- // Set the transaction sequence number if it has yet to be inserted into the block chain
- if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
- transaction->setSequenceNumber(slot->getSequenceNumber());
- }
-
- while (true) {
- TransactionPart *part = transaction->getNextPartToSend();
- if (part == NULL) {
- // Ran out of parts to send for this transaction so move on
- break;
- }
-
- if (slot->hasSpace(part)) {
- slot->addEntry(part);
- Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
- if (partsSent == NULL) {
- partsSent = new Vector<int32_t>();
- transactionPartsSent->put(transaction, partsSent);
- }
- partsSent->add(part->getPartNumber());
- transactionPartsSent->put(transaction, partsSent);
- } else {
- break;
- }
- }
- }
-
- // Fill the remainder of the slot with rescue data
- doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
-
- return false;
-}
-
-void Table::doRejectedMessages(Slot *s) {
- if (!rejectedSlotVector->isEmpty()) {
- /* TODO: We should avoid generating a rejected message entry if
- * there is already a sufficient entry in the queue (e->g->,
- * equalsto value of true and same sequence number)-> */
-
- int64_t old_seqn = rejectedSlotVector->get(0);
- if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
- int64_t new_seqn = rejectedSlotVector->lastElement();
- RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
- s->addShallowEntry(rm);
- } else {
- int64_t prev_seqn = -1;
- uint i = 0;
- /* Go through list of missing messages */
- for (; i < rejectedSlotVector->size(); i++) {
- int64_t curr_seqn = rejectedSlotVector->get(i);
- Slot *s_msg = buffer->getSlot(curr_seqn);
- if (s_msg != NULL)
- break;
- prev_seqn = curr_seqn;
- }
- /* Generate rejected message entry for missing messages */
- if (prev_seqn != -1) {
- RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
- s->addShallowEntry(rm);
- }
- /* Generate rejected message entries for present messages */
- for (; i < rejectedSlotVector->size(); i++) {
- int64_t curr_seqn = rejectedSlotVector->get(i);
- Slot *s_msg = buffer->getSlot(curr_seqn);
- int64_t machineid = s_msg->getMachineID();
- RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
- s->addShallowEntry(rm);
- }
- }
- }
-}
-
-ThreeTuple<bool, bool, int64_t> Table::doMandatoryRescue(Slot *slot, bool resize) {
- int64_t newestSequenceNumber = buffer->getNewestSeqNum();
- int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
- if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
- oldestLiveSlotSequenceNumver = oldestSequenceNumber;
- }
-
- int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
- bool seenLiveSlot = false;
- int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots; // smallest seq number in the buffer if it is full
- int64_t threshold = firstIfFull + Table_FREE_SLOTS; // we want the buffer to be clear of live entries up to this point
-
-
- // Mandatory Rescue
- for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
- Slot *previousSlot = buffer->getSlot(currentSequenceNumber);
- // Push slot number forward
- if (!seenLiveSlot) {
- oldestLiveSlotSequenceNumver = currentSequenceNumber;
- }
-
- if (!previousSlot->isLive()) {
- continue;
- }
-
- // We have seen a live slot
- seenLiveSlot = true;
-
- // Get all the live entries for a slot
- Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
-
- // Iterate over all the live entries and try to rescue them
- uint lESize = liveEntries->size();
- for (uint i = 0; i < lESize; i++) {
- Entry *liveEntry = liveEntries->get(i);
- if (slot->hasSpace(liveEntry)) {
- // Enough space to rescue the entry
- slot->addEntry(liveEntry);
- } else if (currentSequenceNumber == firstIfFull) {
- //if there's no space but the entry is about to fall off the queue
- return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
- }
- }
- }
-
- // Did not resize
- return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
-}
-
-void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
- /* now go through live entries from least to greatest sequence number until
- * either all live slots added, or the slot doesn't have enough room
- * for SKIP_THRESHOLD consecutive entries*/
- int skipcount = 0;
- int64_t newestseqnum = buffer->getNewestSeqNum();
- for (; seqn <= newestseqnum; seqn++) {
- Slot *prevslot = buffer->getSlot(seqn);
- //Push slot number forward
- if (!seenliveslot)
- oldestLiveSlotSequenceNumver = seqn;
-
- if (!prevslot->isLive())
- continue;
- seenliveslot = true;
- Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
- uint lESize = liveentries->size();
- for (uint i = 0; i < lESize; i++) {
- Entry *liveentry = liveentries->get(i);
- if (s->hasSpace(liveentry))
- s->addEntry(liveentry);
- else {
- skipcount++;
- if (skipcount > Table_SKIP_THRESHOLD) {
- delete liveentries;
- goto donesearch;
- }
- }
- }
- delete liveentries;
- }
-donesearch:
- ;
-}
-
-/**
- * Checks for malicious activity and updates the local copy of the block chain->
- */
-void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
- // The cloud communication layer has checked slot HMACs already
- // before decoding
- if (newSlots->length() == 0) {
- return;
- }
-
- // Make sure all slots are newer than the last largest slot this
- // client has seen
- int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
- if (firstSeqNum <= sequenceNumber) {
- throw new Error("Server Error: Sent older slots!");
- }
-
- // Create an object that can access both new slots and slots in our
- // local chain without committing slots to our local chain
- SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
-
- // Check that the HMAC chain is not broken
- checkHMACChain(indexer, newSlots);
-
- // Set to keep track of messages from clients
- Hashset<int64_t> *machineSet = new Hashset<int64_t>();
- {
- SetIterator<int64_t, Pair<int64_t, Liveness *> *> *lmit = getKeyIterator(lastMessageTable);
- while (lmit->hasNext())
- machineSet->add(lmit->next());
- delete lmit;
- }
-
- // Process each slots data
- {
- uint numSlots = newSlots->length();
- for (uint i = 0; i < numSlots; i++) {
- Slot *slot = newSlots->get(i);
- processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
- updateExpectedSize();
- }
- }
- delete indexer;
-
- // If there is a gap, check to see if the server sent us
- // everything->
- if (firstSeqNum != (sequenceNumber + 1)) {
-
- // Check the size of the slots that were sent down by the server->
- // Can only check the size if there was a gap
- checkNumSlots(newSlots->length());
-
- // Since there was a gap every machine must have pushed a slot or
- // must have a last message message-> If not then the server is
- // hiding slots
- if (!machineSet->isEmpty()) {
- delete machineSet;
- throw new Error("Missing record for machines: ");
- }
- }
- delete machineSet;
- // Update the size of our local block chain->
- commitNewMaxSize();
-
- // Commit new to slots to the local block chain->
- {
- uint numSlots = newSlots->length();
- for (uint i = 0; i < numSlots; i++) {
- Slot *slot = newSlots->get(i);
-
- // Insert this slot into our local block chain copy->
- buffer->putSlot(slot);
-
- // Keep track of how many slots are currently live (have live data
- // in them)->
- liveSlotCount++;
- }
- }
- // Get the sequence number of the latest slot in the system
- sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
- updateLiveStateFromServer();
-
- // No Need to remember after we pulled from the server
- offlineTransactionsCommittedAndAtServer->clear();
-
- // This is invalidated now
- hadPartialSendToServer = false;
-}
-
-void Table::updateLiveStateFromServer() {
- // Process the new transaction parts
- processNewTransactionParts();
-
- // Do arbitration on new transactions that were received
- arbitrateFromServer();
-
- // Update all the committed keys
- bool didCommitOrSpeculate = updateCommittedTable();
-
- // Delete the transactions that are now dead
- updateLiveTransactionsAndStatus();
-
- // Do speculations
- didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
- updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
-}
-
-void Table::updateLiveStateFromLocal() {
- // Update all the committed keys
- bool didCommitOrSpeculate = updateCommittedTable();
-
- // Delete the transactions that are now dead
- updateLiveTransactionsAndStatus();
-
- // Do speculations
- didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
- updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
-}
-
-void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
- int64_t prevslots = firstSequenceNumber;
-
- if (didFindTableStatus) {
- } else {
- expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
- }
-
- didFindTableStatus = true;
- currMaxSize = numberOfSlots;
-}
-
-void Table::updateExpectedSize() {
- expectedsize++;
-
- if (expectedsize > currMaxSize) {
- expectedsize = currMaxSize;
- }
-}
-
-
-/**
- * Check the size of the block chain to make sure there are enough
- * slots sent back by the server-> This is only called when we have a
- * gap between the slots that we have locally and the slots sent by
- * the server therefore in the slots sent by the server there will be
- * at least 1 Table status message
- */
-void Table::checkNumSlots(int numberOfSlots) {
- if (numberOfSlots != expectedsize) {
- throw new Error("Server Error: Server did not send all slots-> Expected: ");
- }
-}
-
-/**
- * Update the size of of the local buffer if it is needed->
- */
-void Table::commitNewMaxSize() {
- didFindTableStatus = false;
-
- // Resize the local slot buffer
- if (numberOfSlots != currMaxSize) {
- buffer->resize((int32_t)currMaxSize);
- }
-
- // Change the number of local slots to the new size
- numberOfSlots = (int32_t)currMaxSize;
-
- // Recalculate the resize threshold since the size of the local
- // buffer has changed
- setResizeThreshold();
-}
-
-/**
- * Process the new transaction parts from this latest round of slots
- * received from the server
- */
-void Table::processNewTransactionParts() {
-
- if (newTransactionParts->size() == 0) {
- // Nothing new to process
- return;
- }
-
- // Iterate through all the machine Ids that we received new parts
- // for
- SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *tpit = getKeyIterator(newTransactionParts);
- while (tpit->hasNext()) {
- int64_t machineId = tpit->next();
- Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = tpit->currVal();
-
- SetIterator<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *ptit = getKeyIterator(parts);
- // Iterate through all the parts for that machine Id
- while (ptit->hasNext()) {
- Pair<int64_t, int32_t> *partId = ptit->next();
- TransactionPart *part = parts->get(partId);
-
- if (lastArbitratedTransactionNumberByArbitratorTable->contains(part->getArbitratorId())) {
- int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
- if (lastTransactionNumber >= part->getSequenceNumber()) {
- // Set dead the transaction part
- part->setDead();
- part->releaseRef();
- continue;
- }
- }
-
- // Get the transaction object for that sequence number
- Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
-
- if (transaction == NULL) {
- // This is a new transaction that we dont have so make a new one
- transaction = new Transaction();
-
- // Add that part to the transaction
- transaction->addPartDecode(part);
-
- // Insert this new transaction into the live tables
- liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
- liveTransactionByTransactionIdTable->put(transaction->getId(), transaction);
- }
- part->releaseRef();
- }
- delete ptit;
- }
- delete tpit;
- // Clear all the new transaction parts in preparation for the next
- // time the server sends slots
- {
- SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts);
- while (partsit->hasNext()) {
- int64_t machineId = partsit->next();
- Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
- delete parts;
- }
- delete partsit;
- newTransactionParts->clear();
- }
-}
-
-void Table::arbitrateFromServer() {
- if (liveTransactionBySequenceNumberTable->size() == 0) {
- // Nothing to arbitrate on so move on
- return;
- }
-
- // Get the transaction sequence numbers and sort from oldest to newest
- Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>();
- {
- SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
- while (trit->hasNext())
- transactionSequenceNumbers->add(trit->next());
- delete trit;
- }
- qsort(transactionSequenceNumbers->expose(), transactionSequenceNumbers->size(), sizeof(int64_t), compareInt64);
-
- // Collection of key value pairs that are
- Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *speculativeTableTmp = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
-
- // The last transaction arbitrated on
- int64_t lastTransactionCommitted = -1;
- Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
- uint tsnSize = transactionSequenceNumbers->size();
- for (uint i = 0; i < tsnSize; i++) {
- int64_t transactionSequenceNumber = transactionSequenceNumbers->get(i);
- Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
-
- // Check if this machine arbitrates for this transaction if not
- // then we cant arbitrate this transaction
- if (transaction->getArbitrator() != localMachineId) {
- continue;
- }
-
- if (transactionSequenceNumber < lastSeqNumArbOn) {
- continue;
- }
-
- if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
- // We have seen this already locally so dont commit again
- continue;
- }
-
- if (!transaction->isComplete()) {
- // Will arbitrate in incorrect order if we continue so just break
- // Most likely this
- break;
- }
-
- // update the largest transaction seen by arbitrator from server
- if (!lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
- lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
- } else {
- int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
- if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
- lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
- }
- }
-
- if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
- // Guard evaluated as true
- // Update the local changes so we can make the commit
- SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
- while (kvit->hasNext()) {
- KeyValue *kv = kvit->next();
- speculativeTableTmp->put(kv->getKey(), kv);
- }
- delete kvit;
-
- // Update what the last transaction committed was for use in batch commit
- lastTransactionCommitted = transactionSequenceNumber;
- } else {
- // Guard evaluated was false so create abort
- // Create the abort
- Abort *newAbort = new Abort(NULL,
- transaction->getClientLocalSequenceNumber(),
- transaction->getSequenceNumber(),
- transaction->getMachineId(),
- transaction->getArbitrator(),
- localArbitrationSequenceNumber);
- localArbitrationSequenceNumber++;
- generatedAborts->add(newAbort);
-
- // Insert the abort so we can process
- processEntry(newAbort);
- }
-
- lastSeqNumArbOn = transactionSequenceNumber;
- }
-
- delete transactionSequenceNumbers;
-
- Commit *newCommit = NULL;
-
- // If there is something to commit
- if (speculativeTableTmp->size() != 0) {
- // Create the commit and increment the commit sequence number
- newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
- localArbitrationSequenceNumber++;
-
- // Add all the new keys to the commit
- SetIterator<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *spit = getKeyIterator(speculativeTableTmp);
- while (spit->hasNext()) {
- IoTString *string = spit->next();
- KeyValue *kv = speculativeTableTmp->get(string);
- newCommit->addKV(kv);
- }
- delete spit;
-
- // create the commit parts
- newCommit->createCommitParts();
-
- // Append all the commit parts to the end of the pending queue
- // waiting for sending to the server
- // Insert the commit so we can process it
- Vector<CommitPart *> *parts = newCommit->getParts();
- uint partsSize = parts->size();
- for (uint i = 0; i < partsSize; i++) {
- CommitPart *commitPart = parts->get(i);
- processEntry(commitPart);
- }
- }
- delete speculativeTableTmp;
-
- if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
- ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
- pendingSendArbitrationRounds->add(arbitrationRound);
-
- if (compactArbitrationData()) {
- ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
- if (newArbitrationRound->getCommit() != NULL) {
- Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
- uint partsSize = parts->size();
- for (uint i = 0; i < partsSize; i++) {
- CommitPart *commitPart = parts->get(i);
- processEntry(commitPart);
- }
- }
- }
- } else {
- delete generatedAborts;
- }
-}
-
-Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
-
- // Check if this machine arbitrates for this transaction if not then
- // we cant arbitrate this transaction
- if (transaction->getArbitrator() != localMachineId) {
- return Pair<bool, bool>(false, false);
- }
-
- if (!transaction->isComplete()) {
- // Will arbitrate in incorrect order if we continue so just break
- // Most likely this
- return Pair<bool, bool>(false, false);
- }
-
- if (transaction->getMachineId() != localMachineId) {
- // dont do this check for local transactions
- if (lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
- if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
- // We've have already seen this from the server
- return Pair<bool, bool>(false, false);
- }
- }
- }
-
- if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
- // Guard evaluated as true Create the commit and increment the
- // commit sequence number
- Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
- localArbitrationSequenceNumber++;
-
- // Update the local changes so we can make the commit
- SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
- while (kvit->hasNext()) {
- KeyValue *kv = kvit->next();
- newCommit->addKV(kv);
- }
- delete kvit;
-
- // create the commit parts
- newCommit->createCommitParts();
-
- // Append all the commit parts to the end of the pending queue
- // waiting for sending to the server
- ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
- pendingSendArbitrationRounds->add(arbitrationRound);
-
- if (compactArbitrationData()) {
- ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
- Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
- uint partsSize = parts->size();
- for (uint i = 0; i < partsSize; i++) {
- CommitPart *commitPart = parts->get(i);
- processEntry(commitPart);
- }
- } else {
- // Insert the commit so we can process it
- Vector<CommitPart *> *parts = newCommit->getParts();
- uint partsSize = parts->size();
- for (uint i = 0; i < partsSize; i++) {
- CommitPart *commitPart = parts->get(i);
- processEntry(commitPart);
- }
- }
-
- if (transaction->getMachineId() == localMachineId) {
- TransactionStatus *status = transaction->getTransactionStatus();
- if (status != NULL) {
- status->setStatus(TransactionStatus_StatusCommitted);
- }
- }
-
- updateLiveStateFromLocal();
- return Pair<bool, bool>(true, true);
- } else {
- if (transaction->getMachineId() == localMachineId) {
- // For locally created messages update the status
- // Guard evaluated was false so create abort
- TransactionStatus *status = transaction->getTransactionStatus();
- if (status != NULL) {
- status->setStatus(TransactionStatus_StatusAborted);
- }
- } else {
- Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
-
- // Create the abort
- Abort *newAbort = new Abort(NULL,
- transaction->getClientLocalSequenceNumber(),
- -1,
- transaction->getMachineId(),
- transaction->getArbitrator(),
- localArbitrationSequenceNumber);
- localArbitrationSequenceNumber++;
- addAbortSet->add(newAbort);
-
- // Append all the commit parts to the end of the pending queue
- // waiting for sending to the server
- ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
- pendingSendArbitrationRounds->add(arbitrationRound);
-
- if (compactArbitrationData()) {
- ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
-
- Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
- uint partsSize = parts->size();
- for (uint i = 0; i < partsSize; i++) {
- CommitPart *commitPart = parts->get(i);
- processEntry(commitPart);
- }
- }
- }
-
- updateLiveStateFromLocal();
- return Pair<bool, bool>(true, false);
- }
-}
-
-/**
- * Compacts the arbitration data by merging commits and aggregating
- * aborts so that a single large push of commits can be done instead
- * of many small updates
- */
-bool Table::compactArbitrationData() {
- if (pendingSendArbitrationRounds->size() < 2) {
- // Nothing to compact so do nothing
- return false;
- }
-
- ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
- if (lastRound->getDidSendPart()) {
- return false;
- }
-
- bool hadCommit = (lastRound->getCommit() == NULL);
- bool gotNewCommit = false;
-
- uint numberToDelete = 1;
-
- while (numberToDelete < pendingSendArbitrationRounds->size()) {
- ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
-
- if (round->isFull() || round->getDidSendPart()) {
- // Stop since there is a part that cannot be compacted and we
- // need to compact in order
- break;
- }
-
- if (round->getCommit() == NULL) {
- // Try compacting aborts only
- int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
- if (newSize > ArbitrationRound_MAX_PARTS) {
- // Cant compact since it would be too large
- break;
- }
- lastRound->addAborts(round->getAborts());
- } else {
- // Create a new larger commit
- Commit *newCommit = Commit_merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
- localArbitrationSequenceNumber++;
-
- // Create the commit parts so that we can count them
- newCommit->createCommitParts();
-
- // Calculate the new size of the parts
- int newSize = newCommit->getNumberOfParts();
- newSize += lastRound->getAbortsCount();
- newSize += round->getAbortsCount();
-
- if (newSize > ArbitrationRound_MAX_PARTS) {
- // Can't compact since it would be too large
- if (lastRound->getCommit() != newCommit &&
- round->getCommit() != newCommit)
- delete newCommit;
- break;
- }
- // Set the new compacted part
- if (lastRound->getCommit() == newCommit)
- lastRound->setCommit(NULL);
- if (round->getCommit() == newCommit)
- round->setCommit(NULL);
-
- if (lastRound->getCommit() != NULL) {
- Commit * oldcommit = lastRound->getCommit();
- lastRound->setCommit(NULL);
- delete oldcommit;
- }
- lastRound->setCommit(newCommit);
- lastRound->addAborts(round->getAborts());
- gotNewCommit = true;
- }
-
- numberToDelete++;
- }
-
- if (numberToDelete != 1) {
- // If there is a compaction
- // Delete the previous pieces that are now in the new compacted piece
- for (uint i = 2; i <= numberToDelete; i++) {
- delete pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size()-i);
- }
- pendingSendArbitrationRounds->setSize(pendingSendArbitrationRounds->size() - numberToDelete);
-
- pendingSendArbitrationRounds->add(lastRound);
-
- // Should reinsert into the commit processor
- if (hadCommit && gotNewCommit) {
- return true;
- }
- }
-
- return false;
-}
-
-/**
- * Update all the commits and the committed tables, sets dead the dead
- * transactions
- */
-bool Table::updateCommittedTable() {
- if (newCommitParts->size() == 0) {
- // Nothing new to process
- return false;
- }
-
- // Iterate through all the machine Ids that we received new parts for
- SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts);
- while (partsit->hasNext()) {
- int64_t machineId = partsit->next();
- Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId);
-
- // Iterate through all the parts for that machine Id
- SetIterator<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pairit = getKeyIterator(parts);
- while (pairit->hasNext()) {
- Pair<int64_t, int32_t> *partId = pairit->next();
- CommitPart *part = pairit->currVal();
-
- // Get the transaction object for that sequence number
- Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
-
- if (commitForClientTable == NULL) {
- // This is the first commit from this device
- commitForClientTable = new Hashtable<int64_t, Commit *>();
- liveCommitsTable->put(part->getMachineId(), commitForClientTable);
- }
-
- Commit *commit = commitForClientTable->get(part->getSequenceNumber());
-
- if (commit == NULL) {
- // This is a new commit that we dont have so make a new one
- commit = new Commit();
-
- // Insert this new commit into the live tables
- commitForClientTable->put(part->getSequenceNumber(), commit);
- }
-
- // Add that part to the commit
- commit->addPartDecode(part);
- part->releaseRef();
- }
- delete pairit;
- delete parts;
- }
- delete partsit;
-
- // Clear all the new commits parts in preparation for the next time
- // the server sends slots
- newCommitParts->clear();
-
- // If we process a new commit keep track of it for future use
- bool didProcessANewCommit = false;
-
- // Process the commits one by one
- SetIterator<int64_t, Hashtable<int64_t, Commit *> *> *liveit = getKeyIterator(liveCommitsTable);
- while (liveit->hasNext()) {
- int64_t arbitratorId = liveit->next();
- // Get all the commits for a specific arbitrator
- Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
-
- // Sort the commits in order
- Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>();
- {
- SetIterator<int64_t, Commit *> *clientit = getKeyIterator(commitForClientTable);
- while (clientit->hasNext())
- commitSequenceNumbers->add(clientit->next());
- delete clientit;
- }
-
- qsort(commitSequenceNumbers->expose(), commitSequenceNumbers->size(), sizeof(int64_t), compareInt64);
-
- // Get the last commit seen from this arbitrator
- int64_t lastCommitSeenSequenceNumber = -1;
- if (lastCommitSeenSequenceNumberByArbitratorTable->contains(arbitratorId)) {
- lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
- }
-
- // Go through each new commit one by one
- for (uint i = 0; i < commitSequenceNumbers->size(); i++) {
- int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
- Commit *commit = commitForClientTable->get(commitSequenceNumber);
- // Special processing if a commit is not complete
- if (!commit->isComplete()) {
- if (i == (commitSequenceNumbers->size() - 1)) {
- // If there is an incomplete commit and this commit is the
- // latest one seen then this commit cannot be processed and
- // there are no other commits
- break;
- } else {
- // This is a commit that was already dead but parts of it
- // are still in the block chain (not flushed out yet)->
- // Delete it and move on
- commit->setDead();
- commitForClientTable->remove(commit->getSequenceNumber());
- delete commit;
- continue;
- }
- }
-
- // Update the last transaction that was updated if we can
- if (commit->getTransactionSequenceNumber() != -1) {
- // Update the last transaction sequence number that the arbitrator arbitrated on1
- if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
- lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
- }
- }
-
- // Update the last arbitration data that we have seen so far
- if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(commit->getMachineId())) {
- int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
- if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
- // Is larger
- lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
- }
- } else {
- // Never seen any data from this arbitrator so record the first one
- lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
- }
-
- // We have already seen this commit before so need to do the
- // full processing on this commit
- if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
- // Update the last transaction that was updated if we can
- if (commit->getTransactionSequenceNumber() != -1) {
- int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
- if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) ||
- lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
- lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
- }
- }
- continue;
- }
-
- // If we got here then this is a brand new commit and needs full
- // processing
- // Get what commits should be edited, these are the commits that
- // have live values for their keys
- Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
- {
- SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
- while (kvit->hasNext()) {
- KeyValue *kv = kvit->next();
- Commit *commit = liveCommitsByKeyTable->get(kv->getKey());
- if (commit != NULL)
- commitsToEdit->add(commit);
- }
- delete kvit;
- }
-
- // Update each previous commit that needs to be updated
- SetIterator<Commit *, Commit *> *commitit = commitsToEdit->iterator();
- while (commitit->hasNext()) {
- Commit *previousCommit = commitit->next();
-
- // Only bother with live commits (TODO: Maybe remove this check)
- if (previousCommit->isLive()) {
-
- // Update which keys in the old commits are still live
- {
- SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
- while (kvit->hasNext()) {
- KeyValue *kv = kvit->next();
- previousCommit->invalidateKey(kv->getKey());
- }
- delete kvit;
- }
-
- // if the commit is now dead then remove it
- if (!previousCommit->isLive()) {
- commitForClientTable->remove(previousCommit->getSequenceNumber());
- delete previousCommit;
- }
- }
- }
- delete commitit;
- delete commitsToEdit;
-
- // Update the last seen sequence number from this arbitrator
- if (lastCommitSeenSequenceNumberByArbitratorTable->contains(commit->getMachineId())) {
- if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
- lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
- }
- } else {
- lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
- }
-
- // We processed a new commit that we havent seen before
- didProcessANewCommit = true;
-
- // Update the committed table of keys and which commit is using which key
- {
- SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
- while (kvit->hasNext()) {
- KeyValue *kv = kvit->next();
- committedKeyValueTable->put(kv->getKey(), kv);
- liveCommitsByKeyTable->put(kv->getKey(), commit);
- }
- delete kvit;
- }
- }
- delete commitSequenceNumbers;
- }
- delete liveit;
-
- return didProcessANewCommit;
-}
-
-/**
- * Create the speculative table from transactions that are still live
- * and have come from the cloud
- */
-bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
- if (liveTransactionBySequenceNumberTable->size() == 0) {
- // There is nothing to speculate on
- return false;
- }
-
- // Create a list of the transaction sequence numbers and sort them
- // from oldest to newest
- Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>();
- {
- SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
- while (trit->hasNext())
- transactionSequenceNumbersSorted->add(trit->next());
- delete trit;
- }
-
- qsort(transactionSequenceNumbersSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64);
-
- bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
-
-
- if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
- // If there is a gap in the transaction sequence numbers then
- // there was a commit or an abort of a transaction OR there was a
- // new commit (Could be from offline commit) so a redo the
- // speculation from scratch
-
- // Start from scratch
- speculatedKeyValueTable->clear();
- lastTransactionSequenceNumberSpeculatedOn = -1;
- oldestTransactionSequenceNumberSpeculatedOn = -1;
- }
-
- // Remember the front of the transaction list
- oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
-
- // Find where to start arbitration from
- uint startIndex = 0;
-
- for (; startIndex < transactionSequenceNumbersSorted->size(); startIndex++)
- if (transactionSequenceNumbersSorted->get(startIndex) == lastTransactionSequenceNumberSpeculatedOn)
- break;
- startIndex++;
-
- if (startIndex >= transactionSequenceNumbersSorted->size()) {
- // Make sure we are not out of bounds
- delete transactionSequenceNumbersSorted;
- return false; // did not speculate
- }
-
- Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
- bool didSkip = true;
-
- for (uint i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
- int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
- Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
-
- if (!transaction->isComplete()) {
- // If there is an incomplete transaction then there is nothing
- // we can do add this transactions arbitrator to the list of
- // arbitrators we should ignore
- incompleteTransactionArbitrator->add(transaction->getArbitrator());
- didSkip = true;
- continue;
- }
-
- if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
- continue;
- }
-
- lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
-
- if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
- // Guard evaluated to true so update the speculative table
- {
- SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
- while (kvit->hasNext()) {
- KeyValue *kv = kvit->next();
- speculatedKeyValueTable->put(kv->getKey(), kv);
- }
- delete kvit;
- }
- }
- }
-
- delete transactionSequenceNumbersSorted;
-
- if (didSkip) {
- // Since there was a skip we need to redo the speculation next time around
- lastTransactionSequenceNumberSpeculatedOn = -1;
- oldestTransactionSequenceNumberSpeculatedOn = -1;
- }
-
- // We did some speculation
- return true;
-}
-
-/**
- * Create the pending transaction speculative table from transactions
- * that are still in the pending transaction buffer
- */
-void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
- if (pendingTransactionQueue->size() == 0) {
- // There is nothing to speculate on
- return;
- }
-
- if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
- // need to reset on the pending speculation
- lastPendingTransactionSpeculatedOn = NULL;
- firstPendingTransaction = pendingTransactionQueue->get(0);
- pendingTransactionSpeculatedKeyValueTable->clear();
- }
-
- // Find where to start arbitration from
- uint startIndex = 0;
-
- for (; startIndex < pendingTransactionQueue->size(); startIndex++)
- if (pendingTransactionQueue->get(startIndex) == firstPendingTransaction)
- break;
-
- if (startIndex >= pendingTransactionQueue->size()) {
- // Make sure we are not out of bounds
- return;
- }
-
- for (uint i = startIndex; i < pendingTransactionQueue->size(); i++) {
- Transaction *transaction = pendingTransactionQueue->get(i);
-
- lastPendingTransactionSpeculatedOn = transaction;
-
- if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
- // Guard evaluated to true so update the speculative table
- SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
- while (kvit->hasNext()) {
- KeyValue *kv = kvit->next();
- pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
- }
- delete kvit;
- }
- }
-}
-
-/**
- * Set dead and remove from the live transaction tables the
- * transactions that are dead
- */
-void Table::updateLiveTransactionsAndStatus() {
- // Go through each of the transactions
- {
- SetIterator<int64_t, Transaction *> *iter = getKeyIterator(liveTransactionBySequenceNumberTable);
- while (iter->hasNext()) {
- int64_t key = iter->next();
- Transaction *transaction = liveTransactionBySequenceNumberTable->get(key);
-
- // Check if the transaction is dead
- if (lastArbitratedTransactionNumberByArbitratorTable->contains(transaction->getArbitrator())
- && lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator()) >= transaction->getSequenceNumber()) {
- // Set dead the transaction
- transaction->setDead();
-
- // Remove the transaction from the live table
- iter->remove();
- liveTransactionByTransactionIdTable->remove(transaction->getId());
- delete transaction;
- }
- }
- delete iter;
- }
-
- // Go through each of the transactions
- {
- SetIterator<int64_t, TransactionStatus *> *iter = getKeyIterator(outstandingTransactionStatus);
- while (iter->hasNext()) {
- int64_t key = iter->next();
- TransactionStatus *status = outstandingTransactionStatus->get(key);
-
- // Check if the transaction is dead
- if (lastArbitratedTransactionNumberByArbitratorTable->contains(status->getTransactionArbitrator())
- && (lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()) >= status->getTransactionSequenceNumber())) {
- // Set committed
- status->setStatus(TransactionStatus_StatusCommitted);
-
- // Remove
- iter->remove();
- }
- }
- delete iter;
- }
-}
-
-/**
- * Process this slot, entry by entry-> Also update the latest message sent by slot
- */
-void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
-
- // Update the last message seen
- updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
-
- // Process each entry in the slot
- Vector<Entry *> *entries = slot->getEntries();
- uint eSize = entries->size();
- for (uint ei = 0; ei < eSize; ei++) {
- Entry *entry = entries->get(ei);
- switch (entry->getType()) {
- case TypeCommitPart:
- processEntry((CommitPart *)entry);
- break;
- case TypeAbort:
- processEntry((Abort *)entry);
- break;
- case TypeTransactionPart:
- processEntry((TransactionPart *)entry);
- break;
- case TypeNewKey:
- processEntry((NewKey *)entry);
- break;
- case TypeLastMessage:
- processEntry((LastMessage *)entry, machineSet);
- break;
- case TypeRejectedMessage:
- processEntry((RejectedMessage *)entry, indexer);
- break;
- case TypeTableStatus:
- processEntry((TableStatus *)entry, slot->getSequenceNumber());
- break;
- default:
- throw new Error("Unrecognized type: ");
- }
- }
-}
-
-/**
- * Update the last message that was sent for a machine Id
- */
-void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
- // Update what the last message received by a machine was
- updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
-}
-
-/**
- * Add the new key to the arbitrators table and update the set of live
- * new keys (in case of a rescued new key message)
- */
-void Table::processEntry(NewKey *entry) {
- // Update the arbitrator table with the new key information
- arbitratorTable->put(entry->getKey(), entry->getMachineID());
-
- // Update what the latest live new key is
- NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
- if (oldNewKey != NULL) {
- // Delete the old new key messages
- oldNewKey->setDead();
- }
-}
-
-/**
- * Process new table status entries and set dead the old ones as new
- * ones come in-> keeps track of the largest and smallest table status
- * seen in this current round of updating the local copy of the block
- * chain
- */
-void Table::processEntry(TableStatus *entry, int64_t seq) {
- int newNumSlots = entry->getMaxSlots();
- updateCurrMaxSize(newNumSlots);
- initExpectedSize(seq, newNumSlots);
-
- if (liveTableStatus != NULL) {
- // We have a larger table status so the old table status is no
- // int64_ter alive
- liveTableStatus->setDead();
- }
-
- // Make this new table status the latest alive table status
- liveTableStatus = entry;
-}
-
-/**
- * Check old messages to see if there is a block chain violation->
- * Also
- */
-void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
- int64_t oldSeqNum = entry->getOldSeqNum();
- int64_t newSeqNum = entry->getNewSeqNum();
- bool isequal = entry->getEqual();
- int64_t machineId = entry->getMachineID();
- int64_t seq = entry->getSequenceNumber();
-
- // Check if we have messages that were supposed to be rejected in
- // our local block chain
- for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
- // Get the slot
- Slot *slot = indexer->getSlot(seqNum);
-
- if (slot != NULL) {
- // If we have this slot make sure that it was not supposed to be
- // a rejected slot
- int64_t slotMachineId = slot->getMachineID();
- if (isequal != (slotMachineId == machineId)) {
- throw new Error("Server Error: Trying to insert rejected message for slot ");
- }
- }
- }
-
- // Create a list of clients to watch until they see this rejected
- // message entry->
- Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
- SetIterator<int64_t, Pair<int64_t, Liveness *> *> *iter = getKeyIterator(lastMessageTable);
- while (iter->hasNext()) {
- // Machine ID for the last message entry
- int64_t lastMessageEntryMachineId = iter->next();
-
- // We've seen it, don't need to continue to watch-> Our next
- // message will implicitly acknowledge it->
- if (lastMessageEntryMachineId == localMachineId) {
- continue;
- }
-
- Pair<int64_t, Liveness *> *lastMessageValue = lastMessageTable->get(lastMessageEntryMachineId);
- int64_t entrySequenceNumber = lastMessageValue->getFirst();
-
- if (entrySequenceNumber < seq) {
- // Add this rejected message to the set of messages that this
- // machine ID did not see yet
- addWatchVector(lastMessageEntryMachineId, entry);
- // This client did not see this rejected message yet so add it
- // to the watch set to monitor
- deviceWatchSet->add(lastMessageEntryMachineId);
- }
- }
- delete iter;
-
- if (deviceWatchSet->isEmpty()) {
- // This rejected message has been seen by all the clients so
- entry->setDead();
- delete deviceWatchSet;
- } else {
- // We need to watch this rejected message
- entry->setWatchSet(deviceWatchSet);
- }
-}
-
-/**
- * Check if this abort is live, if not then save it so we can kill it
- * later-> update the last transaction number that was arbitrated on->
- */
-void Table::processEntry(Abort *entry) {
- if (entry->getTransactionSequenceNumber() != -1) {
- // update the transaction status if it was sent to the server
- TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
- if (status != NULL) {
- status->setStatus(TransactionStatus_StatusAborted);
- }
- }
-
- // Abort has not been seen by the client it is for yet so we need to
- // keep track of it
-
- Abort *previouslySeenAbort = liveAbortTable->put(new Pair<int64_t, int64_t>(entry->getAbortId()), entry);
- if (previouslySeenAbort != NULL) {
- previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version
- }
-
- if (entry->getTransactionArbitrator() == localMachineId) {
- liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
- }
-
- if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) {
- // The machine already saw this so it is dead
- entry->setDead();
- Pair<int64_t, int64_t> abortid = entry->getAbortId();
- liveAbortTable->remove(&abortid);
-
- if (entry->getTransactionArbitrator() == localMachineId) {
- liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
- }
- return;
- }
-
- // Update the last arbitration data that we have seen so far
- if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(entry->getTransactionArbitrator())) {
- int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
- if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
- // Is larger
- lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
- }
- } else {
- // Never seen any data from this arbitrator so record the first one
- lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
- }
-
- // Set dead a transaction if we can
- Pair<int64_t, int64_t> deadPair = Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber());
-
- Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(&deadPair);
- if (transactionToSetDead != NULL) {
- liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
- }
-
- // Update the last transaction sequence number that the arbitrator
- // arbitrated on
- if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getTransactionArbitrator()) ||
- (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator()) < entry->getTransactionSequenceNumber())) {
- // Is a valid one
- if (entry->getTransactionSequenceNumber() != -1) {
- lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
- }
- }
-}
-
-/**
- * Set dead the transaction part if that transaction is dead and keep
- * track of all new parts
- */
-void Table::processEntry(TransactionPart *entry) {
- // Check if we have already seen this transaction and set it dead OR
- // if it is not alive
- if (lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getArbitratorId()) && (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId()) >= entry->getSequenceNumber())) {
- // This transaction is dead, it was already committed or aborted
- entry->setDead();
- return;
- }
-
- // This part is still alive
- Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId());
-
- if (transactionPart == NULL) {
- // Dont have a table for this machine Id yet so make one
- transactionPart = new Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
- newTransactionParts->put(entry->getMachineId(), transactionPart);
- }
-
- // Update the part and set dead ones we have already seen (got a
- // rescued version)
- entry->acquireRef();
- TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry);
- if (previouslySeenPart != NULL) {
- previouslySeenPart->releaseRef();
- previouslySeenPart->setDead();
- }
-}
-
-/**
- * Process new commit entries and save them for future use-> Delete duplicates
- */
-void Table::processEntry(CommitPart *entry) {
- // Update the last transaction that was updated if we can
- if (entry->getTransactionSequenceNumber() != -1) {
- if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId()) ||
- lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber()) {
- lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
- }
- }
-
- Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *commitPart = newCommitParts->get(entry->getMachineId());
- if (commitPart == NULL) {
- // Don't have a table for this machine Id yet so make one
- commitPart = new Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
- newCommitParts->put(entry->getMachineId(), commitPart);
- }
- // Update the part and set dead ones we have already seen (got a
- // rescued version)
- entry->acquireRef();
- CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry);
- if (previouslySeenPart != NULL) {
- previouslySeenPart->setDead();
- previouslySeenPart->releaseRef();
- }
-}
-
-/**
- * Update the last message seen table-> Update and set dead the
- * appropriate RejectedMessages as clients see them-> Updates the live
- * aborts, removes those that are dead and sets them dead-> Check that
- * the last message seen is correct and that there is no mismatch of
- * our own last message or that other clients have not had a rollback
- * on the last message->
- */
-void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
- // We have seen this machine ID
- machineSet->remove(machineId);
-
- // Get the set of rejected messages that this machine Id is has not seen yet
- Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
- // If there is a rejected message that this machine Id has not seen yet
- if (watchset != NULL) {
- // Go through each rejected message that this machine Id has not
- // seen yet
-
- SetIterator<RejectedMessage *, RejectedMessage *> *rmit = watchset->iterator();
- while (rmit->hasNext()) {
- RejectedMessage *rm = rmit->next();
- // If this machine Id has seen this rejected message->->->
- if (rm->getSequenceNumber() <= seqNum) {
- // Remove it from our watchlist
- rmit->remove();
- // Decrement machines that need to see this notification
- rm->removeWatcher(machineId);
- }
- }
- delete rmit;
- }
-
- // Set dead the abort
- SetIterator<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals> *abortit = getKeyIterator(liveAbortTable);
-
- while (abortit->hasNext()) {
- Pair<int64_t, int64_t> *key = abortit->next();
- Abort *abort = liveAbortTable->get(key);
- if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
- abort->setDead();
- abortit->remove();
- if (abort->getTransactionArbitrator() == localMachineId) {
- liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
- }
- }
- }
- delete abortit;
- if (machineId == localMachineId) {
- // Our own messages are immediately dead->
- char livenessType = liveness->getType();
- if (livenessType == TypeLastMessage) {
- ((LastMessage *)liveness)->setDead();
- } else if (livenessType == TypeSlot) {
- ((Slot *)liveness)->setDead();
- } else {
- throw new Error("Unrecognized type");
- }
- }
- // Get the old last message for this device
- Pair<int64_t, Liveness *> *lastMessageEntry = lastMessageTable->put(machineId, new Pair<int64_t, Liveness *>(seqNum, liveness));
- if (lastMessageEntry == NULL) {
- // If no last message then there is nothing else to process
- return;
- }
-
- int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
- Liveness *lastEntry = lastMessageEntry->getSecond();
- delete lastMessageEntry;
-
- // If it is not our machine Id since we already set ours to dead
- if (machineId != localMachineId) {
- char lastEntryType = lastEntry->getType();
-
- if (lastEntryType == TypeLastMessage) {
- ((LastMessage *)lastEntry)->setDead();
- } else if (lastEntryType == TypeSlot) {
- ((Slot *)lastEntry)->setDead();
- } else {
- throw new Error("Unrecognized type");
- }
- }
- // Make sure the server is not playing any games
- if (machineId == localMachineId) {
- if (hadPartialSendToServer) {
- // We were not making any updates and we had a machine mismatch
- if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
- throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: ");
- }
- } else {
- // We were not making any updates and we had a machine mismatch
- if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
- throw new Error("Server Error: Mismatch on local machine sequence number, needed: ");
- }
- }
- } else {
- if (lastMessageSeqNum > seqNum) {
- throw new Error("Server Error: Rollback on remote machine sequence number");
- }
- }
-}
-
-/**
- * Add a rejected message entry to the watch set to keep track of
- * which clients have seen that rejected message entry and which have
- * not.
- */
-void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
- Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
- if (entries == NULL) {
- // There is no set for this machine ID yet so create one
- entries = new Hashset<RejectedMessage *>();
- rejectedMessageWatchVectorTable->put(machineId, entries);
- }
- entries->add(entry);
-}
-
-/**
- * Check if the HMAC chain is not violated
- */
-void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
- for (uint i = 0; i < newSlots->length(); i++) {
- Slot *currSlot = newSlots->get(i);
- Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
- if (prevSlot != NULL &&
- !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
- throw new Error("Server Error: Invalid HMAC Chain");
- }
-}