edits
[iotcloud.git] / version2 / src / C / Table.h
index 4463c8d3952c0c6cb71affe4ca6690636d9a5cb1..394e6d63e3053ede1658005fbb10e5b2a22c7900 100644 (file)
@@ -1,13 +1,15 @@
 #ifndef Table_H
 #define Table_H
-
+#include "common.h"
+#include "Pair.h"
+#include "ThreeTuple.h"
 /**
  * IoTTable data structure.  Provides client interface.
  * @author Brian Demsky
  * @version 1.0
  */
 
-       /* Constants */
+/* Constants */
 #define Table_FREE_SLOTS 2
 // Number of slots that should be kept free // 10
 #define Table_SKIP_THRESHOLD 10
 #define Table_REJECTED_THRESHOLD 5
 
 class Table {
- private:
+private:
        /* Helper Objects */
-       SlotBuffer * buffer;
-       CloudComm * cloud = NULL;
-       Random * random = NULL;
-       TableStatus * liveTableStatus = NULL;
-       PendingTransaction * pendingTransactionBuilder = NULL; // Pending Transaction used in building a Pending Transaction
-       Transaction * lastPendingTransactionSpeculatedOn = NULL; // Last transaction that was speculated on from the pending transaction
-       Transaction * firstPendingTransaction = NULL; // first transaction in the pending transaction list
-       
+       SlotBuffer *buffer;
+       CloudComm *cloud;
+       Random *random;
+       TableStatus *liveTableStatus;
+       PendingTransaction *pendingTransactionBuilder;  // Pending Transaction used in building a Pending Transaction
+       Transaction *lastPendingTransactionSpeculatedOn;        // Last transaction that was speculated on from the pending transaction
+       Transaction *firstPendingTransaction;   // first transaction in the pending transaction list
+
        /* Variables */
-       int numberOfSlots = 0;  // Number of slots stored in buffer
-       int bufferResizeThreshold = 0; // Threshold on the number of live slots before a resize is needed
-       int64_t liveSlotCount = 0; // Number of currently live slots
-       int64_t oldestLiveSlotSequenceNumver = 0;       // Smallest sequence number of the slot with a live entry
-       int64_t localMachineId = 0; // Machine ID of this client device
-       int64_t sequenceNumber = 0; // Largest sequence number a client has received
-       int64_t localSequenceNumber = 0;
-        
+       int numberOfSlots;      // Number of slots stored in buffer
+       int bufferResizeThreshold;// Threshold on the number of live slots before a resize is needed
+       int64_t liveSlotCount;// Number of currently live slots
+       int64_t oldestLiveSlotSequenceNumver;   // Smallest sequence number of the slot with a live entry
+       int64_t localMachineId; // Machine ID of this client device
+       int64_t sequenceNumber; // Largest sequence number a client has received
+       int64_t localSequenceNumber;
+
        //  int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server
        //  int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server
-       int64_t localTransactionSequenceNumber = 0; // Local sequence number counter for transactions
-       int64_t lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on
-       int64_t oldestTransactionSequenceNumberSpeculatedOn = -1; // the oldest transaction that was speculated on
-       int64_t localArbitrationSequenceNumber = 0;
-       bool hadPartialSendToServer = false;
-       bool attemptedToSendToServer = false;
+       int64_t localTransactionSequenceNumber; // Local sequence number counter for transactions
+       int64_t lastTransactionSequenceNumberSpeculatedOn;      // the last transaction that was speculated on
+       int64_t oldestTransactionSequenceNumberSpeculatedOn;    // the oldest transaction that was speculated on
+       int64_t localArbitrationSequenceNumber;
+       bool hadPartialSendToServer;
+       bool attemptedToSendToServer;
        int64_t expectedsize;
-       bool didFindTableStatus = false;
-       int64_t currMaxSize = 0;
-       
-       Slot * lastSlotAttemptedToSend = NULL;
-       bool lastIsNewKey = false;
-       int lastNewSize = 0;
-       Hashtable<Transaction *, List<int32_t> *> lastTransactionPartsSent = NULL;
-       List<Entry*> *lastPendingSendArbitrationEntriesToDelete = NULL;
-       NewKey * lastNewKey = NULL;
-        
-        
+       bool didFindTableStatus;
+       int64_t currMaxSize;
+
+       Slot *lastSlotAttemptedToSend;
+       bool lastIsNewKey;
+       int lastNewSize;
+       Hashtable<Transaction *, Vector<int32_t> *> *lastTransactionPartsSent;
+       Vector<Entry *> *lastPendingSendArbitrationEntriesToDelete;
+       NewKey *lastNewKey;
+
+
        /* Data Structures  */
-       Hashtable<IoTString *, KeyValue*> *committedKeyValueTable = NULL; // Table of committed key value pairs
-       Hashtable<IoTString *, KeyValue*> * speculatedKeyValueTable = NULL; // Table of speculated key value pairs, if there is a speculative value
-       Hashtable<IoTString *, KeyValue *> * pendingTransactionSpeculatedKeyValueTable = NULL; // Table of speculated key value pairs, if there is a speculative value from the pending transactions
-       Hashtable<IoTString *, NewKey *> * liveNewKeyTable = NULL; // Table of live new keys
-       Hashtable<int64_t, Pair<int64_t, Liveness*>*> *lastMessageTable = NULL; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
-       Hashtable<int64_t, HashSet<RejectedMessage*>*> *rejectedMessageWatchListTable = NULL; // Table of machine Ids and the set of rejected messages they have not seen yet
-       Hashtable<IoTString*, int64_t> *arbitratorTable = NULL; // Table of keys and their arbitrators
-       Hashtable<Pair<int64_t, int64_t>*, Abort*> *liveAbortTable = NULL; // Table live abort messages
-       Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>*, TransactionPart*>*> *newTransactionParts = NULL; // transaction parts that are seen in this latest round of slots from the server
-       Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>*, CommitPart*>*> * newCommitParts = NULL; // commit parts that are seen in this latest round of slots from the server
-       Hashtable<int64_t, int64_t> *lastArbitratedTransactionNumberByArbitratorTable = NULL; // Last transaction sequence number that an arbitrator arbitrated on
-       Hashtable<int64_t, Transaction*> *liveTransactionBySequenceNumberTable = NULL; // live transaction grouped by the sequence number
-       Hashtable<Pair<int64_t, int64_t>*, Transaction*> *liveTransactionByTransactionIdTable = NULL; // live transaction grouped by the transaction ID
-       Hashtable<int64_t, Hashtable<int64_t, Commit*>> *liveCommitsTable = NULL;
-       Hashtable<IoTString*, Commit*> *liveCommitsByKeyTable = NULL;
-       Hashtable<int64_t, int64_t> *lastCommitSeenSequenceNumberByArbitratorTable = NULL;
-       Vector<int64_t> * rejectedSlotList = NULL; // List of rejected slots that have yet to be sent to the server
-       List<Transaction*> *pendingTransactionQueue = NULL;
-       List<ArbitrationRound*> *pendingSendArbitrationRounds = NULL;
-       List<Entry*> *pendingSendArbitrationEntriesToDelete = NULL;
-       Hashtable<Transaction*, List<Integer*>*> *transactionPartsSent = NULL;
-       Hashtable<int64_t, TransactionStatus*> *outstandingTransactionStatus = NULL;
-       Hashtable<int64_t, Abort*> *liveAbortsGeneratedByLocal = NULL;
-       Hashset<Pair<int64_t, int64_t>*> *offlineTransactionsCommittedAndAtServer = NULL;
-       Hashtable<int64_t, Pair<String*, int32_t>> * localCommunicationTable = NULL;
-       Hashtable<int64_t, int64_t> * lastTransactionSeenFromMachineFromServer = NULL;
-       Hashtable<int64_t, int64_t> * lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = NULL;
+       Hashtable<IoTString *, KeyValue *> *committedKeyValueTable;// Table of committed key value pairs
+       Hashtable<IoTString *, KeyValue *> *speculatedKeyValueTable;    // Table of speculated key value pairs, if there is a speculative value
+       Hashtable<IoTString *, KeyValue *> *pendingTransactionSpeculatedKeyValueTable;  // Table of speculated key value pairs, if there is a speculative value from the pending transactions
+       Hashtable<IoTString *, NewKey *> *liveNewKeyTable;      // Table of live new keys
+       Hashtable<int64_t, Pair<int64_t, Liveness *> *> *lastMessageTable;      // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
+       Hashtable<int64_t, Hashset<RejectedMessage *> *> *rejectedMessageWatchVectorTable;      // Table of machine Ids and the set of rejected messages they have not seen yet
+       Hashtable<IoTString *, int64_t> *arbitratorTable;// Table of keys and their arbitrators
+       Hashtable<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals> *liveAbortTable;// Table live abort messages
+       Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *newTransactionParts;  // transaction parts that are seen in this latest round of slots from the server
+       Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *newCommitParts;    // commit parts that are seen in this latest round of slots from the server
+       Hashtable<int64_t, int64_t> *lastArbitratedTransactionNumberByArbitratorTable;  // Last transaction sequence number that an arbitrator arbitrated on
+       Hashtable<int64_t, Transaction *> *liveTransactionBySequenceNumberTable;        // live transaction grouped by the sequence number
+       Hashtable<Pair<int64_t, int64_t> *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals> *liveTransactionByTransactionIdTable;    // live transaction grouped by the transaction ID
+       Hashtable<int64_t, Hashtable<int64_t, Commit *> *> *liveCommitsTable;
+       Hashtable<IoTString *, Commit *> *liveCommitsByKeyTable;
+       Hashtable<int64_t, int64_t> *lastCommitSeenSequenceNumberByArbitratorTable;
+       Vector<int64_t> *rejectedSlotVector;    // Vector of rejected slots that have yet to be sent to the server
+       Vector<Transaction *> *pendingTransactionQueue;
+       Vector<ArbitrationRound *> *pendingSendArbitrationRounds;
+       Vector<Entry *> *pendingSendArbitrationEntriesToDelete;
+       Hashtable<Transaction *, Vector<int32_t> *> *transactionPartsSent;
+       Hashtable<int64_t, TransactionStatus *> *outstandingTransactionStatus;
+       Hashtable<int64_t, Abort *> *liveAbortsGeneratedByLocal;
+       Hashset<Pair<int64_t, int64_t> *, uintptr_t, 0, pairHashFunction, pairEquals> *offlineTransactionsCommittedAndAtServer;
+       Hashtable<int64_t, Pair<IoTString *, int32_t> *> *localCommunicationTable;
+       Hashtable<int64_t, int64_t> *lastTransactionSeenFromMachineFromServer;
+       Hashtable<int64_t, int64_t> *lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
+       bool lastInsertedNewKey;
+       int64_t lastSeqNumArbOn;
+
+
        void init();
-        /**
-               * Recalculate the new resize threshold
-               */
+       /**
+        * Recalculate the new resize threshold
+        */
        void setResizeThreshold();
        bool sendToServer(NewKey *newKey);
-       synchronized bool updateFromLocal(int64_t machineId);
-       Pair<Boolean, Boolean> sendTransactionToLocal(Transaction *transaction);
-       ThreeTuple<Boolean, Boolean, Array<Slot*> *> * sendSlotsToServer(Slot *slot, int newSize, bool isNewKey);
+       bool updateFromLocal(int64_t machineId);
+       Pair<bool, bool> sendTransactionToLocal(Transaction *transaction);
+       ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsToServer(Slot *slot, int newSize, bool isNewKey);
        /**
         * Returns false if a resize was needed
         */
-       ThreeTuple<Boolean, int32_t*, Boolean> * fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry);
-       void doRejectedMessages(Slot s);
-       
-       ThreeTuple<Boolean, Boolean, int64_t> doMandatoryResuce(Slot slot, bool resize);
-        
-        void  doOptionalRescue(Slot s, bool seenliveslot, int64_t seqn, bool resize);
-        /**
+       ThreeTuple<bool, int32_t, bool> fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry);
+       void doRejectedMessages(Slot *s);
+
+       ThreeTuple<bool, bool, int64_t> doMandatoryResuce(Slot *slot, bool resize);
+
+       void  doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize);
+       /**
         * Checks for malicious activity and updates the local copy of the block chain.
         */
