edits
[iotcloud.git] / version2 / src / C / Table.h
1 #ifndef Table_H
2 #define Table_H
3 #include "common.h"
4 #include "Pair.h"
5 #include "ThreeTuple.h"
6 /**
7  * IoTTable data structure.  Provides client interface.
8  * @author Brian Demsky
9  * @version 1.0
10  */
11
12 /* Constants */
13 #define Table_FREE_SLOTS 2
14 // Number of slots that should be kept free // 10
15 #define Table_SKIP_THRESHOLD 10
16 #define Table_RESIZE_MULTIPLE ((double)1.2)
17 #define Table_RESIZE_THRESHOLD ((double)0.75)
18 #define Table_REJECTED_THRESHOLD 5
19
20 class Table {
21 private:
22         /* Helper Objects */
23         SlotBuffer *buffer;
24         CloudComm *cloud;
25         Random *random;
26         TableStatus *liveTableStatus;
27         PendingTransaction *pendingTransactionBuilder;  // Pending Transaction used in building a Pending Transaction
28         Transaction *lastPendingTransactionSpeculatedOn;        // Last transaction that was speculated on from the pending transaction
29         Transaction *firstPendingTransaction;   // first transaction in the pending transaction list
30
31         /* Variables */
32         int numberOfSlots;      // Number of slots stored in buffer
33         int bufferResizeThreshold;// Threshold on the number of live slots before a resize is needed
34         int64_t liveSlotCount;// Number of currently live slots
35         int64_t oldestLiveSlotSequenceNumver;   // Smallest sequence number of the slot with a live entry
36         int64_t localMachineId; // Machine ID of this client device
37         int64_t sequenceNumber; // Largest sequence number a client has received
38         int64_t localSequenceNumber;
39
40         //  int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server
41         //  int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server
42         int64_t localTransactionSequenceNumber; // Local sequence number counter for transactions
43         int64_t lastTransactionSequenceNumberSpeculatedOn;      // the last transaction that was speculated on
44         int64_t oldestTransactionSequenceNumberSpeculatedOn;    // the oldest transaction that was speculated on
45         int64_t localArbitrationSequenceNumber;
46         bool hadPartialSendToServer;
47         bool attemptedToSendToServer;
48         int64_t expectedsize;
49         bool didFindTableStatus;
50         int64_t currMaxSize;
51
52         Slot *lastSlotAttemptedToSend;
53         bool lastIsNewKey;
54         int lastNewSize;
55         Hashtable<Transaction *, Vector<int32_t> *> lastTransactionPartsSent;
56         Vector<Entry *> *lastPendingSendArbitrationEntriesToDelete;
57         NewKey *lastNewKey;
58
59
60         /* Data Structures  */
61         Hashtable<IoTString *, KeyValue *> *committedKeyValueTable;// Table of committed key value pairs
62         Hashtable<IoTString *, KeyValue *> *speculatedKeyValueTable;    // Table of speculated key value pairs, if there is a speculative value
63         Hashtable<IoTString *, KeyValue *> *pendingTransactionSpeculatedKeyValueTable;  // Table of speculated key value pairs, if there is a speculative value from the pending transactions
64         Hashtable<IoTString *, NewKey *> *liveNewKeyTable;      // Table of live new keys
65         Hashtable<int64_t, Pair<int64_t, Liveness *> *> *lastMessageTable;      // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
66         Hashtable<int64_t, Hashset<RejectedMessage *> *> *rejectedMessageWatchVectorTable;      // Table of machine Ids and the set of rejected messages they have not seen yet
67         Hashtable<IoTString *, int64_t> *arbitratorTable;// Table of keys and their arbitrators
68         Hashtable<Pair<int64_t, int64_t> *, Abort *> *liveAbortTable;// Table live abort messages
69         Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *> *> *newTransactionParts;      // transaction parts that are seen in this latest round of slots from the server
70         Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *> *> *newCommitParts;        // commit parts that are seen in this latest round of slots from the server
71         Hashtable<int64_t, int64_t> *lastArbitratedTransactionNumberByArbitratorTable;  // Last transaction sequence number that an arbitrator arbitrated on
72         Hashtable<int64_t, Transaction *> *liveTransactionBySequenceNumberTable;        // live transaction grouped by the sequence number
73         Hashtable<Pair<int64_t, int64_t> *, Transaction *> *liveTransactionByTransactionIdTable;        // live transaction grouped by the transaction ID
74         Hashtable<int64_t, Hashtable<int64_t, Commit *> > *liveCommitsTable;
75         Hashtable<IoTString *, Commit *> *liveCommitsByKeyTable;
76         Hashtable<int64_t, int64_t> *lastCommitSeenSequenceNumberByArbitratorTable;
77         Vector<int64_t> *rejectedSlotVector;    // Vector of rejected slots that have yet to be sent to the server
78         Vector<Transaction *> *pendingTransactionQueue;
79         Vector<ArbitrationRound *> *pendingSendArbitrationRounds;
80         Vector<Entry *> *pendingSendArbitrationEntriesToDelete;
81         Hashtable<Transaction *, Vector<int32_t *> *> *transactionPartsSent;
82         Hashtable<int64_t, TransactionStatus *> *outstandingTransactionStatus;
83         Hashtable<int64_t, Abort *> *liveAbortsGeneratedByLocal;
84         Hashset<Pair<int64_t, int64_t> *> *offlineTransactionsCommittedAndAtServer;
85         Hashtable<int64_t, Pair<IoTString *, int32_t> > *localCommunicationTable;
86         Hashtable<int64_t, int64_t> *lastTransactionSeenFromMachineFromServer;
87         Hashtable<int64_t, int64_t> *lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
88         bool lastInsertedNewKey;
89         int64_t lastSeqNumArbOn;
90
91
92         void init();
93         /**
94          * Recalculate the new resize threshold
95          */
96         void setResizeThreshold();
97         bool sendToServer(NewKey *newKey);
98         bool updateFromLocal(int64_t machineId);
99         Pair<bool, bool> sendTransactionToLocal(Transaction *transaction);
100         ThreeTuple<bool, bool, Array<Slot *> *> *sendSlotsToServer(Slot *slot, int newSize, bool isNewKey);
101         /**
102          * Returns false if a resize was needed
103          */
104         ThreeTuple<bool, int32_t *, bool> *fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry);
105         void doRejectedMessages(Slot s);
106
107         ThreeTuple<bool, bool, int64_t> doMandatoryResuce(Slot slot, bool resize);
108
109         void  doOptionalRescue(Slot s, bool seenliveslot, int64_t seqn, bool resize);
110         /**
111          * Checks for malicious activity and updates the local copy of the block chain.
112          */
113         void validateAndUpdate(Array<Slot> *newSlots, bool acceptUpdatesToLocal);
114
115         void updateLiveStateFromServer();
116
117         void updateLiveStateFromLocal();
118
119         void initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots);
120
121         void updateExpectedSize();
122
123
124         /**
125          * Check the size of the block chain to make sure there are enough slots sent back by the server.
126          * This is only called when we have a gap between the slots that we have locally and the slots
127          * sent by the server therefore in the slots sent by the server there will be at least 1 Table
128          * status message
129          */
130         void checkNumSlots(int numberOfSlots);
131
132         void updateCurrMaxSize(int newmaxsize) { currMaxSize = newmaxsize; }
133
134
135         /**
136          * Update the size of of the local buffer if it is needed.
137          */
138         void commitNewMaxSize();
139
140         /**
141          * Process the new transaction parts from this latest round of slots received from the server
142          */
143         void processNewTransactionParts();
144
145
146
147         void arbitrateFromServer();
148
149         Pair<bool, bool> arbitrateOnLocalTransaction(Transaction transaction);
150
151         /**
152          * Compacts the arbitration data my merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates
153          */
154         bool compactArbitrationData();
155
156         /**
157          * Update all the commits and the committed tables, sets dead the dead transactions
158          */
159         bool updateCommittedTable();
160
161         /**
162          * Create the speculative table from transactions that are still live and have come from the cloud
163          */
164         bool updateSpeculativeTable(bool didProcessNewCommits);
165
166         /**
167          * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer
168          */
169         void updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate);
170
171         /**
172          * Set dead and remove from the live transaction tables the transactions that are dead
173          */
174         void updateLiveTransactionsAndStatus();
175
176         /**
177          * Process this slot, entry by entry.  Also update the latest message sent by slot
178          */
179         void processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLocal, Hashset<int64_t> machineSet);
180
181         /**
182          * Update the last message that was sent for a machine Id
183          */
184         void processEntry(LastMessage entry, Hashset<int64_t> machineSet);
185
186         /**
187          * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message)
188          */
189         void processEntry(NewKey entry);
190
191         /**
192          * Process new table status entries and set dead the old ones as new ones come in.
193          * keeps track of the largest and smallest table status seen in this current round
194          * of updating the local copy of the block chain
195          */
196         void processEntry(TableStatus entry, int64_t seq);
197
198         /**
199          * Check old messages to see if there is a block chain violation. Also
200          */
201         void processEntry(RejectedMessage entry, SlotIndexer indexer);
202
203         /**
204          * Check if this abort is live, if not then save it so we can kill it later.
205          * update the last transaction number that was arbitrated on.
206          */
207         void processEntry(Abort entry);
208
209         /**
210          * Set dead the transaction part if that transaction is dead and keep track of all new parts
211          */
212         void processEntry(TransactionPart entry);
213
214         /**
215          * Process new commit entries and save them for future use.  Delete duplicates
216          */
217         void processEntry(CommitPart entry);
218
219         /**
220          * Update the last message seen table.  Update and set dead the appropriate RejectedMessages as clients see them.
221          * Updates the live aborts, removes those that are dead and sets them dead.
222          * Check that the last message seen is correct and that there is no mismatch of our own last message or that
223          * other clients have not had a rollback on the last message.
224          */
225         void updateLastMessage(int64_t machineId, int64_t seqNum, Liveness liveness, bool acceptUpdatesToLocal, Hashset<int64_t> machineSet);
226
227         /**
228          * Add a rejected message entry to the watch set to keep track of which clients have seen that
229          * rejected message entry and which have not.
230          */
231         void addWatchVector(int64_t machineId, RejectedMessage entry);
232
233         /**
234          * Check if the HMAC chain is not violated
235          */
236         void checkHMACChain(SlotIndexer indexer, Array<Slot> *newSlots);
237
238
239 public:
240         Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort);
241         Table(CloudComm _cloud, int64_t _localMachineId);
242
243         /**
244          * Initialize the table by inserting a table status as the first entry into the table status
245          * also initialize the crypto stuff.
246          */
247         void initTable();
248
249         /**
250          * Rebuild the table from scratch by pulling the latest block chain from the server.
251          */
252         void rebuild();
253         void addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber);
254         uint64_t getArbitrator(IoTString *key);
255         void close();
256         IoTString *getCommitted(IoTString *key);
257         IoTString *getSpeculative(IoTString *key);
258         IoTString *getCommittedAtomic(IoTString *key);
259         bool createNewKey(IoTString *keyName, int64_t machineId);
260         TransactionStatus *commitTransaction();
261
262         /**
263          * Get the machine ID for this client
264          */
265         int64_t getMachineId() { return localMachineId; }
266
267         /**
268          * Decrement the number of live slots that we currently have
269          */
270         void decrementLiveCount() { liveSlotCount--; }
271         int64_t getLocalSequenceNumber();
272         Array<char> *acceptDataFromLocal(Array<char> *data);
273 };
274
275 #endif