Another file compiles
[iotcloud.git] / version2 / src / C / Table.h
index 3bc9cb92592261aae82a2d979e440db87aef5915..7cc98abc2ff24dae7f6be3a777b36c2ed0a8d591 100644 (file)
@@ -1,6 +1,8 @@
 #ifndef Table_H
 #define Table_H
-
+#include "common.h"
+#include "Pair.h"
+#include "ThreeTuple.h"
 /**
  * IoTTable data structure.  Provides client interface.
  * @author Brian Demsky
@@ -19,92 +21,96 @@ class Table {
 private:
        /* Helper Objects */
        SlotBuffer *buffer;
-       CloudComm *cloud = NULL;
-       Random *random = NULL;
-       TableStatus *liveTableStatus = NULL;
-       PendingTransaction *pendingTransactionBuilder = NULL;   // Pending Transaction used in building a Pending Transaction
-       Transaction *lastPendingTransactionSpeculatedOn = NULL; // Last transaction that was speculated on from the pending transaction
-       Transaction *firstPendingTransaction = NULL;    // first transaction in the pending transaction list
+       CloudComm *cloud;
+       Random *random;
+       TableStatus *liveTableStatus;
+       PendingTransaction *pendingTransactionBuilder;  // Pending Transaction used in building a Pending Transaction
+       Transaction *lastPendingTransactionSpeculatedOn;        // Last transaction that was speculated on from the pending transaction
+       Transaction *firstPendingTransaction;   // first transaction in the pending transaction list
 
        /* Variables */
-       int numberOfSlots = 0;  // Number of slots stored in buffer
-       int bufferResizeThreshold = 0;// Threshold on the number of live slots before a resize is needed
-       int64_t liveSlotCount = 0;// Number of currently live slots
-       int64_t oldestLiveSlotSequenceNumver = 0;       // Smallest sequence number of the slot with a live entry
-       int64_t localMachineId = 0;     // Machine ID of this client device
-       int64_t sequenceNumber = 0;     // Largest sequence number a client has received
-       int64_t localSequenceNumber = 0;
+       int numberOfSlots;      // Number of slots stored in buffer
+       int bufferResizeThreshold;// Threshold on the number of live slots before a resize is needed
+       int64_t liveSlotCount;// Number of currently live slots
+       int64_t oldestLiveSlotSequenceNumver;   // Smallest sequence number of the slot with a live entry
+       int64_t localMachineId; // Machine ID of this client device
+       int64_t sequenceNumber; // Largest sequence number a client has received
+       int64_t localSequenceNumber;
 
        //  int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server
        //  int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server
-       int64_t localTransactionSequenceNumber = 0;     // Local sequence number counter for transactions
-       int64_t lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on
-       int64_t oldestTransactionSequenceNumberSpeculatedOn = -1;       // the oldest transaction that was speculated on
-       int64_t localArbitrationSequenceNumber = 0;
-       bool hadPartialSendToServer = false;
-       bool attemptedToSendToServer = false;
+       int64_t localTransactionSequenceNumber; // Local sequence number counter for transactions
+       int64_t lastTransactionSequenceNumberSpeculatedOn;      // the last transaction that was speculated on
+       int64_t oldestTransactionSequenceNumberSpeculatedOn;    // the oldest transaction that was speculated on
+       int64_t localArbitrationSequenceNumber;
+       bool hadPartialSendToServer;
+       bool attemptedToSendToServer;
        int64_t expectedsize;
-       bool didFindTableStatus = false;
-       int64_t currMaxSize = 0;
+       bool didFindTableStatus;
+       int64_t currMaxSize;
 
-       Slot *lastSlotAttemptedToSend = NULL;
-       bool lastIsNewKey = false;
-       int lastNewSize = 0;
-       Hashtable<Transaction *, Vector<int32_t> *> lastTransactionPartsSent = NULL;
-       Vector<Entry *> *lastPendingSendArbitrationEntriesToDelete = NULL;
-       NewKey *lastNewKey = NULL;
+       Slot *lastSlotAttemptedToSend;
+       bool lastIsNewKey;
+       int lastNewSize;
+       Hashtable<Transaction *, Vector<int32_t> *> *lastTransactionPartsSent;
+       Vector<Entry *> *lastPendingSendArbitrationEntriesToDelete;
+       NewKey *lastNewKey;
 
 
        /* Data Structures  */