-        void validateAndUpdate(Slot[] newSlots, bool acceptUpdatesToLocal);
-        
-        void updateLiveStateFromServer();
-        
-        void updateLiveStateFromLocal();
-        
-        void initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots);
-        
-        void updateExpectedSize();
-
-        
-        /**
-               * Check the size of the block chain to make sure there are enough slots sent back by the server.
-               * This is only called when we have a gap between the slots that we have locally and the slots
-               * sent by the server therefore in the slots sent by the server there will be at least 1 Table
-               * status message
-               */
-        void checkNumSlots(int numberOfSlots);
-        
-        void updateCurrMaxSize(int newmaxsize) { currMaxSize = newmaxsize; }
-        
-
-        /**
+       void validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal);
+
+       void updateLiveStateFromServer();
+
+       void updateLiveStateFromLocal();
+
+       void initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots);
+
+       void updateExpectedSize();
+
+
+       /**
+        * Check the size of the block chain to make sure there are enough slots sent back by the server.
+        * This is only called when we have a gap between the slots that we have locally and the slots
+        * sent by the server therefore in the slots sent by the server there will be at least 1 Table
+        * status message
+        */
+       void checkNumSlots(int numberOfSlots);
+
+       void updateCurrMaxSize(int newmaxsize) { currMaxSize = newmaxsize; }
+
+
+       /**
         * Update the size of of the local buffer if it is needed.
         */
