5 * IoTTable data structure. Provides client interface.
11 #define Table_FREE_SLOTS 2
12 // Number of slots that should be kept free (describes Table_FREE_SLOTS above; the trailing "10" in the original note looks like an earlier or alternative value - TODO confirm)
13 #define Table_SKIP_THRESHOLD 10
14 #define Table_RESIZE_MULTIPLE ((double)1.2) // presumably the growth factor applied when the buffer is resized - TODO confirm against the .cc file
15 #define Table_RESIZE_THRESHOLD ((double)0.75) // presumably the fill fraction used to derive bufferResizeThreshold - TODO confirm against setResizeThreshold()
16 #define Table_REJECTED_THRESHOLD 5
22 CloudComm * cloud = NULL; // presumably set from the constructor's _cloud argument or built from baseurl/password - TODO confirm
23 Random * random = NULL;
24 TableStatus * liveTableStatus = NULL;
25 PendingTransaction * pendingTransactionBuilder = NULL; // Pending Transaction used in building a Pending Transaction
26 Transaction * lastPendingTransactionSpeculatedOn = NULL; // Last transaction that was speculated on from the pending transaction
27 Transaction * firstPendingTransaction = NULL; // first transaction in the pending transaction list
30 int numberOfSlots = 0; // Number of slots stored in buffer
31 int bufferResizeThreshold = 0; // Threshold on the number of live slots before a resize is needed
32 int64_t liveSlotCount = 0; // Number of currently live slots; decremented by decrementLiveCount()
33 int64_t oldestLiveSlotSequenceNumver = 0; // Smallest sequence number of the slot with a live entry. NOTE(review): "Numver" looks like a typo for "Number"; renaming requires updating every user of this field
34 int64_t localMachineId = 0; // Machine ID of this client device; returned by getMachineId()
35 int64_t sequenceNumber = 0; // Largest sequence number a client has received
36 int64_t localSequenceNumber = 0; // presumably the value exposed by getLocalSequenceNumber() - TODO confirm
38 // int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server
39 // int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server
40 int64_t localTransactionSequenceNumber = 0; // Local sequence number counter for transactions
41 int64_t lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on
42 int64_t oldestTransactionSequenceNumberSpeculatedOn = -1; // the oldest transaction that was speculated on
43 int64_t localArbitrationSequenceNumber = 0;
44 bool hadPartialSendToServer = false;
45 bool attemptedToSendToServer = false;
47 bool didFindTableStatus = false;
48 int64_t currMaxSize = 0; // Written by updateCurrMaxSize(), which takes an int that is widened to int64_t here
50 Slot * lastSlotAttemptedToSend = NULL;
51 bool lastIsNewKey = false;
53 Hashtable<Transaction *, List<int32_t> *> *lastTransactionPartsSent = NULL; // Held by pointer like every other table member, so the NULL initializer is well-formed; presumably a saved copy of transactionPartsSent from the last send attempt - TODO confirm
54 List<Entry*> *lastPendingSendArbitrationEntriesToDelete = NULL;
55 NewKey * lastNewKey = NULL;
59 Hashtable<IoTString *, KeyValue*> *committedKeyValueTable = NULL; // Table of committed key value pairs
60 Hashtable<IoTString *, KeyValue*> * speculatedKeyValueTable = NULL; // Table of speculated key value pairs, if there is a speculative value
61 Hashtable<IoTString *, KeyValue *> * pendingTransactionSpeculatedKeyValueTable = NULL; // Table of speculated key value pairs, if there is a speculative value from the pending transactions
62 Hashtable<IoTString *, NewKey *> * liveNewKeyTable = NULL; // Table of live new keys
63 Hashtable<int64_t, Pair<int64_t, Liveness*>*> *lastMessageTable = NULL; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
64 Hashtable<int64_t, HashSet<RejectedMessage*>*> *rejectedMessageWatchListTable = NULL; // Table of machine Ids and the set of rejected messages they have not seen yet
65 Hashtable<IoTString*, int64_t> *arbitratorTable = NULL; // Table of keys and their arbitrators
66 Hashtable<Pair<int64_t, int64_t>*, Abort*> *liveAbortTable = NULL; // Table of live abort messages
67 Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>*, TransactionPart*>*> *newTransactionParts = NULL; // transaction parts that are seen in this latest round of slots from the server
68 Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>*, CommitPart*>*> * newCommitParts = NULL; // commit parts that are seen in this latest round of slots from the server
69 Hashtable<int64_t, int64_t> *lastArbitratedTransactionNumberByArbitratorTable = NULL; // Last transaction sequence number that an arbitrator arbitrated on
70 Hashtable<int64_t, Transaction*> *liveTransactionBySequenceNumberTable = NULL; // live transaction grouped by the sequence number
71 Hashtable<Pair<int64_t, int64_t>*, Transaction*> *liveTransactionByTransactionIdTable = NULL; // live transaction grouped by the transaction ID
72 Hashtable<int64_t, Hashtable<int64_t, Commit*>> *liveCommitsTable = NULL; // NOTE(review): the inner Hashtable is stored by value, unlike the pointer-valued inner tables of newTransactionParts/newCommitParts - confirm this is intended
73 Hashtable<IoTString*, Commit*> *liveCommitsByKeyTable = NULL;
74 Hashtable<int64_t, int64_t> *lastCommitSeenSequenceNumberByArbitratorTable = NULL;
75 Vector<int64_t> * rejectedSlotList = NULL; // List of rejected slots that have yet to be sent to the server
76 List<Transaction*> *pendingTransactionQueue = NULL;
77 List<ArbitrationRound*> *pendingSendArbitrationRounds = NULL;
78 List<Entry*> *pendingSendArbitrationEntriesToDelete = NULL;
79 Hashtable<Transaction*, List<Integer*>*> *transactionPartsSent = NULL; // NOTE(review): list elements are Integer* here but int32_t in lastTransactionPartsSent - confirm which element type is intended
80 Hashtable<int64_t, TransactionStatus*> *outstandingTransactionStatus = NULL;
81 Hashtable<int64_t, Abort*> *liveAbortsGeneratedByLocal = NULL;
82 Hashset<Pair<int64_t, int64_t>*> *offlineTransactionsCommittedAndAtServer = NULL; // NOTE(review): spelled "Hashset" here but "HashSet" elsewhere in this file - confirm the actual class name
83 Hashtable<int64_t, Pair<String*, int32_t>> * localCommunicationTable = NULL; // NOTE(review): Pair is stored by value, unlike the pointer values in the other tables; presumably filled by addLocalCommunication() - TODO confirm
84 Hashtable<int64_t, int64_t> * lastTransactionSeenFromMachineFromServer = NULL;
85 Hashtable<int64_t, int64_t> * lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = NULL;
88 * Recalculate the new resize threshold
90 void setResizeThreshold();
91 bool sendToServer(NewKey *newKey);
92 synchronized bool updateFromLocal(int64_t machineId); // NOTE(review): "synchronized" is a Java keyword, not valid C++; the locking it implied needs an explicit mutex (or removal) in this port
93 Pair<Boolean, Boolean> sendTransactionToLocal(Transaction *transaction); // NOTE(review): Boolean looks like the Java boxed type - confirm a C++ Boolean exists in this port or use bool
94 ThreeTuple<Boolean, Boolean, Array<Slot*> *> * sendSlotsToServer(Slot *slot, int newSize, bool isNewKey);
96 * Returns false if a resize was needed
98 ThreeTuple<Boolean, int32_t*, Boolean> * fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry);
99 void doRejectedMessages(Slot s); // NOTE(review): Slot is taken by value here but by pointer in fillSlot/sendSlotsToServer - confirm a copy is intended
101 ThreeTuple<Boolean, Boolean, int64_t> doMandatoryResuce(Slot slot, bool resize); // NOTE(review): "Resuce" looks like a typo for "Rescue" (compare doOptionalRescue); renaming requires updating callers. Slot is also taken by value
103 void doOptionalRescue(Slot s, bool seenliveslot, int64_t seqn, bool resize); // NOTE(review): Slot is taken by value - confirm a copy is intended
105 * Checks for malicious activity and updates the local copy of the block chain.
107 void validateAndUpdate(Slot[] newSlots, bool acceptUpdatesToLocal); // NOTE(review): "Slot[] newSlots" is Java array syntax and is not valid C++; other declarations in this file use Array<Slot*> * for slot arrays
109 void updateLiveStateFromServer();
111 void updateLiveStateFromLocal();
113 void initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots); // NOTE(review): the numberOfSlots parameter shadows the member field of the same name
115 void updateExpectedSize();
119 * Check the size of the block chain to make sure there are enough slots sent back by the server.
120 * This is only called when we have a gap between the slots that we have locally and the slots
121 * sent by the server therefore in the slots sent by the server there will be at least 1 Table
124 void checkNumSlots(int numberOfSlots); // NOTE(review): the numberOfSlots parameter shadows the member field of the same name
126 void updateCurrMaxSize(int newmaxsize) { currMaxSize = newmaxsize; } // Stores newmaxsize in currMaxSize (int widened to int64_t); performs no validation of the new value
130 * Update the size of the local buffer if it is needed.
132 void commitNewMaxSize();
135 * Process the new transaction parts from this latest round of slots received from the server
137 void processNewTransactionParts();
139 int64_t lastSeqNumArbOn = 0; // presumably the last sequence number arbitrated on - TODO confirm against arbitrateFromServer()
141 void arbitrateFromServer();
143 Pair<Boolean, Boolean> arbitrateOnLocalTransaction(Transaction transaction); // NOTE(review): Transaction is taken by value here but by pointer in sendTransactionToLocal - confirm a copy is intended
146 * Compacts the arbitration data by merging commits and aggregating aborts, so that a single large push of commits can be done instead of many small updates
148 bool compactArbitrationData();
151 * Update all the commits and the committed tables, sets dead the dead transactions
153 bool updateCommittedTable();
156 * Create the speculative table from transactions that are still live and have come from the cloud
158 bool updateSpeculativeTable(bool didProcessNewCommits);
161 * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer
163 void updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate);
166 * Set dead and remove from the live transaction tables the transactions that are dead
168 void updateLiveTransactionsAndStatus();
171 * Process this slot, entry by entry. Also update the latest message sent by slot
173 void processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLocal, HashSet<int64_t> machineSet); // NOTE(review): indexer, slot and machineSet are passed by value; if these were Java references, changes made to machineSet inside would not reach the caller - confirm pointers are not needed
176 * Update the last message that was sent for a machine Id
178 void processEntry(LastMessage entry, HashSet<int64_t> machineSet); // NOTE(review): entry and machineSet are passed by value - same concern as processSlot
181 * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message)
183 void processEntry(NewKey entry); // NOTE(review): entry is passed by value, while other declarations in this file use NewKey * - confirm
186 * Process new table status entries and set dead the old ones as new ones come in.
187 * keeps track of the largest and smallest table status seen in this current round
188 * of updating the local copy of the block chain
190 void processEntry(TableStatus entry, int64_t seq);
193 * Check old messages to see if there is a block chain violation.
195 void processEntry(RejectedMessage entry, SlotIndexer indexer);
198 * Check if this abort is live, if not then save it so we can kill it later.
199 * update the last transaction number that was arbitrated on.
201 void processEntry(Abort entry);
204 * Set dead the transaction part if that transaction is dead and keep track of all new parts
206 void processEntry(TransactionPart entry);
209 * Process new commit entries and save them for future use. Delete duplicates
211 void processEntry(CommitPart entry);
214 * Update the last message seen table. Update and set dead the appropriate RejectedMessages as clients see them.
215 * Updates the live aborts, removes those that are dead and sets them dead.
216 * Check that the last message seen is correct and that there is no mismatch of our own last message or that
217 * other clients have not had a rollback on the last message.
219 void updateLastMessage(int64_t machineId, int64_t seqNum, Liveness liveness, bool acceptUpdatesToLocal, HashSet<int64_t> machineSet); // NOTE(review): liveness and machineSet are passed by value, whereas lastMessageTable stores Liveness* - confirm pointers are not needed
222 * Add a rejected message entry to the watch set to keep track of which clients have seen that
223 * rejected message entry and which have not.
225 void addWatchList(int64_t machineId, RejectedMessage entry); // NOTE(review): entry is passed by value, whereas rejectedMessageWatchListTable stores RejectedMessage* - confirm
228 * Check if the HMAC chain is not violated
230 void checkHMACChain(SlotIndexer indexer, Slot[] newSlots); // NOTE(review): "Slot[] newSlots" is Java array syntax and is not valid C++; other declarations in this file use Array<Slot*> * for slot arrays
231 bool lastInsertedNewKey = false;
234 Table(String baseurl, String password, int64_t _localMachineId, int listeningPort); // NOTE(review): String is taken by value, while localCommunicationTable stores String* - confirm the string type used by this port
235 Table(CloudComm _cloud, int64_t _localMachineId); // NOTE(review): CloudComm is taken by value, but the cloud member is a CloudComm * - confirm the parameter should not be a pointer
238 * Initialize the table by inserting a table status as the first entry into the table;
239 * also initialize the crypto stuff.
244 * Rebuild the table from scratch by pulling the latest block chain from the server.
247 void addLocalCommunication(int64_t arbitrator, String hostName, int portNumber);
248 uint64_t getArbitrator(IoTString * key); // NOTE(review): returns uint64_t, but arbitratorTable maps keys to int64_t - confirm the intended signedness
250 IoTString * getCommitted(IoTString * key);
251 IoTString * getSpeculative(IoTString * key);
252 IoTString * getCommittedAtomic(IoTString * key);
253 bool createNewKey(IoTString * keyName, int64_t machineId);
254 TransactionStatus * commitTransaction();
257 * Get the machine ID for this client
259 int64_t getMachineId() { return localMachineId; } // Returns the localMachineId field unchanged
262 * Decrement the number of live slots that we currently have
264 void decrementLiveCount() { liveSlotCount--; } // No lower-bound check: liveSlotCount goes negative if this is called more times than there are live slots
265 int64_t getLocalSequenceNumber();
266 Array<char> * acceptDataFromLocal(Array<char> * data);