-       Hashtable<IoTString *, KeyValue *> *committedKeyValueTable = NULL;// Table of committed key value pairs
-       Hashtable<IoTString *, KeyValue *> *speculatedKeyValueTable = NULL;     // Table of speculated key value pairs, if there is a speculative value
-       Hashtable<IoTString *, KeyValue *> *pendingTransactionSpeculatedKeyValueTable = NULL;   // Table of speculated key value pairs, if there is a speculative value from the pending transactions
-       Hashtable<IoTString *, NewKey *> *liveNewKeyTable = NULL;       // Table of live new keys
-       Hashtable<int64_t, Pair<int64_t, Liveness *> *> *lastMessageTable = NULL;       // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
-       Hashtable<int64_t, HashSet<RejectedMessage *> *> *rejectedMessageWatchVectorTable = NULL;       // Table of machine Ids and the set of rejected messages they have not seen yet
-       Hashtable<IoTString *, int64_t> *arbitratorTable = NULL;// Table of keys and their arbitrators
-       Hashtable<Pair<int64_t, int64_t> *, Abort *> *liveAbortTable = NULL;// Table live abort messages
-       Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *> *> *newTransactionParts = NULL;       // transaction parts that are seen in this latest round of slots from the server
-       Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *> *> *newCommitParts = NULL; // commit parts that are seen in this latest round of slots from the server
-       Hashtable<int64_t, int64_t> *lastArbitratedTransactionNumberByArbitratorTable = NULL;   // Last transaction sequence number that an arbitrator arbitrated on
-       Hashtable<int64_t, Transaction *> *liveTransactionBySequenceNumberTable = NULL; // live transaction grouped by the sequence number
-       Hashtable<Pair<int64_t, int64_t> *, Transaction *> *liveTransactionByTransactionIdTable = NULL; // live transaction grouped by the transaction ID
-       Hashtable<int64_t, Hashtable<int64_t, Commit *> > *liveCommitsTable = NULL;
-       Hashtable<IoTString *, Commit *> *liveCommitsByKeyTable = NULL;
-       Hashtable<int64_t, int64_t> *lastCommitSeenSequenceNumberByArbitratorTable = NULL;
-       Vector<int64_t> *rejectedSlotVector = NULL;     // Vector of rejected slots that have yet to be sent to the server
-       Vector<Transaction *> *pendingTransactionQueue = NULL;
-       Vector<ArbitrationRound *> *pendingSendArbitrationRounds = NULL;
-       Vector<Entry *> *pendingSendArbitrationEntriesToDelete = NULL;
-       Hashtable<Transaction *, Vector<int32_t *> *> *transactionPartsSent = NULL;
-       Hashtable<int64_t, TransactionStatus *> *outstandingTransactionStatus = NULL;
-       Hashtable<int64_t, Abort *> *liveAbortsGeneratedByLocal = NULL;
-       Hashset<Pair<int64_t, int64_t> *> *offlineTransactionsCommittedAndAtServer = NULL;
-       Hashtable<int64_t, Pair<String *, int32_t> > *localCommunicationTable = NULL;
-       Hashtable<int64_t, int64_t> *lastTransactionSeenFromMachineFromServer = NULL;
-       Hashtable<int64_t, int64_t> *lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = NULL;
+       Hashtable<IoTString *, KeyValue *> *committedKeyValueTable;// Table of committed key value pairs
+       Hashtable<IoTString *, KeyValue *> *speculatedKeyValueTable;    // Table of speculated key value pairs, if there is a speculative value
+       Hashtable<IoTString *, KeyValue *> *pendingTransactionSpeculatedKeyValueTable;  // Table of speculated key value pairs, if there is a speculative value from the pending transactions
+       Hashtable<IoTString *, NewKey *> *liveNewKeyTable;      // Table of live new keys
+       Hashtable<int64_t, Pair<int64_t, Liveness *> > *lastMessageTable;       // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
+       Hashtable<int64_t, Hashset<RejectedMessage *> *> *rejectedMessageWatchVectorTable;      // Table of machine Ids and the set of rejected messages they have not seen yet
+       Hashtable<IoTString *, int64_t> *arbitratorTable;// Table of keys and their arbitrators
+       Hashtable<Pair<int64_t, int64_t>, Abort *, uintptr_t, 0, pairHashFunction, pairEquals> *liveAbortTable;// Table live abort messages
+       Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *newTransactionParts;    // transaction parts that are seen in this latest round of slots from the server
+       Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *newCommitParts;      // commit parts that are seen in this latest round of slots from the server
+       Hashtable<int64_t, int64_t> *lastArbitratedTransactionNumberByArbitratorTable;  // Last transaction sequence number that an arbitrator arbitrated on
+       Hashtable<int64_t, Transaction *> *liveTransactionBySequenceNumberTable;        // live transaction grouped by the sequence number
+       Hashtable<Pair<int64_t, int64_t>, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals> *liveTransactionByTransactionIdTable;      // live transaction grouped by the transaction ID
+       Hashtable<int64_t, Hashtable<int64_t, Commit *> > *liveCommitsTable;
+       Hashtable<IoTString *, Commit *> *liveCommitsByKeyTable;
+       Hashtable<int64_t, int64_t> *lastCommitSeenSequenceNumberByArbitratorTable;
+       Vector<int64_t> *rejectedSlotVector;    // Vector of rejected slots that have yet to be sent to the server
+       Vector<Transaction *> *pendingTransactionQueue;
+       Vector<ArbitrationRound *> *pendingSendArbitrationRounds;
+       Vector<Entry *> *pendingSendArbitrationEntriesToDelete;
+       Hashtable<Transaction *, Vector<int32_t> *> *transactionPartsSent;
+       Hashtable<int64_t, TransactionStatus *> *outstandingTransactionStatus;
+       Hashtable<int64_t, Abort *> *liveAbortsGeneratedByLocal;
+       Hashset<Pair<int64_t, int64_t>, uintptr_t, 0, pairHashFunction, pairEquals> *offlineTransactionsCommittedAndAtServer;
+       Hashtable<int64_t, Pair<IoTString *, int32_t> > *localCommunicationTable;
+       Hashtable<int64_t, int64_t> *lastTransactionSeenFromMachineFromServer;
+       Hashtable<int64_t, int64_t> *lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
+       bool lastInsertedNewKey;
+       int64_t lastSeqNumArbOn;
+
+
        void init();
        /**
         * Recalculate the new resize threshold
         */
        void setResizeThreshold();
        bool sendToServer(NewKey *newKey);
