Bug fixes + tabbing
[iotcloud.git] / version2 / src / C / Table.h
index 763ea90f7e24bae5d156fa5639bf29e8aa637ef4..a97d12490e0da627cb244d32d537ca9d07ce3bd5 100644 (file)
@@ -3,6 +3,8 @@
 #include "common.h"
 #include "Pair.h"
 #include "ThreeTuple.h"
+#include "IoTString.h"
+
 /**
  * IoTTable data structure.  Provides client interface.
  * @author Brian Demsky
@@ -22,7 +24,7 @@ private:
        /* Helper Objects */
        SlotBuffer *buffer;
        CloudComm *cloud;
-       Random *random;
+       SecureRandom *random;
        TableStatus *liveTableStatus;
        PendingTransaction *pendingTransactionBuilder;  // Pending Transaction used in building a Pending Transaction
        Transaction *lastPendingTransactionSpeculatedOn;        // Last transaction that was speculated on from the pending transaction
@@ -52,7 +54,7 @@ private:
        Slot *lastSlotAttemptedToSend;
        bool lastIsNewKey;
        int lastNewSize;
-       Hashtable<Transaction *, Vector<int32_t> *> lastTransactionPartsSent;
+       Hashtable<Transaction *, Vector<int32_t> *> *lastTransactionPartsSent;
        Vector<Entry *> *lastPendingSendArbitrationEntriesToDelete;
        NewKey *lastNewKey;
 
@@ -64,25 +66,25 @@ private:
        Hashtable<IoTString *, NewKey *> *liveNewKeyTable;      // Table of live new keys
        Hashtable<int64_t, Pair<int64_t, Liveness *> *> *lastMessageTable;      // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
        Hashtable<int64_t, Hashset<RejectedMessage *> *> *rejectedMessageWatchVectorTable;      // Table of machine Ids and the set of rejected messages they have not seen yet
-       Hashtable<IoTString *, int64_t> *arbitratorTable;// Table of keys and their arbitrators
-       Hashtable<Pair<int64_t, int64_t> *, Abort *> *liveAbortTable;// Table live abort messages
-       Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *> *> *newTransactionParts;      // transaction parts that are seen in this latest round of slots from the server
-       Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *> *> *newCommitParts;        // commit parts that are seen in this latest round of slots from the server
+       Hashtable<IoTString *, int64_t, uintptr_t, 0, hashString, StringEquals> *arbitratorTable;// Table of keys and their arbitrators
+       Hashtable<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals> *liveAbortTable;// Table live abort messages
+       Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *newTransactionParts;  // transaction parts that are seen in this latest round of slots from the server
+       Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *newCommitParts;    // commit parts that are seen in this latest round of slots from the server
        Hashtable<int64_t, int64_t> *lastArbitratedTransactionNumberByArbitratorTable;  // Last transaction sequence number that an arbitrator arbitrated on
        Hashtable<int64_t, Transaction *> *liveTransactionBySequenceNumberTable;        // live transaction grouped by the sequence number
-       Hashtable<Pair<int64_t, int64_t> *, Transaction *> *liveTransactionByTransactionIdTable;        // live transaction grouped by the transaction ID
-       Hashtable<int64_t, Hashtable<int64_t, Commit *> > *liveCommitsTable;
+       Hashtable<Pair<int64_t, int64_t> *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals> *liveTransactionByTransactionIdTable;    // live transaction grouped by the transaction ID
+       Hashtable<int64_t, Hashtable<int64_t, Commit *> *> *liveCommitsTable;
        Hashtable<IoTString *, Commit *> *liveCommitsByKeyTable;
        Hashtable<int64_t, int64_t> *lastCommitSeenSequenceNumberByArbitratorTable;
        Vector<int64_t> *rejectedSlotVector;    // Vector of rejected slots that have yet to be sent to the server
        Vector<Transaction *> *pendingTransactionQueue;
        Vector<ArbitrationRound *> *pendingSendArbitrationRounds;
        Vector<Entry *> *pendingSendArbitrationEntriesToDelete;
-       Hashtable<Transaction *, Vector<int32_t *> *> *transactionPartsSent;
+       Hashtable<Transaction *, Vector<int32_t> *> *transactionPartsSent;
        Hashtable<int64_t, TransactionStatus *> *outstandingTransactionStatus;
        Hashtable<int64_t, Abort *> *liveAbortsGeneratedByLocal;
-       Hashset<Pair<int64_t, int64_t> *> *offlineTransactionsCommittedAndAtServer;
-       Hashtable<int64_t, Pair<IoTString *, int32_t> > *localCommunicationTable;
+       Hashset<Pair<int64_t, int64_t> *, uintptr_t, 0, pairHashFunction, pairEquals> *offlineTransactionsCommittedAndAtServer;
+       Hashtable<int64_t, Pair<IoTString *, int32_t> *> *localCommunicationTable;
        Hashtable<int64_t, int64_t> *lastTransactionSeenFromMachineFromServer;
        Hashtable<int64_t, int64_t> *lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
        bool lastInsertedNewKey;
@@ -97,20 +99,20 @@ private:
        bool sendToServer(NewKey *newKey);
        bool updateFromLocal(int64_t machineId);
        Pair<bool, bool> sendTransactionToLocal(Transaction *transaction);
