edits
[iotcloud.git] / version2 / src / C / Table.h
1 #ifndef Table_H
2 #define Table_H
3
4 /**
5  * IoTTable data structure.  Provides client interface.
6  * @author Brian Demsky
7  * @version 1.0
8  */
9
10         /* Constants */
11 #define Table_FREE_SLOTS 2
12 // Number of slots that should be kept free // 10
13 #define Table_SKIP_THRESHOLD 10
14 #define Table_RESIZE_MULTIPLE ((double)1.2)
15 #define Table_RESIZE_THRESHOLD ((double)0.75)
16 #define Table_REJECTED_THRESHOLD 5
17
18 class Table {
19  private:
20         /* Helper Objects */
21         SlotBuffer * buffer;
22         CloudComm * cloud = NULL;
23         Random * random = NULL;
24         TableStatus * liveTableStatus = NULL;
25         PendingTransaction * pendingTransactionBuilder = NULL; // Pending Transaction used in building a Pending Transaction
26         Transaction * lastPendingTransactionSpeculatedOn = NULL; // Last transaction that was speculated on from the pending transaction
27         Transaction * firstPendingTransaction = NULL; // first transaction in the pending transaction list
28         
29         /* Variables */
30         int numberOfSlots = 0;  // Number of slots stored in buffer
31         int bufferResizeThreshold = 0; // Threshold on the number of live slots before a resize is needed
32         int64_t liveSlotCount = 0; // Number of currently live slots
33         int64_t oldestLiveSlotSequenceNumver = 0;       // Smallest sequence number of the slot with a live entry
34         int64_t localMachineId = 0; // Machine ID of this client device
35         int64_t sequenceNumber = 0; // Largest sequence number a client has received
36         int64_t localSequenceNumber = 0;
37          
38         //  int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server
39         //  int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server
40         int64_t localTransactionSequenceNumber = 0; // Local sequence number counter for transactions
41         int64_t lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on
42         int64_t oldestTransactionSequenceNumberSpeculatedOn = -1; // the oldest transaction that was speculated on
43         int64_t localArbitrationSequenceNumber = 0;
44         bool hadPartialSendToServer = false;
45         bool attemptedToSendToServer = false;
46         int64_t expectedsize;
47         bool didFindTableStatus = false;
48         int64_t currMaxSize = 0;
49         
50         Slot * lastSlotAttemptedToSend = NULL;
51         bool lastIsNewKey = false;
52         int lastNewSize = 0;
53         Hashtable<Transaction *, List<int32_t> *> lastTransactionPartsSent = NULL;
54         List<Entry*> *lastPendingSendArbitrationEntriesToDelete = NULL;
55         NewKey * lastNewKey = NULL;
56          
57          
58         /* Data Structures  */
59         Hashtable<IoTString *, KeyValue*> *committedKeyValueTable = NULL; // Table of committed key value pairs
60         Hashtable<IoTString *, KeyValue*> * speculatedKeyValueTable = NULL; // Table of speculated key value pairs, if there is a speculative value
61         Hashtable<IoTString *, KeyValue *> * pendingTransactionSpeculatedKeyValueTable = NULL; // Table of speculated key value pairs, if there is a speculative value from the pending transactions
62         Hashtable<IoTString *, NewKey *> * liveNewKeyTable = NULL; // Table of live new keys
63         Hashtable<int64_t, Pair<int64_t, Liveness*>*> *lastMessageTable = NULL; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
64         Hashtable<int64_t, HashSet<RejectedMessage*>*> *rejectedMessageWatchListTable = NULL; // Table of machine Ids and the set of rejected messages they have not seen yet
65         Hashtable<IoTString*, int64_t> *arbitratorTable = NULL; // Table of keys and their arbitrators
66         Hashtable<Pair<int64_t, int64_t>*, Abort*> *liveAbortTable = NULL; // Table live abort messages
67         Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>*, TransactionPart*>*> *newTransactionParts = NULL; // transaction parts that are seen in this latest round of slots from the server
68         Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>*, CommitPart*>*> * newCommitParts = NULL; // commit parts that are seen in this latest round of slots from the server
69         Hashtable<int64_t, int64_t> *lastArbitratedTransactionNumberByArbitratorTable = NULL; // Last transaction sequence number that an arbitrator arbitrated on
70         Hashtable<int64_t, Transaction*> *liveTransactionBySequenceNumberTable = NULL; // live transaction grouped by the sequence number
71         Hashtable<Pair<int64_t, int64_t>*, Transaction*> *liveTransactionByTransactionIdTable = NULL; // live transaction grouped by the transaction ID
72         Hashtable<int64_t, Hashtable<int64_t, Commit*>> *liveCommitsTable = NULL;
73         Hashtable<IoTString*, Commit*> *liveCommitsByKeyTable = NULL;
74         Hashtable<int64_t, int64_t> *lastCommitSeenSequenceNumberByArbitratorTable = NULL;
75         Vector<int64_t> * rejectedSlotList = NULL; // List of rejected slots that have yet to be sent to the server
76         List<Transaction*> *pendingTransactionQueue = NULL;
77         List<ArbitrationRound*> *pendingSendArbitrationRounds = NULL;
78         List<Entry*> *pendingSendArbitrationEntriesToDelete = NULL;
79         Hashtable<Transaction*, List<Integer*>*> *transactionPartsSent = NULL;
80         Hashtable<int64_t, TransactionStatus*> *outstandingTransactionStatus = NULL;
81         Hashtable<int64_t, Abort*> *liveAbortsGeneratedByLocal = NULL;
82         Hashset<Pair<int64_t, int64_t>*> *offlineTransactionsCommittedAndAtServer = NULL;
83         Hashtable<int64_t, Pair<String*, int32_t>> * localCommunicationTable = NULL;
84         Hashtable<int64_t, int64_t> * lastTransactionSeenFromMachineFromServer = NULL;
85         Hashtable<int64_t, int64_t> * lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = NULL;
86         void init();
87          /**
88                 * Recalculate the new resize threshold
89                 */
90         void setResizeThreshold();
91         bool sendToServer(NewKey *newKey);
92         synchronized bool updateFromLocal(int64_t machineId);
93         Pair<Boolean, Boolean> sendTransactionToLocal(Transaction *transaction);
94         ThreeTuple<Boolean, Boolean, Array<Slot*> *> * sendSlotsToServer(Slot *slot, int newSize, bool isNewKey);
95         /**
96          * Returns false if a resize was needed
97          */
98         ThreeTuple<Boolean, int32_t*, Boolean> * fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry);
99         void doRejectedMessages(Slot s);
100         
101         ThreeTuple<Boolean, Boolean, int64_t> doMandatoryResuce(Slot slot, bool resize);
102          
103          void  doOptionalRescue(Slot s, bool seenliveslot, int64_t seqn, bool resize);
104          /**
105          * Checks for malicious activity and updates the local copy of the block chain.
106          */
107          void validateAndUpdate(Slot[] newSlots, bool acceptUpdatesToLocal);
108          
109          void updateLiveStateFromServer();
110          
111          void updateLiveStateFromLocal();
112          
113          void initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots);
114          
115          void updateExpectedSize();
116
117          
118          /**
119                 * Check the size of the block chain to make sure there are enough slots sent back by the server.
120                 * This is only called when we have a gap between the slots that we have locally and the slots
121                 * sent by the server therefore in the slots sent by the server there will be at least 1 Table
122                 * status message
123                 */
124          void checkNumSlots(int numberOfSlots);
125          
126          void updateCurrMaxSize(int newmaxsize) { currMaxSize = newmaxsize; }
127          
128
129          /**
130          * Update the size of of the local buffer if it is needed.
131          */
132          void commitNewMaxSize();
133          
134          /**
135                 * Process the new transaction parts from this latest round of slots received from the server
136                 */
137          void processNewTransactionParts();
138          
139          int64_t lastSeqNumArbOn = 0;
140
141          void arbitrateFromServer();
142          
143          Pair<Boolean, Boolean> arbitrateOnLocalTransaction(Transaction transaction);
144          
145          /**
146                 * Compacts the arbitration data my merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates
147                 */
148          bool compactArbitrationData();
149
150          /**
151                 * Update all the commits and the committed tables, sets dead the dead transactions
152                 */
153          bool updateCommittedTable();
154          
155          /**
156                 * Create the speculative table from transactions that are still live and have come from the cloud
157                 */
158          bool updateSpeculativeTable(bool didProcessNewCommits);
159
160          /**
161                 * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer
162                 */
163          void updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate);
164          
165          /**
166                 * Set dead and remove from the live transaction tables the transactions that are dead
167                 */
168          void updateLiveTransactionsAndStatus();
169          
170          /**
171                 * Process this slot, entry by entry.  Also update the latest message sent by slot
172                 */
173          void processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLocal, HashSet<int64_t> machineSet);
174          
175          /**
176                 * Update the last message that was sent for a machine Id
177                 */
178          void processEntry(LastMessage entry, HashSet<int64_t> machineSet);
179          
180          /**
181                 * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message)
182                 */
183          void processEntry(NewKey entry);
184          
185          /**
186                 * Process new table status entries and set dead the old ones as new ones come in.
187                 * keeps track of the largest and smallest table status seen in this current round
188                 * of updating the local copy of the block chain
189                 */
190          void processEntry(TableStatus entry, int64_t seq);
191          
192          /**
193                 * Check old messages to see if there is a block chain violation. Also
194                 */
195          void processEntry(RejectedMessage entry, SlotIndexer indexer);
196          
197          /**
198                 * Check if this abort is live, if not then save it so we can kill it later.
199                 * update the last transaction number that was arbitrated on.
200                 */
201          void processEntry(Abort entry);
202          
203          /**
204                 * Set dead the transaction part if that transaction is dead and keep track of all new parts
205                 */
206          void processEntry(TransactionPart entry);
207          
208          /**
209                 * Process new commit entries and save them for future use.  Delete duplicates
210                 */
211          void processEntry(CommitPart entry);
212          
213          /**
214                 * Update the last message seen table.  Update and set dead the appropriate RejectedMessages as clients see them.
215                 * Updates the live aborts, removes those that are dead and sets them dead.
216                 * Check that the last message seen is correct and that there is no mismatch of our own last message or that
217                 * other clients have not had a rollback on the last message.
218                 */
219          void updateLastMessage(int64_t machineId, int64_t seqNum, Liveness liveness, bool acceptUpdatesToLocal, HashSet<int64_t> machineSet);
220          
221          /**
222                 * Add a rejected message entry to the watch set to keep track of which clients have seen that
223                 * rejected message entry and which have not.
224                 */
225          void addWatchList(int64_t machineId, RejectedMessage entry);
226
227          /**
228                 * Check if the HMAC chain is not violated
229                 */
230          void checkHMACChain(SlotIndexer indexer, Slot[] newSlots);
231          bool lastInsertedNewKey = false;
232          
233  public:
234          Table(String baseurl, String password, int64_t _localMachineId, int listeningPort);
235          Table(CloudComm _cloud, int64_t _localMachineId);
236          
237          /**
238                 * Initialize the table by inserting a table status as the first entry into the table status
239                 * also initialize the crypto stuff.
240                 */
241          void initTable();
242          
243          /**
244                 * Rebuild the table from scratch by pulling the latest block chain from the server.
245                 */
246          void rebuild();
247          void addLocalCommunication(int64_t arbitrator, String hostName, int portNumber);
248          uint64_t getArbitrator(IoTString * key);
249          void close();
250          IoTString * getCommitted(IoTString * key);
251          IoTString * getSpeculative(IoTString * key);
252          IoTString * getCommittedAtomic(IoTString * key);
253          bool createNewKey(IoTString * keyName, int64_t machineId);
254          TransactionStatus * commitTransaction();
255          
256         /**
257          * Get the machine ID for this client
258          */
259          int64_t getMachineId() { return localMachineId; }
260          
261          /**
262                 * Decrement the number of live slots that we currently have
263                 */
264          void decrementLiveCount() { liveSlotCount--; }
265          int64_t getLocalSequenceNumber();
266          Array<char> * acceptDataFromLocal(Array<char> * data);
267 };
268
269 #endif