-       synchronized bool updateFromLocal(int64_t machineId);
+       bool updateFromLocal(int64_t machineId);
        Pair<bool, bool> sendTransactionToLocal(Transaction *transaction);
-       ThreeTuple<bool, bool, Array<Slot *> *> *sendSlotsToServer(Slot *slot, int newSize, bool isNewKey);
+       ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsToServer(Slot *slot, int newSize, bool isNewKey);
        /**
         * Returns false if a resize was needed
         */
-       ThreeTuple<bool, int32_t *, bool> *fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry);
-       void doRejectedMessages(Slot s);
+       ThreeTuple<bool, int32_t, bool> fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry);
+       void doRejectedMessages(Slot *s);
 
-       ThreeTuple<bool, bool, int64_t> doMandatoryResuce(Slot slot, bool resize);
+       ThreeTuple<bool, bool, int64_t> doMandatoryResuce(Slot *slot, bool resize);
 
-       void  doOptionalRescue(Slot s, bool seenliveslot, int64_t seqn, bool resize);
+       void  doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize);
        /**
         * Checks for malicious activity and updates the local copy of the block chain.
         */
-       void validateAndUpdate(Slot[] newSlots, bool acceptUpdatesToLocal);
+       void validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal);
 
        void updateLiveStateFromServer();
 
@@ -136,11 +142,11 @@ private:
         */
        void processNewTransactionParts();
 
-       int64_t lastSeqNumArbOn = 0;
+
 
        void arbitrateFromServer();
 