-        void commitNewMaxSize();
-        
-        /**
-               * Process the new transaction parts from this latest round of slots received from the server
-               */
-        void processNewTransactionParts();
-        
-        int64_t lastSeqNumArbOn = 0;
-
-        void arbitrateFromServer();
-        
-        Pair<Boolean, Boolean> arbitrateOnLocalTransaction(Transaction transaction);
-        
-        /**
-               * Compacts the arbitration data my merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates
-               */
-        bool compactArbitrationData();
-
-        /**
-               * Update all the commits and the committed tables, sets dead the dead transactions
-               */
-        bool updateCommittedTable();
-        
-        /**
-               * Create the speculative table from transactions that are still live and have come from the cloud
-               */
-        bool updateSpeculativeTable(bool didProcessNewCommits);
-
-        /**
-               * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer
-               */
-        void updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate);
-        
-        /**
-               * Set dead and remove from the live transaction tables the transactions that are dead
-               */
-        void updateLiveTransactionsAndStatus();
-        
-        /**
-               * Process this slot, entry by entry.  Also update the latest message sent by slot
-               */
-        void processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLocal, HashSet<int64_t> machineSet);
-        
-        /**
-               * Update the last message that was sent for a machine Id
-               */
-        void processEntry(LastMessage entry, HashSet<int64_t> machineSet);
-        
-        /**
-               * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message)
-               */
-        void processEntry(NewKey entry);
-        
-        /**
-               * Process new table status entries and set dead the old ones as new ones come in.
-               * keeps track of the largest and smallest table status seen in this current round
-               * of updating the local copy of the block chain
-               */
-        void processEntry(TableStatus entry, int64_t seq);
-        
-        /**
-               * Check old messages to see if there is a block chain violation. Also
-               */
-        void processEntry(RejectedMessage entry, SlotIndexer indexer);
-        
-        /**
-               * Check if this abort is live, if not then save it so we can kill it later.
-               * update the last transaction number that was arbitrated on.
-               */
-        void processEntry(Abort entry);
-        
-        /**
-               * Set dead the transaction part if that transaction is dead and keep track of all new parts
-               */
-        void processEntry(TransactionPart entry);
-        
-        /**
-               * Process new commit entries and save them for future use.  Delete duplicates
-               */
-        void processEntry(CommitPart entry);
-        
-        /**
-               * Update the last message seen table.  Update and set dead the appropriate RejectedMessages as clients see them.
-               * Updates the live aborts, removes those that are dead and sets them dead.
-               * Check that the last message seen is correct and that there is no mismatch of our own last message or that
-               * other clients have not had a rollback on the last message.
-               */
-        void updateLastMessage(int64_t machineId, int64_t seqNum, Liveness liveness, bool acceptUpdatesToLocal, HashSet<int64_t> machineSet);
-        
-        /**
-               * Add a rejected message entry to the watch set to keep track of which clients have seen that
-               * rejected message entry and which have not.
-               */
-        void addWatchList(int64_t machineId, RejectedMessage entry);
-
-        /**
-               * Check if the HMAC chain is not violated
-               */
-        void checkHMACChain(SlotIndexer indexer, Slot[] newSlots);
-        bool lastInsertedNewKey = false;
-        
- public:
-        Table(String baseurl, String password, int64_t _localMachineId, int listeningPort);
-        Table(CloudComm _cloud, int64_t _localMachineId);
-        
-        /**
-               * Initialize the table by inserting a table status as the first entry into the table status
-               * also initialize the crypto stuff.
-               */
-        void initTable();
-        
-        /**
-               * Rebuild the table from scratch by pulling the latest block chain from the server.
-               */
-        void rebuild();
-        void addLocalCommunication(int64_t arbitrator, String hostName, int portNumber);
-        uint64_t getArbitrator(IoTString * key);
-        void close();
-        IoTString * getCommitted(IoTString * key);
-        IoTString * getSpeculative(IoTString * key);
-        IoTString * getCommittedAtomic(IoTString * key);
-        bool createNewKey(IoTString * keyName, int64_t machineId);
-        TransactionStatus * commitTransaction();
-        
+       void commitNewMaxSize();
+
+       /**
+        * Process the new transaction parts from this latest round of slots received from the server
+        */
+       void processNewTransactionParts();
+
+
+
+       void arbitrateFromServer();
+
+       Pair<bool, bool> arbitrateOnLocalTransaction(Transaction *transaction);
+
+       /**
+        * Compacts the arbitration data my merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates
+        */
+       bool compactArbitrationData();
+
+       /**
+        * Update all the commits and the committed tables, sets dead the dead transactions
+        */
+       bool updateCommittedTable();
+
+       /**
+        * Create the speculative table from transactions that are still live and have come from the cloud
+        */
+       bool updateSpeculativeTable(bool didProcessNewCommits);
+
+       /**
+        * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer
+        */
+       void updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate);
+
+       /**
+        * Set dead and remove from the live transaction tables the transactions that are dead
+        */
+       void updateLiveTransactionsAndStatus();
+
+       /**
+        * Process this slot, entry by entry.  Also update the latest message sent by slot
+        */
+       void processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet);
+
+       /**
+        * Update the last message that was sent for a machine Id
+        */
+       void processEntry(LastMessage *entry, Hashset<int64_t> *machineSet);
+
+       /**
+        * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message)
+        */
+       void processEntry(NewKey *entry);
+
+       /**
+        * Process new table status entries and set dead the old ones as new ones come in.
+        * keeps track of the largest and smallest table status seen in this current round
+        * of updating the local copy of the block chain
+        */
+       void processEntry(TableStatus *entry, int64_t seq);
+
+       /**
+        * Check old messages to see if there is a block chain violation. Also
+        */
+       void processEntry(RejectedMessage *entry, SlotIndexer *indexer);
+
+       /**
+        * Check if this abort is live, if not then save it so we can kill it later.
+        * update the last transaction number that was arbitrated on.
+        */
+       void processEntry(Abort *entry);
+
+       /**
+        * Set dead the transaction part if that transaction is dead and keep track of all new parts
+        */
+       void processEntry(TransactionPart *entry);
+
+       /**
+        * Process new commit entries and save them for future use.  Delete duplicates
+        */
+       void processEntry(CommitPart *entry);
+
+       /**
+        * Update the last message seen table.  Update and set dead the appropriate RejectedMessages as clients see them.
+        * Updates the live aborts, removes those that are dead and sets them dead.
+        * Check that the last message seen is correct and that there is no mismatch of our own last message or that
+        * other clients have not had a rollback on the last message.
+        */
+       void updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet);
+
+       /**
+        * Add a rejected message entry to the watch set to keep track of which clients have seen that
+        * rejected message entry and which have not.
+        */
+       void addWatchVector(int64_t machineId, RejectedMessage *entry);
+
+       /**
+        * Check if the HMAC chain is not violated
+        */
+       void checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots);
+
+
+public:
+       Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort);
+       Table(CloudComm *_cloud, int64_t _localMachineId);
+
+       /**
+        * Initialize the table by inserting a table status as the first entry into the table status
+        * also initialize the crypto stuff.
+        */
+       void initTable();
+
+       /**
+        * Rebuild the table from scratch by pulling the latest block chain from the server.
+        */
+
+       void rebuild();
+       void addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber);
+       int64_t getArbitrator(IoTString *key);
+       void close();
+       IoTString *getCommitted(IoTString *key);
+       IoTString *getSpeculative(IoTString *key);
+       IoTString *getCommittedAtomic(IoTString *key);
+       IoTString *getSpeculativeAtomic(IoTString *key);
+       bool update();
+       bool createNewKey(IoTString *keyName, int64_t machineId);
+       void startTransaction();
+       void addKV(IoTString *key, IoTString *value);
+       TransactionStatus *commitTransaction();
+
        /**
         * Get the machine ID for this client
         */
-        int64_t getMachineId() { return localMachineId; }
-        
-        /**
-               * Decrement the number of live slots that we currently have
-               */
-        void decrementLiveCount() { liveSlotCount--; }
-        int64_t getLocalSequenceNumber();
-        Array<char> * acceptDataFromLocal(Array<char> * data);
+       int64_t getMachineId() { return localMachineId; }
+
+       /**
+        * Decrement the number of live slots that we currently have
+        */
+       void decrementLiveCount() { liveSlotCount--; }
+       int64_t getLocalSequenceNumber();
+       Array<char> *acceptDataFromLocal(Array<char> *data);
 };
 
 #endif