#ifndef Table_H
#define Table_H
-
+#include "common.h"
+#include "Pair.h"
+#include "ThreeTuple.h"
/**
* IoTTable data structure. Provides client interface.
* @author Brian Demsky
* @version 1.0
*/
- /* Constants */
+/* Constants */
#define Table_FREE_SLOTS 2
// Number of slots that should be kept free // 10
#define Table_SKIP_THRESHOLD 10
#define Table_REJECTED_THRESHOLD 5
class Table {
- private:
+private:
/* Helper Objects */
- SlotBuffer * buffer;
- CloudComm * cloud = NULL;
- Random * random = NULL;
- TableStatus * liveTableStatus = NULL;
- PendingTransaction * pendingTransactionBuilder = NULL; // Pending Transaction used in building a Pending Transaction
- Transaction * lastPendingTransactionSpeculatedOn = NULL; // Last transaction that was speculated on from the pending transaction
- Transaction * firstPendingTransaction = NULL; // first transaction in the pending transaction list
-
+ SlotBuffer *buffer;
+ CloudComm *cloud;
+ Random *random;
+ TableStatus *liveTableStatus;
+ PendingTransaction *pendingTransactionBuilder; // Pending Transaction used in building a Pending Transaction
+ Transaction *lastPendingTransactionSpeculatedOn; // Last transaction that was speculated on from the pending transaction
+ Transaction *firstPendingTransaction; // first transaction in the pending transaction list
+
/* Variables */
- int numberOfSlots = 0; // Number of slots stored in buffer
- int bufferResizeThreshold = 0; // Threshold on the number of live slots before a resize is needed
- int64_t liveSlotCount = 0; // Number of currently live slots
- int64_t oldestLiveSlotSequenceNumver = 0; // Smallest sequence number of the slot with a live entry
- int64_t localMachineId = 0; // Machine ID of this client device
- int64_t sequenceNumber = 0; // Largest sequence number a client has received
- int64_t localSequenceNumber = 0;
-
+ int numberOfSlots; // Number of slots stored in buffer
+ int bufferResizeThreshold;// Threshold on the number of live slots before a resize is needed
+ int64_t liveSlotCount;// Number of currently live slots
+ int64_t oldestLiveSlotSequenceNumver; // Smallest sequence number of the slot with a live entry
+ int64_t localMachineId; // Machine ID of this client device
+ int64_t sequenceNumber; // Largest sequence number a client has received
+ int64_t localSequenceNumber;
+
// int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server
// int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server
- int64_t localTransactionSequenceNumber = 0; // Local sequence number counter for transactions
- int64_t lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on
- int64_t oldestTransactionSequenceNumberSpeculatedOn = -1; // the oldest transaction that was speculated on
- int64_t localArbitrationSequenceNumber = 0;
- bool hadPartialSendToServer = false;
- bool attemptedToSendToServer = false;
+ int64_t localTransactionSequenceNumber; // Local sequence number counter for transactions
+ int64_t lastTransactionSequenceNumberSpeculatedOn; // the last transaction that was speculated on
+ int64_t oldestTransactionSequenceNumberSpeculatedOn; // the oldest transaction that was speculated on
+ int64_t localArbitrationSequenceNumber;
+ bool hadPartialSendToServer;
+ bool attemptedToSendToServer;
int64_t expectedsize;
- bool didFindTableStatus = false;
- int64_t currMaxSize = 0;
-
- Slot * lastSlotAttemptedToSend = NULL;
- bool lastIsNewKey = false;
- int lastNewSize = 0;
- Hashtable<Transaction *, Vector<int32_t> *> lastTransactionPartsSent = NULL;
- Vector<Entry*> *lastPendingSendArbitrationEntriesToDelete = NULL;
- NewKey * lastNewKey = NULL;
-
-
+ bool didFindTableStatus;
+ int64_t currMaxSize;
+
+ Slot *lastSlotAttemptedToSend;
+ bool lastIsNewKey;
+ int lastNewSize;
+ Hashtable<Transaction *, Vector<int32_t> *> *lastTransactionPartsSent;
+ Vector<Entry *> *lastPendingSendArbitrationEntriesToDelete;
+ NewKey *lastNewKey;
+
+
/* Data Structures */
- Hashtable<IoTString *, KeyValue*> *committedKeyValueTable = NULL; // Table of committed key value pairs
- Hashtable<IoTString *, KeyValue*> * speculatedKeyValueTable = NULL; // Table of speculated key value pairs, if there is a speculative value
- Hashtable<IoTString *, KeyValue *> * pendingTransactionSpeculatedKeyValueTable = NULL; // Table of speculated key value pairs, if there is a speculative value from the pending transactions
- Hashtable<IoTString *, NewKey *> * liveNewKeyTable = NULL; // Table of live new keys
- Hashtable<int64_t, Pair<int64_t, Liveness*>*> *lastMessageTable = NULL; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
- Hashtable<int64_t, HashSet<RejectedMessage*>*> *rejectedMessageWatchVectorTable = NULL; // Table of machine Ids and the set of rejected messages they have not seen yet
- Hashtable<IoTString*, int64_t> *arbitratorTable = NULL; // Table of keys and their arbitrators
- Hashtable<Pair<int64_t, int64_t>*, Abort*> *liveAbortTable = NULL; // Table live abort messages
- Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>*, TransactionPart*>*> *newTransactionParts = NULL; // transaction parts that are seen in this latest round of slots from the server
- Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>*, CommitPart*>*> * newCommitParts = NULL; // commit parts that are seen in this latest round of slots from the server
- Hashtable<int64_t, int64_t> *lastArbitratedTransactionNumberByArbitratorTable = NULL; // Last transaction sequence number that an arbitrator arbitrated on
- Hashtable<int64_t, Transaction*> *liveTransactionBySequenceNumberTable = NULL; // live transaction grouped by the sequence number
- Hashtable<Pair<int64_t, int64_t>*, Transaction*> *liveTransactionByTransactionIdTable = NULL; // live transaction grouped by the transaction ID
- Hashtable<int64_t, Hashtable<int64_t, Commit*>> *liveCommitsTable = NULL;
- Hashtable<IoTString*, Commit*> *liveCommitsByKeyTable = NULL;
- Hashtable<int64_t, int64_t> *lastCommitSeenSequenceNumberByArbitratorTable = NULL;
- Vector<int64_t> * rejectedSlotVector = NULL; // Vector of rejected slots that have yet to be sent to the server
- Vector<Transaction*> *pendingTransactionQueue = NULL;
- Vector<ArbitrationRound*> *pendingSendArbitrationRounds = NULL;
- Vector<Entry*> *pendingSendArbitrationEntriesToDelete = NULL;
- Hashtable<Transaction*, Vector<int32_t*>*> *transactionPartsSent = NULL;
- Hashtable<int64_t, TransactionStatus*> *outstandingTransactionStatus = NULL;
- Hashtable<int64_t, Abort*> *liveAbortsGeneratedByLocal = NULL;
- Hashset<Pair<int64_t, int64_t>*> *offlineTransactionsCommittedAndAtServer = NULL;
- Hashtable<int64_t, Pair<String*, int32_t>> * localCommunicationTable = NULL;
- Hashtable<int64_t, int64_t> * lastTransactionSeenFromMachineFromServer = NULL;
- Hashtable<int64_t, int64_t> * lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = NULL;
+ Hashtable<IoTString *, KeyValue *> *committedKeyValueTable;// Table of committed key value pairs
+ Hashtable<IoTString *, KeyValue *> *speculatedKeyValueTable; // Table of speculated key value pairs, if there is a speculative value
+ Hashtable<IoTString *, KeyValue *> *pendingTransactionSpeculatedKeyValueTable; // Table of speculated key value pairs, if there is a speculative value from the pending transactions
+ Hashtable<IoTString *, NewKey *> *liveNewKeyTable; // Table of live new keys
+ Hashtable<int64_t, Pair<int64_t, Liveness *> > *lastMessageTable; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
+ Hashtable<int64_t, Hashset<RejectedMessage *> *> *rejectedMessageWatchVectorTable; // Table of machine Ids and the set of rejected messages they have not seen yet
+ Hashtable<IoTString *, int64_t> *arbitratorTable;// Table of keys and their arbitrators
+ Hashtable<Pair<int64_t, int64_t>, Abort *, uintptr_t, 0, pairHashFunction, pairEquals> *liveAbortTable;// Table live abort messages
+ Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *newTransactionParts; // transaction parts that are seen in this latest round of slots from the server
+ Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *newCommitParts; // commit parts that are seen in this latest round of slots from the server
+ Hashtable<int64_t, int64_t> *lastArbitratedTransactionNumberByArbitratorTable; // Last transaction sequence number that an arbitrator arbitrated on
+ Hashtable<int64_t, Transaction *> *liveTransactionBySequenceNumberTable; // live transaction grouped by the sequence number
+ Hashtable<Pair<int64_t, int64_t>, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals> *liveTransactionByTransactionIdTable; // live transaction grouped by the transaction ID
+ Hashtable<int64_t, Hashtable<int64_t, Commit *> > *liveCommitsTable;
+ Hashtable<IoTString *, Commit *> *liveCommitsByKeyTable;
+ Hashtable<int64_t, int64_t> *lastCommitSeenSequenceNumberByArbitratorTable;
+ Vector<int64_t> *rejectedSlotVector; // Vector of rejected slots that have yet to be sent to the server
+ Vector<Transaction *> *pendingTransactionQueue;
+ Vector<ArbitrationRound *> *pendingSendArbitrationRounds;
+ Vector<Entry *> *pendingSendArbitrationEntriesToDelete;
+ Hashtable<Transaction *, Vector<int32_t> *> *transactionPartsSent;
+ Hashtable<int64_t, TransactionStatus *> *outstandingTransactionStatus;
+ Hashtable<int64_t, Abort *> *liveAbortsGeneratedByLocal;
+ Hashset<Pair<int64_t, int64_t>, uintptr_t, 0, pairHashFunction, pairEquals> *offlineTransactionsCommittedAndAtServer;
+ Hashtable<int64_t, Pair<IoTString *, int32_t> > *localCommunicationTable;
+ Hashtable<int64_t, int64_t> *lastTransactionSeenFromMachineFromServer;
+ Hashtable<int64_t, int64_t> *lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
+ bool lastInsertedNewKey;
+ int64_t lastSeqNumArbOn;
+
+
void init();
- /**
- * Recalculate the new resize threshold
- */
+ /**
+ * Recalculate the new resize threshold
+ */
void setResizeThreshold();
bool sendToServer(NewKey *newKey);
- synchronized bool updateFromLocal(int64_t machineId);
- Pair<Boolean, Boolean> sendTransactionToLocal(Transaction *transaction);
- ThreeTuple<Boolean, Boolean, Array<Slot*> *> * sendSlotsToServer(Slot *slot, int newSize, bool isNewKey);
+ bool updateFromLocal(int64_t machineId);
+ Pair<bool, bool> sendTransactionToLocal(Transaction *transaction);
+ ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsToServer(Slot *slot, int newSize, bool isNewKey);
/**
* Returns false if a resize was needed
*/
- ThreeTuple<Boolean, int32_t*, Boolean> * fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry);
- void doRejectedMessages(Slot s);
-
- ThreeTuple<Boolean, Boolean, int64_t> doMandatoryResuce(Slot slot, bool resize);
-
- void doOptionalRescue(Slot s, bool seenliveslot, int64_t seqn, bool resize);
- /**
+ ThreeTuple<bool, int32_t, bool> fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry);
+ void doRejectedMessages(Slot *s);
+
+ ThreeTuple<bool, bool, int64_t> doMandatoryResuce(Slot *slot, bool resize);
+
+ void doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize);
+ /**
* Checks for malicious activity and updates the local copy of the block chain.
*/
- void validateAndUpdate(Slot[] newSlots, bool acceptUpdatesToLocal);
-
- void updateLiveStateFromServer();
-
- void updateLiveStateFromLocal();
-
- void initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots);
-
- void updateExpectedSize();
-
-
- /**
- * Check the size of the block chain to make sure there are enough slots sent back by the server.
- * This is only called when we have a gap between the slots that we have locally and the slots
- * sent by the server therefore in the slots sent by the server there will be at least 1 Table
- * status message
- */
- void checkNumSlots(int numberOfSlots);
-
- void updateCurrMaxSize(int newmaxsize) { currMaxSize = newmaxsize; }
-
-
- /**
+ void validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal);
+
+ void updateLiveStateFromServer();
+
+ void updateLiveStateFromLocal();
+
+ void initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots);
+
+ void updateExpectedSize();
+
+
+ /**
+ * Check the size of the block chain to make sure there are enough slots sent back by the server.
+ * This is only called when we have a gap between the slots that we have locally and the slots
+ * sent by the server therefore in the slots sent by the server there will be at least 1 Table
+ * status message
+ */
+ void checkNumSlots(int numberOfSlots);
+
+ void updateCurrMaxSize(int newmaxsize) { currMaxSize = newmaxsize; }
+
+
+ /**
* Update the size of of the local buffer if it is needed.
*/
- void commitNewMaxSize();
-
- /**
- * Process the new transaction parts from this latest round of slots received from the server
- */
- void processNewTransactionParts();
-
- int64_t lastSeqNumArbOn = 0;
-
- void arbitrateFromServer();
-
- Pair<Boolean, Boolean> arbitrateOnLocalTransaction(Transaction transaction);
-
- /**
- * Compacts the arbitration data my merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates
- */
- bool compactArbitrationData();
-
- /**
- * Update all the commits and the committed tables, sets dead the dead transactions
- */
- bool updateCommittedTable();
-
- /**
- * Create the speculative table from transactions that are still live and have come from the cloud
- */
- bool updateSpeculativeTable(bool didProcessNewCommits);
-
- /**
- * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer
- */
- void updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate);
-
- /**
- * Set dead and remove from the live transaction tables the transactions that are dead
- */
- void updateLiveTransactionsAndStatus();
-
- /**
- * Process this slot, entry by entry. Also update the latest message sent by slot
- */
- void processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLocal, HashSet<int64_t> machineSet);
-
- /**
- * Update the last message that was sent for a machine Id
- */
- void processEntry(LastMessage entry, HashSet<int64_t> machineSet);
-
- /**
- * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message)
- */
- void processEntry(NewKey entry);
-
- /**
- * Process new table status entries and set dead the old ones as new ones come in.
- * keeps track of the largest and smallest table status seen in this current round
- * of updating the local copy of the block chain
- */
- void processEntry(TableStatus entry, int64_t seq);
-
- /**
- * Check old messages to see if there is a block chain violation. Also
- */
- void processEntry(RejectedMessage entry, SlotIndexer indexer);
-
- /**
- * Check if this abort is live, if not then save it so we can kill it later.
- * update the last transaction number that was arbitrated on.
- */
- void processEntry(Abort entry);
-
- /**
- * Set dead the transaction part if that transaction is dead and keep track of all new parts
- */
- void processEntry(TransactionPart entry);
-
- /**
- * Process new commit entries and save them for future use. Delete duplicates
- */
- void processEntry(CommitPart entry);
-
- /**
- * Update the last message seen table. Update and set dead the appropriate RejectedMessages as clients see them.
- * Updates the live aborts, removes those that are dead and sets them dead.
- * Check that the last message seen is correct and that there is no mismatch of our own last message or that
- * other clients have not had a rollback on the last message.
- */
- void updateLastMessage(int64_t machineId, int64_t seqNum, Liveness liveness, bool acceptUpdatesToLocal, HashSet<int64_t> machineSet);
-
- /**
- * Add a rejected message entry to the watch set to keep track of which clients have seen that
- * rejected message entry and which have not.
- */
- void addWatchVector(int64_t machineId, RejectedMessage entry);
-
- /**
- * Check if the HMAC chain is not violated
- */
- void checkHMACChain(SlotIndexer indexer, Slot[] newSlots);
- bool lastInsertedNewKey = false;
-
- public:
- Table(String baseurl, String password, int64_t _localMachineId, int listeningPort);
- Table(CloudComm _cloud, int64_t _localMachineId);
-
- /**
- * Initialize the table by inserting a table status as the first entry into the table status
- * also initialize the crypto stuff.
- */
- void initTable();
-
- /**
- * Rebuild the table from scratch by pulling the latest block chain from the server.
- */
- void rebuild();
- void addLocalCommunication(int64_t arbitrator, String hostName, int portNumber);
- uint64_t getArbitrator(IoTString * key);
- void close();
- IoTString * getCommitted(IoTString * key);
- IoTString * getSpeculative(IoTString * key);
- IoTString * getCommittedAtomic(IoTString * key);
- bool createNewKey(IoTString * keyName, int64_t machineId);
- TransactionStatus * commitTransaction();
-
+ void commitNewMaxSize();
+
+ /**
+ * Process the new transaction parts from this latest round of slots received from the server
+ */
+ void processNewTransactionParts();
+
+
+
+ void arbitrateFromServer();
+
+ Pair<bool, bool> arbitrateOnLocalTransaction(Transaction *transaction);
+
+ /**
+ * Compacts the arbitration data my merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates
+ */
+ bool compactArbitrationData();
+
+ /**
+ * Update all the commits and the committed tables, sets dead the dead transactions
+ */
+ bool updateCommittedTable();
+
+ /**
+ * Create the speculative table from transactions that are still live and have come from the cloud
+ */
+ bool updateSpeculativeTable(bool didProcessNewCommits);
+
+ /**
+ * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer
+ */
+ void updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate);
+
+ /**
+ * Set dead and remove from the live transaction tables the transactions that are dead
+ */
+ void updateLiveTransactionsAndStatus();
+
+ /**
+ * Process this slot, entry by entry. Also update the latest message sent by slot
+ */
+ void processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet);
+
+ /**
+ * Update the last message that was sent for a machine Id
+ */
+ void processEntry(LastMessage *entry, Hashset<int64_t> *machineSet);
+
+ /**
+ * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message)
+ */
+ void processEntry(NewKey *entry);
+
+ /**
+ * Process new table status entries and set dead the old ones as new ones come in.
+ * keeps track of the largest and smallest table status seen in this current round
+ * of updating the local copy of the block chain
+ */
+ void processEntry(TableStatus *entry, int64_t seq);
+
+ /**
+ * Check old messages to see if there is a block chain violation. Also
+ */
+ void processEntry(RejectedMessage *entry, SlotIndexer *indexer);
+
+ /**
+ * Check if this abort is live, if not then save it so we can kill it later.
+ * update the last transaction number that was arbitrated on.
+ */
+ void processEntry(Abort *entry);
+
+ /**
+ * Set dead the transaction part if that transaction is dead and keep track of all new parts
+ */
+ void processEntry(TransactionPart *entry);
+
+ /**
+ * Process new commit entries and save them for future use. Delete duplicates
+ */
+ void processEntry(CommitPart *entry);
+
+ /**
+ * Update the last message seen table. Update and set dead the appropriate RejectedMessages as clients see them.
+ * Updates the live aborts, removes those that are dead and sets them dead.
+ * Check that the last message seen is correct and that there is no mismatch of our own last message or that
+ * other clients have not had a rollback on the last message.
+ */
+ void updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet);
+
+ /**
+ * Add a rejected message entry to the watch set to keep track of which clients have seen that
+ * rejected message entry and which have not.
+ */
+ void addWatchVector(int64_t machineId, RejectedMessage *entry);
+
+ /**
+ * Check if the HMAC chain is not violated
+ */
+ void checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots);
+
+
+public:
+ Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort);
+ Table(CloudComm *_cloud, int64_t _localMachineId);
+
+ /**
+ * Initialize the table by inserting a table status as the first entry into the table status
+ * also initialize the crypto stuff.
+ */
+ void initTable();
+
+ /**
+ * Rebuild the table from scratch by pulling the latest block chain from the server.
+ */
+
+ void rebuild();
+ void addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber);
+ int64_t getArbitrator(IoTString *key);
+ void close();
+ IoTString *getCommitted(IoTString *key);
+ IoTString *getSpeculative(IoTString *key);
+ IoTString *getCommittedAtomic(IoTString *key);
+ IoTString *getSpeculativeAtomic(IoTString *key);
+ bool update();
+ bool createNewKey(IoTString *keyName, int64_t machineId);
+ void startTransaction();
+ void addKV(IoTString *key, IoTString *value);
+ TransactionStatus *commitTransaction();
+
/**
* Get the machine ID for this client
*/
- int64_t getMachineId() { return localMachineId; }
-
- /**
- * Decrement the number of live slots that we currently have
- */
- void decrementLiveCount() { liveSlotCount--; }
- int64_t getLocalSequenceNumber();
- Array<char> * acceptDataFromLocal(Array<char> * data);
+ int64_t getMachineId() { return localMachineId; }
+
+ /**
+ * Decrement the number of live slots that we currently have
+ */
+ void decrementLiveCount() { liveSlotCount--; }
+ int64_t getLocalSequenceNumber();
+ Array<char> *acceptDataFromLocal(Array<char> *data);
};
#endif