-       Pair<bool, bool> arbitrateOnLocalTransaction(Transaction transaction);
+       Pair<bool, bool> arbitrateOnLocalTransaction(Transaction *transaction);
 
        /**
         * Compacts the arbitration data my merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates
@@ -170,45 +176,45 @@ private:
        /**
         * Process this slot, entry by entry.  Also update the latest message sent by slot
         */
-       void processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLocal, HashSet<int64_t> machineSet);
+       void processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet);
 
        /**
         * Update the last message that was sent for a machine Id
         */
-       void processEntry(LastMessage entry, HashSet<int64_t> machineSet);
+       void processEntry(LastMessage *entry, Hashset<int64_t> *machineSet);
 
        /**
         * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message)
         */
-       void processEntry(NewKey entry);
+       void processEntry(NewKey *entry);
 
        /**
         * Process new table status entries and set dead the old ones as new ones come in.
         * keeps track of the largest and smallest table status seen in this current round
         * of updating the local copy of the block chain
         */
-       void processEntry(TableStatus entry, int64_t seq);
+       void processEntry(TableStatus *entry, int64_t seq);
 
        /**
         * Check old messages to see if there is a block chain violation. Also
         */
-       void processEntry(RejectedMessage entry, SlotIndexer indexer);
+       void processEntry(RejectedMessage *entry, SlotIndexer *indexer);
 
        /**
         * Check if this abort is live, if not then save it so we can kill it later.
         * update the last transaction number that was arbitrated on.
         */
-       void processEntry(Abort entry);
+       void processEntry(Abort *entry);
 
        /**
         * Set dead the transaction part if that transaction is dead and keep track of all new parts
         */
-       void processEntry(TransactionPart entry);
+       void processEntry(TransactionPart *entry);
 
        /**
         * Process new commit entries and save them for future use.  Delete duplicates
         */
-       void processEntry(CommitPart entry);
+       void processEntry(CommitPart *entry);
 
        /**
         * Update the last message seen table.  Update and set dead the appropriate RejectedMessages as clients see them.
@@ -216,23 +222,23 @@ private:
         * Check that the last message seen is correct and that there is no mismatch of our own last message or that
         * other clients have not had a rollback on the last message.
         */
-       void updateLastMessage(int64_t machineId, int64_t seqNum, Liveness liveness, bool acceptUpdatesToLocal, HashSet<int64_t> machineSet);
+       void updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet);
 
        /**
         * Add a rejected message entry to the watch set to keep track of which clients have seen that
         * rejected message entry and which have not.
         */
-       void addWatchVector(int64_t machineId, RejectedMessage entry);
+       void addWatchVector(int64_t machineId, RejectedMessage *entry);
 
        /**
         * Check if the HMAC chain is not violated
         */
-       void checkHMACChain(SlotIndexer indexer, Slot[] newSlots);
-       bool lastInsertedNewKey = false;
+       void checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots);
+
 
 public:
-       Table(String baseurl, String password, int64_t _localMachineId, int listeningPort);
-       Table(CloudComm _cloud, int64_t _localMachineId);
+       Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort);
+       Table(CloudComm *_cloud, int64_t _localMachineId);
 
        /**
         * Initialize the table by inserting a table status as the first entry into the table status
@@ -243,14 +249,19 @@ public:
        /**
         * Rebuild the table from scratch by pulling the latest block chain from the server.
         */
+
        void rebuild();
-       void addLocalCommunication(int64_t arbitrator, String hostName, int portNumber);
-       uint64_t getArbitrator(IoTString *key);
+       void addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber);
+       int64_t getArbitrator(IoTString *key);
        void close();
        IoTString *getCommitted(IoTString *key);
        IoTString *getSpeculative(IoTString *key);
        IoTString *getCommittedAtomic(IoTString *key);
+       IoTString *getSpeculativeAtomic(IoTString *key);
+       bool update();
        bool createNewKey(IoTString *keyName, int64_t machineId);
+       void startTransaction();
+       void addKV(IoTString *key, IoTString *value);
        TransactionStatus *commitTransaction();
 
        /**