edits
[iotcloud.git] / version2 / src / C / Table.h
1 #ifndef Table_H
2 #define Table_H
3
4 /**
5  * IoTTable data structure.  Provides client interface.
6  * @author Brian Demsky
7  * @version 1.0
8  */
9
10 /* Constants */
11 #define Table_FREE_SLOTS 2
12 // Number of slots that should be kept free // 10
13 #define Table_SKIP_THRESHOLD 10
14 #define Table_RESIZE_MULTIPLE ((double)1.2)
15 #define Table_RESIZE_THRESHOLD ((double)0.75)
16 #define Table_REJECTED_THRESHOLD 5
17
18 class Table {
19 private:
20         /* Helper Objects */
21         SlotBuffer *buffer;
22         CloudComm *cloud = NULL;
23         Random *random = NULL;
24         TableStatus *liveTableStatus = NULL;
25         PendingTransaction *pendingTransactionBuilder = NULL;   // Pending Transaction used in building a Pending Transaction
26         Transaction *lastPendingTransactionSpeculatedOn = NULL; // Last transaction that was speculated on from the pending transaction
27         Transaction *firstPendingTransaction = NULL;    // first transaction in the pending transaction list
28
29         /* Variables */
30         int numberOfSlots = 0;  // Number of slots stored in buffer
31         int bufferResizeThreshold = 0;// Threshold on the number of live slots before a resize is needed
32         int64_t liveSlotCount = 0;// Number of currently live slots
33         int64_t oldestLiveSlotSequenceNumver = 0;       // Smallest sequence number of the slot with a live entry
34         int64_t localMachineId = 0;     // Machine ID of this client device
35         int64_t sequenceNumber = 0;     // Largest sequence number a client has received
36         int64_t localSequenceNumber = 0;
37
38         //  int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server
39         //  int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server
40         int64_t localTransactionSequenceNumber = 0;     // Local sequence number counter for transactions
41         int64_t lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on
42         int64_t oldestTransactionSequenceNumberSpeculatedOn = -1;       // the oldest transaction that was speculated on
43         int64_t localArbitrationSequenceNumber = 0;
44         bool hadPartialSendToServer = false;
45         bool attemptedToSendToServer = false;
46         int64_t expectedsize;
47         bool didFindTableStatus = false;
48         int64_t currMaxSize = 0;
49
50         Slot *lastSlotAttemptedToSend = NULL;
51         bool lastIsNewKey = false;
52         int lastNewSize = 0;
53         Hashtable<Transaction *, Vector<int32_t> *> lastTransactionPartsSent = NULL;
54         Vector<Entry *> *lastPendingSendArbitrationEntriesToDelete = NULL;
55         NewKey *lastNewKey = NULL;
56
57
58         /* Data Structures  */
59         Hashtable<IoTString *, KeyValue *> *committedKeyValueTable = NULL;// Table of committed key value pairs
60         Hashtable<IoTString *, KeyValue *> *speculatedKeyValueTable = NULL;     // Table of speculated key value pairs, if there is a speculative value
61         Hashtable<IoTString *, KeyValue *> *pendingTransactionSpeculatedKeyValueTable = NULL;   // Table of speculated key value pairs, if there is a speculative value from the pending transactions
62         Hashtable<IoTString *, NewKey *> *liveNewKeyTable = NULL;       // Table of live new keys
63         Hashtable<int64_t, Pair<int64_t, Liveness *> *> *lastMessageTable = NULL;       // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
64         Hashtable<int64_t, HashSet<RejectedMessage *> *> *rejectedMessageWatchVectorTable = NULL;       // Table of machine Ids and the set of rejected messages they have not seen yet
65         Hashtable<IoTString *, int64_t> *arbitratorTable = NULL;// Table of keys and their arbitrators
66         Hashtable<Pair<int64_t, int64_t> *, Abort *> *liveAbortTable = NULL;// Table live abort messages
67         Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *> *> *newTransactionParts = NULL;       // transaction parts that are seen in this latest round of slots from the server
68         Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *> *> *newCommitParts = NULL; // commit parts that are seen in this latest round of slots from the server
69         Hashtable<int64_t, int64_t> *lastArbitratedTransactionNumberByArbitratorTable = NULL;   // Last transaction sequence number that an arbitrator arbitrated on
70         Hashtable<int64_t, Transaction *> *liveTransactionBySequenceNumberTable = NULL; // live transaction grouped by the sequence number
71         Hashtable<Pair<int64_t, int64_t> *, Transaction *> *liveTransactionByTransactionIdTable = NULL; // live transaction grouped by the transaction ID
72         Hashtable<int64_t, Hashtable<int64_t, Commit *> > *liveCommitsTable = NULL;
73         Hashtable<IoTString *, Commit *> *liveCommitsByKeyTable = NULL;
74         Hashtable<int64_t, int64_t> *lastCommitSeenSequenceNumberByArbitratorTable = NULL;
75         Vector<int64_t> *rejectedSlotVector = NULL;     // Vector of rejected slots that have yet to be sent to the server
76         Vector<Transaction *> *pendingTransactionQueue = NULL;
77         Vector<ArbitrationRound *> *pendingSendArbitrationRounds = NULL;
78         Vector<Entry *> *pendingSendArbitrationEntriesToDelete = NULL;
79         Hashtable<Transaction *, Vector<int32_t *> *> *transactionPartsSent = NULL;
80         Hashtable<int64_t, TransactionStatus *> *outstandingTransactionStatus = NULL;
81         Hashtable<int64_t, Abort *> *liveAbortsGeneratedByLocal = NULL;
82         Hashset<Pair<int64_t, int64_t> *> *offlineTransactionsCommittedAndAtServer = NULL;
83         Hashtable<int64_t, Pair<String *, int32_t> > *localCommunicationTable = NULL;
84         Hashtable<int64_t, int64_t> *lastTransactionSeenFromMachineFromServer = NULL;
85         Hashtable<int64_t, int64_t> *lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = NULL;
86         void init();
87         /**
88          * Recalculate the new resize threshold
89          */
90         void setResizeThreshold();
91         bool sendToServer(NewKey *newKey);
92         synchronized bool updateFromLocal(int64_t machineId);
93         Pair<bool, bool> sendTransactionToLocal(Transaction *transaction);
94         ThreeTuple<bool, bool, Array<Slot *> *> *sendSlotsToServer(Slot *slot, int newSize, bool isNewKey);
95         /**
96          * Returns false if a resize was needed
97          */
98         ThreeTuple<bool, int32_t *, bool> *fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry);
99         void doRejectedMessages(Slot s);
100
101         ThreeTuple<bool, bool, int64_t> doMandatoryResuce(Slot slot, bool resize);
102
103         void  doOptionalRescue(Slot s, bool seenliveslot, int64_t seqn, bool resize);
104         /**
105          * Checks for malicious activity and updates the local copy of the block chain.
106          */
107         void validateAndUpdate(Slot[] newSlots, bool acceptUpdatesToLocal);
108
109         void updateLiveStateFromServer();
110
111         void updateLiveStateFromLocal();
112
113         void initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots);
114
115         void updateExpectedSize();
116
117
118         /**
119          * Check the size of the block chain to make sure there are enough slots sent back by the server.
120          * This is only called when we have a gap between the slots that we have locally and the slots
121          * sent by the server therefore in the slots sent by the server there will be at least 1 Table
122          * status message
123          */
124         void checkNumSlots(int numberOfSlots);
125
126         void updateCurrMaxSize(int newmaxsize) { currMaxSize = newmaxsize; }
127
128
129         /**
130          * Update the size of of the local buffer if it is needed.
131          */
132         void commitNewMaxSize();
133
134         /**
135          * Process the new transaction parts from this latest round of slots received from the server
136          */
137         void processNewTransactionParts();
138
139         int64_t lastSeqNumArbOn = 0;
140
141         void arbitrateFromServer();
142
143         Pair<bool, bool> arbitrateOnLocalTransaction(Transaction transaction);
144
145         /**
146          * Compacts the arbitration data my merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates
147          */
148         bool compactArbitrationData();
149
150         /**
151          * Update all the commits and the committed tables, sets dead the dead transactions
152          */
153         bool updateCommittedTable();
154
155         /**
156          * Create the speculative table from transactions that are still live and have come from the cloud
157          */
158         bool updateSpeculativeTable(bool didProcessNewCommits);
159
160         /**
161          * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer
162          */
163         void updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate);
164
165         /**
166          * Set dead and remove from the live transaction tables the transactions that are dead
167          */
168         void updateLiveTransactionsAndStatus();
169
170         /**
171          * Process this slot, entry by entry.  Also update the latest message sent by slot
172          */
173         void processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLocal, HashSet<int64_t> machineSet);
174
175         /**
176          * Update the last message that was sent for a machine Id
177          */
178         void processEntry(LastMessage entry, HashSet<int64_t> machineSet);
179
180         /**
181          * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message)
182          */
183         void processEntry(NewKey entry);
184
185         /**
186          * Process new table status entries and set dead the old ones as new ones come in.
187          * keeps track of the largest and smallest table status seen in this current round
188          * of updating the local copy of the block chain
189          */
190         void processEntry(TableStatus entry, int64_t seq);
191
192         /**
193          * Check old messages to see if there is a block chain violation. Also
194          */
195         void processEntry(RejectedMessage entry, SlotIndexer indexer);
196
197         /**
198          * Check if this abort is live, if not then save it so we can kill it later.
199          * update the last transaction number that was arbitrated on.
200          */
201         void processEntry(Abort entry);
202
203         /**
204          * Set dead the transaction part if that transaction is dead and keep track of all new parts
205          */
206         void processEntry(TransactionPart entry);
207
208         /**
209          * Process new commit entries and save them for future use.  Delete duplicates
210          */
211         void processEntry(CommitPart entry);
212
213         /**
214          * Update the last message seen table.  Update and set dead the appropriate RejectedMessages as clients see them.
215          * Updates the live aborts, removes those that are dead and sets them dead.
216          * Check that the last message seen is correct and that there is no mismatch of our own last message or that
217          * other clients have not had a rollback on the last message.
218          */
219         void updateLastMessage(int64_t machineId, int64_t seqNum, Liveness liveness, bool acceptUpdatesToLocal, HashSet<int64_t> machineSet);
220
221         /**
222          * Add a rejected message entry to the watch set to keep track of which clients have seen that
223          * rejected message entry and which have not.
224          */
225         void addWatchVector(int64_t machineId, RejectedMessage entry);
226
227         /**
228          * Check if the HMAC chain is not violated
229          */
230         void checkHMACChain(SlotIndexer indexer, Slot[] newSlots);
231         bool lastInsertedNewKey = false;
232
233 public:
234         Table(String baseurl, String password, int64_t _localMachineId, int listeningPort);
235         Table(CloudComm _cloud, int64_t _localMachineId);
236
237         /**
238          * Initialize the table by inserting a table status as the first entry into the table status
239          * also initialize the crypto stuff.
240          */
241         void initTable();
242
243         /**
244          * Rebuild the table from scratch by pulling the latest block chain from the server.
245          */
246         void rebuild();
247         void addLocalCommunication(int64_t arbitrator, String hostName, int portNumber);
248         uint64_t getArbitrator(IoTString *key);
249         void close();
250         IoTString *getCommitted(IoTString *key);
251         IoTString *getSpeculative(IoTString *key);
252         IoTString *getCommittedAtomic(IoTString *key);
253         bool createNewKey(IoTString *keyName, int64_t machineId);
254         TransactionStatus *commitTransaction();
255
256         /**
257          * Get the machine ID for this client
258          */
259         int64_t getMachineId() { return localMachineId; }
260
261         /**
262          * Decrement the number of live slots that we currently have
263          */
264         void decrementLiveCount() { liveSlotCount--; }
265         int64_t getLocalSequenceNumber();
266         Array<char> *acceptDataFromLocal(Array<char> *data);
267 };
268
269 #endif