#ifndef Table_H
#define Table_H
-
+#include "common.h"
+#include "Pair.h"
+#include "ThreeTuple.h"
/**
* IoTTable data structure. Provides client interface.
* @author Brian Demsky
private:
/* Helper Objects */
SlotBuffer *buffer;
- CloudComm *cloud = NULL;
- Random *random = NULL;
- TableStatus *liveTableStatus = NULL;
- PendingTransaction *pendingTransactionBuilder = NULL; // Pending Transaction used in building a Pending Transaction
- Transaction *lastPendingTransactionSpeculatedOn = NULL; // Last transaction that was speculated on from the pending transaction
- Transaction *firstPendingTransaction = NULL; // first transaction in the pending transaction list
+ CloudComm *cloud;
+ Random *random;
+ TableStatus *liveTableStatus;
+ PendingTransaction *pendingTransactionBuilder; // Pending Transaction used in building a Pending Transaction
+ Transaction *lastPendingTransactionSpeculatedOn; // Last transaction that was speculated on from the pending transaction
+ Transaction *firstPendingTransaction; // first transaction in the pending transaction list
/* Variables */
- int numberOfSlots = 0; // Number of slots stored in buffer
- int bufferResizeThreshold = 0;// Threshold on the number of live slots before a resize is needed
- int64_t liveSlotCount = 0;// Number of currently live slots
- int64_t oldestLiveSlotSequenceNumver = 0; // Smallest sequence number of the slot with a live entry
- int64_t localMachineId = 0; // Machine ID of this client device
- int64_t sequenceNumber = 0; // Largest sequence number a client has received
- int64_t localSequenceNumber = 0;
+ int numberOfSlots; // Number of slots stored in buffer
+ int bufferResizeThreshold;// Threshold on the number of live slots before a resize is needed
+ int64_t liveSlotCount;// Number of currently live slots
+ int64_t oldestLiveSlotSequenceNumver; // Smallest sequence number of the slot with a live entry
+ int64_t localMachineId; // Machine ID of this client device
+ int64_t sequenceNumber; // Largest sequence number a client has received
+ int64_t localSequenceNumber;
// int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server
// int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server
- int64_t localTransactionSequenceNumber = 0; // Local sequence number counter for transactions
- int64_t lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on
- int64_t oldestTransactionSequenceNumberSpeculatedOn = -1; // the oldest transaction that was speculated on
- int64_t localArbitrationSequenceNumber = 0;
- bool hadPartialSendToServer = false;
- bool attemptedToSendToServer = false;
+ int64_t localTransactionSequenceNumber; // Local sequence number counter for transactions
+ int64_t lastTransactionSequenceNumberSpeculatedOn; // the last transaction that was speculated on
+ int64_t oldestTransactionSequenceNumberSpeculatedOn; // the oldest transaction that was speculated on
+ int64_t localArbitrationSequenceNumber;
+ bool hadPartialSendToServer;
+ bool attemptedToSendToServer;
int64_t expectedsize;
- bool didFindTableStatus = false;
- int64_t currMaxSize = 0;
+ bool didFindTableStatus;
+ int64_t currMaxSize;
- Slot *lastSlotAttemptedToSend = NULL;
- bool lastIsNewKey = false;
- int lastNewSize = 0;
- Hashtable<Transaction *, Vector<int32_t> *> lastTransactionPartsSent = NULL;
- Vector<Entry *> *lastPendingSendArbitrationEntriesToDelete = NULL;
- NewKey *lastNewKey = NULL;
+ Slot *lastSlotAttemptedToSend;
+ bool lastIsNewKey;
+ int lastNewSize;
+ Hashtable<Transaction *, Vector<int32_t> *> lastTransactionPartsSent;
+ Vector<Entry *> *lastPendingSendArbitrationEntriesToDelete;
+ NewKey *lastNewKey;
/* Data Structures */
- Hashtable<IoTString *, KeyValue *> *committedKeyValueTable = NULL;// Table of committed key value pairs
- Hashtable<IoTString *, KeyValue *> *speculatedKeyValueTable = NULL; // Table of speculated key value pairs, if there is a speculative value
- Hashtable<IoTString *, KeyValue *> *pendingTransactionSpeculatedKeyValueTable = NULL; // Table of speculated key value pairs, if there is a speculative value from the pending transactions
- Hashtable<IoTString *, NewKey *> *liveNewKeyTable = NULL; // Table of live new keys
- Hashtable<int64_t, Pair<int64_t, Liveness *> *> *lastMessageTable = NULL; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
- Hashtable<int64_t, HashSet<RejectedMessage *> *> *rejectedMessageWatchVectorTable = NULL; // Table of machine Ids and the set of rejected messages they have not seen yet
- Hashtable<IoTString *, int64_t> *arbitratorTable = NULL;// Table of keys and their arbitrators
- Hashtable<Pair<int64_t, int64_t> *, Abort *> *liveAbortTable = NULL;// Table live abort messages
- Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *> *> *newTransactionParts = NULL; // transaction parts that are seen in this latest round of slots from the server
- Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *> *> *newCommitParts = NULL; // commit parts that are seen in this latest round of slots from the server
- Hashtable<int64_t, int64_t> *lastArbitratedTransactionNumberByArbitratorTable = NULL; // Last transaction sequence number that an arbitrator arbitrated on
- Hashtable<int64_t, Transaction *> *liveTransactionBySequenceNumberTable = NULL; // live transaction grouped by the sequence number
- Hashtable<Pair<int64_t, int64_t> *, Transaction *> *liveTransactionByTransactionIdTable = NULL; // live transaction grouped by the transaction ID
- Hashtable<int64_t, Hashtable<int64_t, Commit *> > *liveCommitsTable = NULL;
- Hashtable<IoTString *, Commit *> *liveCommitsByKeyTable = NULL;
- Hashtable<int64_t, int64_t> *lastCommitSeenSequenceNumberByArbitratorTable = NULL;
- Vector<int64_t> *rejectedSlotVector = NULL; // Vector of rejected slots that have yet to be sent to the server
- Vector<Transaction *> *pendingTransactionQueue = NULL;
- Vector<ArbitrationRound *> *pendingSendArbitrationRounds = NULL;
- Vector<Entry *> *pendingSendArbitrationEntriesToDelete = NULL;
- Hashtable<Transaction *, Vector<int32_t *> *> *transactionPartsSent = NULL;
- Hashtable<int64_t, TransactionStatus *> *outstandingTransactionStatus = NULL;
- Hashtable<int64_t, Abort *> *liveAbortsGeneratedByLocal = NULL;
- Hashset<Pair<int64_t, int64_t> *> *offlineTransactionsCommittedAndAtServer = NULL;
- Hashtable<int64_t, Pair<String *, int32_t> > *localCommunicationTable = NULL;
- Hashtable<int64_t, int64_t> *lastTransactionSeenFromMachineFromServer = NULL;
- Hashtable<int64_t, int64_t> *lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = NULL;
+ Hashtable<IoTString *, KeyValue *> *committedKeyValueTable;// Table of committed key value pairs
+ Hashtable<IoTString *, KeyValue *> *speculatedKeyValueTable; // Table of speculated key value pairs, if there is a speculative value
+ Hashtable<IoTString *, KeyValue *> *pendingTransactionSpeculatedKeyValueTable; // Table of speculated key value pairs, if there is a speculative value from the pending transactions
+ Hashtable<IoTString *, NewKey *> *liveNewKeyTable; // Table of live new keys
+ Hashtable<int64_t, Pair<int64_t, Liveness *> *> *lastMessageTable; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
+ Hashtable<int64_t, Hashset<RejectedMessage *> *> *rejectedMessageWatchVectorTable; // Table of machine Ids and the set of rejected messages they have not seen yet
+ Hashtable<IoTString *, int64_t> *arbitratorTable;// Table of keys and their arbitrators
+ Hashtable<Pair<int64_t, int64_t> *, Abort *> *liveAbortTable;// Table live abort messages
+ Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *> *> *newTransactionParts; // transaction parts that are seen in this latest round of slots from the server
+ Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *> *> *newCommitParts; // commit parts that are seen in this latest round of slots from the server
+ Hashtable<int64_t, int64_t> *lastArbitratedTransactionNumberByArbitratorTable; // Last transaction sequence number that an arbitrator arbitrated on
+ Hashtable<int64_t, Transaction *> *liveTransactionBySequenceNumberTable; // live transaction grouped by the sequence number
+ Hashtable<Pair<int64_t, int64_t> *, Transaction *> *liveTransactionByTransactionIdTable; // live transaction grouped by the transaction ID
+ Hashtable<int64_t, Hashtable<int64_t, Commit *> > *liveCommitsTable;
+ Hashtable<IoTString *, Commit *> *liveCommitsByKeyTable;
+ Hashtable<int64_t, int64_t> *lastCommitSeenSequenceNumberByArbitratorTable;
+ Vector<int64_t> *rejectedSlotVector; // Vector of rejected slots that have yet to be sent to the server
+ Vector<Transaction *> *pendingTransactionQueue;
+ Vector<ArbitrationRound *> *pendingSendArbitrationRounds;
+ Vector<Entry *> *pendingSendArbitrationEntriesToDelete;
+ Hashtable<Transaction *, Vector<int32_t *> *> *transactionPartsSent;
+ Hashtable<int64_t, TransactionStatus *> *outstandingTransactionStatus;
+ Hashtable<int64_t, Abort *> *liveAbortsGeneratedByLocal;
+ Hashset<Pair<int64_t, int64_t> *> *offlineTransactionsCommittedAndAtServer;
+ Hashtable<int64_t, Pair<IoTString *, int32_t> > *localCommunicationTable;
+ Hashtable<int64_t, int64_t> *lastTransactionSeenFromMachineFromServer;
+ Hashtable<int64_t, int64_t> *lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
+ bool lastInsertedNewKey;
+ int64_t lastSeqNumArbOn;
+
+
void init();
/**
* Recalculate the new resize threshold
*/
void setResizeThreshold();
bool sendToServer(NewKey *newKey);
- synchronized bool updateFromLocal(int64_t machineId);
+ bool updateFromLocal(int64_t machineId);
Pair<bool, bool> sendTransactionToLocal(Transaction *transaction);
ThreeTuple<bool, bool, Array<Slot *> *> *sendSlotsToServer(Slot *slot, int newSize, bool isNewKey);
/**
/**
* Checks for malicious activity and updates the local copy of the block chain.
*/
- void validateAndUpdate(Slot[] newSlots, bool acceptUpdatesToLocal);
+ void validateAndUpdate(Array<Slot> *newSlots, bool acceptUpdatesToLocal);
void updateLiveStateFromServer();
*/
void processNewTransactionParts();
- int64_t lastSeqNumArbOn = 0;
+
void arbitrateFromServer();
/**
* Process this slot, entry by entry. Also update the latest message sent by slot
*/
- void processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLocal, HashSet<int64_t> machineSet);
+ void processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLocal, Hashset<int64_t> machineSet);
/**
* Update the last message that was sent for a machine Id
*/
- void processEntry(LastMessage entry, HashSet<int64_t> machineSet);
+ void processEntry(LastMessage entry, Hashset<int64_t> machineSet);
/**
* Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message)
* Check that the last message seen is correct and that there is no mismatch of our own last message or that
* other clients have not had a rollback on the last message.
*/
- void updateLastMessage(int64_t machineId, int64_t seqNum, Liveness liveness, bool acceptUpdatesToLocal, HashSet<int64_t> machineSet);
+ void updateLastMessage(int64_t machineId, int64_t seqNum, Liveness liveness, bool acceptUpdatesToLocal, Hashset<int64_t> machineSet);
/**
* Add a rejected message entry to the watch set to keep track of which clients have seen that
/**
* Check if the HMAC chain is not violated
*/
- void checkHMACChain(SlotIndexer indexer, Slot[] newSlots);
- bool lastInsertedNewKey = false;
+ void checkHMACChain(SlotIndexer indexer, Array<Slot> *newSlots);
+
public:
- Table(String baseurl, String password, int64_t _localMachineId, int listeningPort);
+ Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort);
Table(CloudComm _cloud, int64_t _localMachineId);
/**
* Rebuild the table from scratch by pulling the latest block chain from the server.
*/
void rebuild();
- void addLocalCommunication(int64_t arbitrator, String hostName, int portNumber);
+ void addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber);
uint64_t getArbitrator(IoTString *key);
void close();
IoTString *getCommitted(IoTString *key);