Adding test for encrypted cloud storage; Adding preparation timing test for Fidelius...
[iotcloud.git] / version2 / src / C / Table.h
1 #ifndef Table_H
2 #define Table_H
3 #include "common.h"
4 #include "Pair.h"
5 #include "ThreeTuple.h"
6 #include "IoTString.h"
7
8 /**
9  * IoTTable data structure.  Provides client interface.
10  * @author Brian Demsky
11  * @version 1.0
12  */
13
14 /* Constants */
15 #define Table_FREE_SLOTS 2
16 // Number of slots that should be kept free // 10
17 #define Table_SKIP_THRESHOLD 10
18 #define Table_RESIZE_MULTIPLE ((double)1.2)
19 #define Table_RESIZE_THRESHOLD ((double)0.75)
20 #define Table_REJECTED_THRESHOLD 5
21
22 class Table {
23 private:
24         /* Helper Objects */
25         SlotBuffer *buffer;
26         CloudComm *cloud;
27         SecureRandom *random;
28         TableStatus *liveTableStatus;
29         PendingTransaction *pendingTransactionBuilder;  // Pending Transaction used in building a Pending Transaction
30         Transaction *lastPendingTransactionSpeculatedOn;        // Last transaction that was speculated on from the pending transaction
31         Transaction *firstPendingTransaction;   // first transaction in the pending transaction list
32
33         /* Variables */
34         int numberOfSlots;      // Number of slots stored in buffer
35         int bufferResizeThreshold;// Threshold on the number of live slots before a resize is needed
36         int64_t liveSlotCount;// Number of currently live slots
37         int64_t oldestLiveSlotSequenceNumver;   // Smallest sequence number of the slot with a live entry
38         int64_t localMachineId; // Machine ID of this client device
39         int64_t sequenceNumber; // Largest sequence number a client has received
40         int64_t localSequenceNumber;
41
42         //  int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server
43         //  int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server
44         int64_t localTransactionSequenceNumber; // Local sequence number counter for transactions
45         int64_t lastTransactionSequenceNumberSpeculatedOn;      // the last transaction that was speculated on
46         int64_t oldestTransactionSequenceNumberSpeculatedOn;    // the oldest transaction that was speculated on
47         int64_t localArbitrationSequenceNumber;
48         bool hadPartialSendToServer;
49         bool attemptedToSendToServer;
50         int64_t expectedsize;
51         bool didFindTableStatus;
52         int64_t currMaxSize;
53
54         Slot *lastSlotAttemptedToSend;
55         bool lastIsNewKey;
56         int lastNewSize;
57         Hashtable<Transaction *, Vector<int32_t> *> *lastTransactionPartsSent;
58         Vector<Entry *> *lastPendingSendArbitrationEntriesToDelete;
59         NewKey *lastNewKey;
60         void processTransactionList(bool handlePartial);
61         void clearSentParts();
62
63         /* Data Structures  */
64         Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *committedKeyValueTable;// Table of committed key value pairs
65         Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *speculatedKeyValueTable;    // Table of speculated key value pairs, if there is a speculative value
66         Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *pendingTransactionSpeculatedKeyValueTable;  // Table of speculated key value pairs, if there is a speculative value from the pending transactions
67         Hashtable<IoTString *, NewKey *, uintptr_t, 0, hashString, StringEquals> *liveNewKeyTable;      // Table of live new keys
68         Hashtable<int64_t, Pair<int64_t, Liveness *> *> *lastMessageTable;      // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
69         Hashtable<int64_t, Hashset<RejectedMessage *> *> *rejectedMessageWatchVectorTable;      // Table of machine Ids and the set of rejected messages they have not seen yet
70         Hashtable<IoTString *, int64_t, uintptr_t, 0, hashString, StringEquals> *arbitratorTable;// Table of keys and their arbitrators
71         Hashtable<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals> *liveAbortTable;// Table live abort messages
72         Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *newTransactionParts;  // transaction parts that are seen in this latest round of slots from the server
73         Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *newCommitParts;    // commit parts that are seen in this latest round of slots from the server
74         Hashtable<int64_t, int64_t> *lastArbitratedTransactionNumberByArbitratorTable;  // Last transaction sequence number that an arbitrator arbitrated on
75         Hashtable<int64_t, Transaction *> *liveTransactionBySequenceNumberTable;        // live transaction grouped by the sequence number
76         Hashtable<Pair<int64_t, int64_t> *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals> *liveTransactionByTransactionIdTable;    // live transaction grouped by the transaction ID
77         Hashtable<int64_t, Hashtable<int64_t, Commit *> *> *liveCommitsTable;
78         Hashtable<IoTString *, Commit *, uintptr_t, 0, hashString, StringEquals> *liveCommitsByKeyTable;
79         Hashtable<int64_t, int64_t> *lastCommitSeenSequenceNumberByArbitratorTable;
80         Vector<int64_t> *rejectedSlotVector;    // Vector of rejected slots that have yet to be sent to the server
81         Vector<Transaction *> *pendingTransactionQueue;
82         Vector<ArbitrationRound *> *pendingSendArbitrationRounds;
83         Vector<Entry *> *pendingSendArbitrationEntriesToDelete;
84         Hashtable<Transaction *, Vector<int32_t> *> *transactionPartsSent;
85         Hashtable<int64_t, TransactionStatus *> *outstandingTransactionStatus;
86         Hashtable<int64_t, Abort *> *liveAbortsGeneratedByLocal;
87         Hashset<Pair<int64_t, int64_t> *, uintptr_t, 0, pairHashFunction, pairEquals> *offlineTransactionsCommittedAndAtServer;
88         Hashtable<int64_t, Pair<IoTString *, int32_t> *> *localCommunicationTable;
89         Hashtable<int64_t, int64_t> *lastTransactionSeenFromMachineFromServer;
90         Hashtable<int64_t, int64_t> *lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
91         bool lastInsertedNewKey;
92         int64_t lastSeqNumArbOn;
93
94
95         void init();
96         /**
97          * Recalculate the new resize threshold
98          */
99         void setResizeThreshold();
100         bool sendToServer(NewKey *newKey);
101         NewKey * handlePartialSend(NewKey * newKey);
102         bool checkSend(Array<Slot *> * array, Slot *checkSlot);
103
104         bool updateFromLocal(int64_t machineId);
105         Pair<bool, bool> sendTransactionToLocal(Transaction *transaction);
106         bool sendSlotsToServer(Slot *slot, int newSize, bool isNewKey, bool * wasInserted, Array<Slot *> **array);
107         /**
108          * Returns false if a resize was needed
109          */
110         bool fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry, int & newSize, bool & insertedKey);
111         void doRejectedMessages(Slot *s);
112
113         ThreeTuple<bool, bool, int64_t> doMandatoryRescue(Slot *slot, bool resize);
114
115         void  doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize);
116         /**
117          * Checks for malicious activity and updates the local copy of the block chain.
118          */
119         void validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal);
120
121         void updateLiveStateFromServer();
122
123         void updateLiveStateFromLocal();
124
125         void initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots);
126
127         void updateExpectedSize();
128
129
130         /**
131          * Check the size of the block chain to make sure there are enough slots sent back by the server.
132          * This is only called when we have a gap between the slots that we have locally and the slots
133          * sent by the server therefore in the slots sent by the server there will be at least 1 Table
134          * status message
135          */
136         void checkNumSlots(int numberOfSlots);
137
138         void updateCurrMaxSize(int newmaxsize) { currMaxSize = newmaxsize; }
139
140
141         /**
142          * Update the size of of the local buffer if it is needed.
143          */
144         void commitNewMaxSize();
145
146         /**
147          * Process the new transaction parts from this latest round of slots received from the server
148          */
149         void processNewTransactionParts();
150
151
152
153         void arbitrateFromServer();
154
155         Pair<bool, bool> arbitrateOnLocalTransaction(Transaction *transaction);
156
157         /**
158          * Compacts the arbitration data my merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates
159          */
160         bool compactArbitrationData();
161
162         /**
163          * Update all the commits and the committed tables, sets dead the dead transactions
164          */
165         bool updateCommittedTable();
166
167         /**
168          * Create the speculative table from transactions that are still live and have come from the cloud
169          */
170         bool updateSpeculativeTable(bool didProcessNewCommits);
171
172         /**
173          * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer
174          */
175         void updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate);
176
177         /**
178          * Set dead and remove from the live transaction tables the transactions that are dead
179          */
180         void updateLiveTransactionsAndStatus();
181
182         /**
183          * Process this slot, entry by entry.  Also update the latest message sent by slot
184          */
185         void processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet);
186
187         /**
188          * Update the last message that was sent for a machine Id
189          */
190         void processEntry(LastMessage *entry, Hashset<int64_t> *machineSet);
191
192         /**
193          * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message)
194          */
195         void processEntry(NewKey *entry);
196
197         /**
198          * Process new table status entries and set dead the old ones as new ones come in.
199          * keeps track of the largest and smallest table status seen in this current round
200          * of updating the local copy of the block chain
201          */
202         void processEntry(TableStatus *entry, int64_t seq);
203
204         /**
205          * Check old messages to see if there is a block chain violation. Also
206          */
207         void processEntry(RejectedMessage *entry, SlotIndexer *indexer);
208
209         /**
210          * Check if this abort is live, if not then save it so we can kill it later.
211          * update the last transaction number that was arbitrated on.
212          */
213         void processEntry(Abort *entry);
214
215         /**
216          * Set dead the transaction part if that transaction is dead and keep track of all new parts
217          */
218         void processEntry(TransactionPart *entry);
219
220         /**
221          * Process new commit entries and save them for future use.  Delete duplicates
222          */
223         void processEntry(CommitPart *entry);
224
225         /**
226          * Update the last message seen table.  Update and set dead the appropriate RejectedMessages as clients see them.
227          * Updates the live aborts, removes those that are dead and sets them dead.
228          * Check that the last message seen is correct and that there is no mismatch of our own last message or that
229          * other clients have not had a rollback on the last message.
230          */
231         void updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet);
232
233         /**
234          * Add a rejected message entry to the watch set to keep track of which clients have seen that
235          * rejected message entry and which have not.
236          */
237         void addWatchVector(int64_t machineId, RejectedMessage *entry);
238
239         /**
240          * Check if the HMAC chain is not violated
241          */
242         void checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots);
243
244
245 public:
246         Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort);
247         Table(CloudComm *_cloud, int64_t _localMachineId);
248         ~Table();
249
250         /**
251          * Initialize the table by inserting a table status as the first entry into the table status
252          * also initialize the crypto stuff.
253          */
254         void initTable();
255
256         /**
257          * Rebuild the table from scratch by pulling the latest block chain from the server.
258          */
259
260         void rebuild();
261         void addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber);
262         int64_t getArbitrator(IoTString *key);
263         void close();
264         IoTString *getCommitted(IoTString *key);
265         IoTString *getSpeculative(IoTString *key);
266         IoTString *getCommittedAtomic(IoTString *key);
267         IoTString *getSpeculativeAtomic(IoTString *key);
268         bool update();
269         bool createNewKey(IoTString *keyName, int64_t machineId);
270         void startTransaction();
271         void put(IoTString *key, IoTString *value);
272         TransactionStatus *commitTransaction();
273
274         /**
275          * Get the machine ID for this client
276          */
277         int64_t getMachineId() { return localMachineId; }
278
279         /**
280          * Decrement the number of live slots that we currently have
281          */
282         void decrementLiveCount() { liveSlotCount--; }
283         int64_t getLocalSequenceNumber();
284         Array<char> *acceptDataFromLocal(Array<char> *data);
285 };
286
287 #endif