-       ThreeTuple<bool, bool, Array<Slot *> *> *sendSlotsToServer(Slot *slot, int newSize, bool isNewKey);
+       ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsToServer(Slot *slot, int newSize, bool isNewKey);
        /**
         * Returns false if a resize was needed
         */
-       ThreeTuple<bool, int32_t *, bool> *fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry);
-       void doRejectedMessages(Slot s);
+       ThreeTuple<bool, int32_t, bool> fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry);
+       void doRejectedMessages(Slot *s);
 
-       ThreeTuple<bool, bool, int64_t> doMandatoryResuce(Slot slot, bool resize);
+       ThreeTuple<bool, bool, int64_t> doMandatoryResuce(Slot *slot, bool resize);
 
-       void  doOptionalRescue(Slot s, bool seenliveslot, int64_t seqn, bool resize);
+       void  doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize);
        /**
         * Checks for malicious activity and updates the local copy of the block chain.
         */
-       void validateAndUpdate(Array<Slot> *newSlots, bool acceptUpdatesToLocal);
+       void validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal);
 
        void updateLiveStateFromServer();
 
@@ -146,7 +148,7 @@ private:
 
        void arbitrateFromServer();
 
-       Pair<bool, bool> arbitrateOnLocalTransaction(Transaction transaction);
+       Pair<bool, bool> arbitrateOnLocalTransaction(Transaction *transaction);
 
        /**
         * Compacts the arbitration data my merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates
@@ -176,45 +178,45 @@ private:
        /**
         * Process this slot, entry by entry.  Also update the latest message sent by slot
         */
-       void processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLocal, Hashset<int64_t> machineSet);
+       void processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet);
 
        /**
         * Update the last message that was sent for a machine Id
         */
-       void processEntry(LastMessage entry, Hashset<int64_t> machineSet);
+       void processEntry(LastMessage *entry, Hashset<int64_t> *machineSet);
 
        /**
         * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message)
         */
-       void processEntry(NewKey entry);
+       void processEntry(NewKey *entry);
 
        /**
         * Process new table status entries and set dead the old ones as new ones come in.
         * keeps track of the largest and smallest table status seen in this current round
         * of updating the local copy of the block chain
         */
-       void processEntry(TableStatus entry, int64_t seq);
+       void processEntry(TableStatus *entry, int64_t seq);
 
        /**
         * Check old messages to see if there is a block chain violation. Also
         */
-       void processEntry(RejectedMessage entry, SlotIndexer indexer);
+       void processEntry(RejectedMessage *entry, SlotIndexer *indexer);
 
        /**
         * Check if this abort is live, if not then save it so we can kill it later.
         * update the last transaction number that was arbitrated on.
         */
-       void processEntry(Abort entry);
+       void processEntry(Abort *entry);
 
        /**
         * Set dead the transaction part if that transaction is dead and keep track of all new parts
         */
-       void processEntry(TransactionPart entry);
+       void processEntry(TransactionPart *entry);
 
        /**
         * Process new commit entries and save them for future use.  Delete duplicates
         */
-       void processEntry(CommitPart entry);
+       void processEntry(CommitPart *entry);
 
        /**
         * Update the last message seen table.  Update and set dead the appropriate RejectedMessages as clients see them.
@@ -222,23 +224,24 @@ private:
         * Check that the last message seen is correct and that there is no mismatch of our own last message or that
         * other clients have not had a rollback on the last message.
         */
-       void updateLastMessage(int64_t machineId, int64_t seqNum, Liveness liveness, bool acceptUpdatesToLocal, Hashset<int64_t> machineSet);
+       void updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet);
 
        /**
         * Add a rejected message entry to the watch set to keep track of which clients have seen that
         * rejected message entry and which have not.
         */
-       void addWatchVector(int64_t machineId, RejectedMessage entry);
+       void addWatchVector(int64_t machineId, RejectedMessage *entry);
 
        /**
         * Check if the HMAC chain is not violated
         */
-       void checkHMACChain(SlotIndexer indexer, Array<Slot> *newSlots);
+       void checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots);
 
 
 public:
        Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort);
-       Table(CloudComm _cloud, int64_t _localMachineId);
+       Table(CloudComm *_cloud, int64_t _localMachineId);
+       ~Table();
 
        /**
         * Initialize the table by inserting a table status as the first entry into the table status
@@ -249,14 +252,19 @@ public:
        /**
         * Rebuild the table from scratch by pulling the latest block chain from the server.
         */
+
        void rebuild();
        void addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber);
-       uint64_t getArbitrator(IoTString *key);
+       int64_t getArbitrator(IoTString *key);
        void close();
        IoTString *getCommitted(IoTString *key);
        IoTString *getSpeculative(IoTString *key);
        IoTString *getCommittedAtomic(IoTString *key);
+       IoTString *getSpeculativeAtomic(IoTString *key);
+       bool update();
        bool createNewKey(IoTString *keyName, int64_t machineId);
+       void startTransaction();
+       void addKV(IoTString *key, IoTString *value);
        TransactionStatus *commitTransaction();
 
        /**