#include "common.h"
#include "Pair.h"
#include "ThreeTuple.h"
+#include "IoTString.h"
+
/**
* IoTTable data structure. Provides client interface.
* @author Brian Demsky
/* Helper Objects */
SlotBuffer *buffer;
CloudComm *cloud;
- Random *random;
+ SecureRandom *random;
TableStatus *liveTableStatus;
PendingTransaction *pendingTransactionBuilder; // Pending Transaction used in building a Pending Transaction
Transaction *lastPendingTransactionSpeculatedOn; // Last transaction that was speculated on from the pending transaction
/* Data Structures */
- Hashtable<IoTString *, KeyValue *> *committedKeyValueTable;// Table of committed key value pairs
- Hashtable<IoTString *, KeyValue *> *speculatedKeyValueTable; // Table of speculated key value pairs, if there is a speculative value
- Hashtable<IoTString *, KeyValue *> *pendingTransactionSpeculatedKeyValueTable; // Table of speculated key value pairs, if there is a speculative value from the pending transactions
- Hashtable<IoTString *, NewKey *> *liveNewKeyTable; // Table of live new keys
+ Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *committedKeyValueTable;// Table of committed key value pairs
+ Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *speculatedKeyValueTable; // Table of speculated key value pairs, if there is a speculative value
+ Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *pendingTransactionSpeculatedKeyValueTable; // Table of speculated key value pairs, if there is a speculative value from the pending transactions
+ Hashtable<IoTString *, NewKey *, uintptr_t, 0, hashString, StringEquals> *liveNewKeyTable; // Table of live new keys
Hashtable<int64_t, Pair<int64_t, Liveness *> *> *lastMessageTable; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
Hashtable<int64_t, Hashset<RejectedMessage *> *> *rejectedMessageWatchVectorTable; // Table of machine Ids and the set of rejected messages they have not seen yet
- Hashtable<IoTString *, int64_t> *arbitratorTable;// Table of keys and their arbitrators
- Hashtable<Pair<int64_t, int64_t> *, Abort *> *liveAbortTable;// Table live abort messages
- Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *> *> *newTransactionParts; // transaction parts that are seen in this latest round of slots from the server
- Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *> *> *newCommitParts; // commit parts that are seen in this latest round of slots from the server
+ Hashtable<IoTString *, int64_t, uintptr_t, 0, hashString, StringEquals> *arbitratorTable;// Table of keys and their arbitrators
+ Hashtable<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals> *liveAbortTable;// Table live abort messages
+ Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *newTransactionParts; // transaction parts that are seen in this latest round of slots from the server
+ Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *newCommitParts; // commit parts that are seen in this latest round of slots from the server
Hashtable<int64_t, int64_t> *lastArbitratedTransactionNumberByArbitratorTable; // Last transaction sequence number that an arbitrator arbitrated on
Hashtable<int64_t, Transaction *> *liveTransactionBySequenceNumberTable; // live transaction grouped by the sequence number
- Hashtable<Pair<int64_t, int64_t> *, Transaction *> *liveTransactionByTransactionIdTable; // live transaction grouped by the transaction ID
- Hashtable<int64_t, Hashtable<int64_t, Commit *> > *liveCommitsTable;
- Hashtable<IoTString *, Commit *> *liveCommitsByKeyTable;
+ Hashtable<Pair<int64_t, int64_t> *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals> *liveTransactionByTransactionIdTable; // live transaction grouped by the transaction ID
+ Hashtable<int64_t, Hashtable<int64_t, Commit *> *> *liveCommitsTable;
+ Hashtable<IoTString *, Commit *, uintptr_t, 0, hashString, StringEquals> *liveCommitsByKeyTable;
Hashtable<int64_t, int64_t> *lastCommitSeenSequenceNumberByArbitratorTable;
Vector<int64_t> *rejectedSlotVector; // Vector of rejected slots that have yet to be sent to the server
Vector<Transaction *> *pendingTransactionQueue;
Hashtable<Transaction *, Vector<int32_t> *> *transactionPartsSent;
Hashtable<int64_t, TransactionStatus *> *outstandingTransactionStatus;
Hashtable<int64_t, Abort *> *liveAbortsGeneratedByLocal;
- Hashset<Pair<int64_t, int64_t> *> *offlineTransactionsCommittedAndAtServer;
- Hashtable<int64_t, Pair<IoTString *, int32_t> > *localCommunicationTable;
+ Hashset<Pair<int64_t, int64_t> *, uintptr_t, 0, pairHashFunction, pairEquals> *offlineTransactionsCommittedAndAtServer;
+ Hashtable<int64_t, Pair<IoTString *, int32_t> *> *localCommunicationTable;
Hashtable<int64_t, int64_t> *lastTransactionSeenFromMachineFromServer;
Hashtable<int64_t, int64_t> *lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
bool lastInsertedNewKey;
*/
void setResizeThreshold();
bool sendToServer(NewKey *newKey);
+ NewKey * handlePartialSend(NewKey * newKey);
bool updateFromLocal(int64_t machineId);
Pair<bool, bool> sendTransactionToLocal(Transaction *transaction);
- ThreeTuple<bool, bool, Array<Slot *> *> *sendSlotsToServer(Slot *slot, int newSize, bool isNewKey);
+ bool sendSlotsToServer(Slot *slot, int newSize, bool isNewKey, bool * wasInserted, Array<Slot *> **array);
/**
* Returns false if a resize was needed
*/
- ThreeTuple<bool, int32_t, bool> *fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry);
+ ThreeTuple<bool, int32_t, bool> fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry);
void doRejectedMessages(Slot *s);
ThreeTuple<bool, bool, int64_t> doMandatoryResuce(Slot *slot, bool resize);
/**
* Checks for malicious activity and updates the local copy of the block chain.
*/
- void validateAndUpdate(Array<Slot*> *newSlots, bool acceptUpdatesToLocal);
+ void validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal);
void updateLiveStateFromServer();
void arbitrateFromServer();
- Pair<bool, bool> arbitrateOnLocalTransaction(Transaction * transaction);
+ Pair<bool, bool> arbitrateOnLocalTransaction(Transaction *transaction);
/**
* Compacts the arbitration data my merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates
/**
* Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message)
*/
- void processEntry(NewKey * entry);
+ void processEntry(NewKey *entry);
/**
* Process new table status entries and set dead the old ones as new ones come in.
* Add a rejected message entry to the watch set to keep track of which clients have seen that
* rejected message entry and which have not.
*/
- void addWatchVector(int64_t machineId, RejectedMessage * entry);
+ void addWatchVector(int64_t machineId, RejectedMessage *entry);
/**
* Check if the HMAC chain is not violated
*/
- void checkHMACChain(SlotIndexer * indexer, Array<Slot*> *newSlots);
+ void checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots);
public:
Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort);
Table(CloudComm *_cloud, int64_t _localMachineId);
+ ~Table();
/**
* Initialize the table by inserting a table status as the first entry into the table status
/**
* Rebuild the table from scratch by pulling the latest block chain from the server.
*/
- bool update();
+
void rebuild();
void addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber);
int64_t getArbitrator(IoTString *key);
IoTString *getCommitted(IoTString *key);
IoTString *getSpeculative(IoTString *key);
IoTString *getCommittedAtomic(IoTString *key);
+ IoTString *getSpeculativeAtomic(IoTString *key);
+ bool update();
bool createNewKey(IoTString *keyName, int64_t machineId);
void startTransaction();
void addKV(IoTString *key, IoTString *value);