04fbb9a819c60be7a3790665206c40b38ae7bcde
[iotcloud.git] / version2 / src / C / Transaction.h
1 #ifndef TRANSACTION_H
2 #define TRANSACTION_H
3 #include "common.h"
4
5 class Transaction {
6 private:
7         Hashtable<int32_t, TransactionPart *> parts = NULL;
8         Set<int32_t> missingParts = NULL;
9         Vector<int32_t> partsPendingSend = NULL;
10         bool isComplete = false;
11         bool hasLastPart = false;
12         Hashset<KeyValue *> *keyValueGuardSet = NULL;
13         Hashset<KeyValue *> *keyValueUpdateSet = NULL;
14         bool isDead = false;
15         int64_t sequenceNumber = -1;
16         int64_t clientLocalSequenceNumber = -1;
17         int64_t arbitratorId = -1;
18         int64_t machineId = -1;
19         Pair<uint64_t, uint64_t> transactionId = NULL;
20
21         int nextPartToSend = 0;
22         bool didSendAPartToServer = false;
23
24         TransactionStatus transactionStatus = NULL;
25
26         bool hadServerFailure = false;
27
28         public Transaction() {
29                 parts = new Hashtable<int32_t, TransactionPart>();
30                 keyValueGuardSet = new HashSet<KeyValue>();
31                 keyValueUpdateSet = new HashSet<KeyValue>();
32                 partsPendingSend = new Vector<int32_t>();
33         }
34
35         public void addPartEncode(TransactionPart newPart) {
36                 parts.put(newPart.getPartNumber(), newPart);
37                 partsPendingSend.add(newPart.getPartNumber());
38
39                 sequenceNumber = newPart.getSequenceNumber();
40                 arbitratorId = newPart.getArbitratorId();
41                 transactionId = newPart.getTransactionId();
42                 clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
43                 machineId = newPart.getMachineId();
44
45                 isComplete = true;
46         }
47
48         public void addPartDecode(TransactionPart newPart) {
49
50                 if (isDead) {
51                         // If dead then just kill this part and move on
52                         newPart.setDead();
53                         return;
54                 }
55
56                 sequenceNumber = newPart.getSequenceNumber();
57                 arbitratorId = newPart.getArbitratorId();
58                 transactionId = newPart.getTransactionId();
59                 clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
60                 machineId = newPart.getMachineId();
61
62                 TransactionPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart);
63
64                 if (previoslySeenPart != NULL) {
65                         // Set dead the old one since the new one is a rescued version of this part
66                         previoslySeenPart.setDead();
67                 } else if (newPart.isLastPart()) {
68                         missingParts = new HashSet<int32_t>();
69                         hasLastPart = true;
70
71                         for (int i = 0; i < newPart.getPartNumber(); i++) {
72                                 if (parts.get(i) == NULL) {
73                                         missingParts.add(i);
74                                 }
75                         }
76                 }
77
78                 if (!isComplete && hasLastPart) {
79
80                         // We have seen this part so remove it from the set of missing parts
81                         missingParts.remove(newPart.getPartNumber());
82
83                         // Check if all the parts have been seen
84                         if (missingParts.size() == 0) {
85
86                                 // We have all the parts
87                                 isComplete = true;
88
89                                 // Decode all the parts and create the key value guard and update sets
90                                 decodeTransactionData();
91                         }
92                 }
93         }
94
95         public void addUpdateKV(KeyValue kv) {
96                 keyValueUpdateSet.add(kv);
97         }
98
99         public void addGuardKV(KeyValue kv) {
100                 keyValueGuardSet.add(kv);
101         }
102
103
104         public int64_t getSequenceNumber() {
105                 return sequenceNumber;
106         }
107
108         public void setSequenceNumber(int64_t _sequenceNumber) {
109                 sequenceNumber = _sequenceNumber;
110
111                 for (int32_t i : parts.keySet()) {
112                         parts.get(i).setSequenceNumber(sequenceNumber);
113                 }
114         }
115
116         public int64_t getClientLocalSequenceNumber() {
117                 return clientLocalSequenceNumber;
118         }
119
120         public Hashtable<int32_t, TransactionPart> getParts() {
121                 return parts;
122         }
123
124         public bool didSendAPartToServer() {
125                 return didSendAPartToServer;
126         }
127
128         public void resetNextPartToSend() {
129                 nextPartToSend = 0;
130         }
131
132         public TransactionPart getNextPartToSend() {
133                 if ((partsPendingSend.size() == 0) || (partsPendingSend.size() == nextPartToSend)) {
134                         return NULL;
135                 }
136                 TransactionPart part = parts.get(partsPendingSend.get(nextPartToSend));
137                 nextPartToSend++;
138                 return part;
139         }
140
141
142         public void setServerFailure() {
143                 hadServerFailure = true;
144         }
145
146         public bool getServerFailure() {
147                 return hadServerFailure;
148         }
149
150
151         public void resetServerFailure() {
152                 hadServerFailure = false;
153         }
154
155
156         public void setTransactionStatus(TransactionStatus _transactionStatus) {
157                 transactionStatus = _transactionStatus;
158         }
159
160         public TransactionStatus getTransactionStatus() {
161                 return transactionStatus;
162         }
163
164         public void removeSentParts(Vector<int32_t> sentParts) {
165                 nextPartToSend = 0;
166                 if (partsPendingSend.removeAll(sentParts))
167                 {
168                         didSendAPartToServer = true;
169                         transactionStatus.setTransactionSequenceNumber(sequenceNumber);
170                 }
171         }
172
173         public bool didSendAllParts() {
174                 return partsPendingSend.isEmpty();
175         }
176
177         public Set<KeyValue> getKeyValueUpdateSet() {
178                 return keyValueUpdateSet;
179         }
180
181         public int getNumberOfParts() {
182                 return parts.size();
183         }
184
185         public int64_t getMachineId() {
186                 return machineId;
187         }
188
189         public int64_t getArbitrator() {
190                 return arbitratorId;
191         }
192
193         public bool isComplete() {
194                 return isComplete;
195         }
196
197         public Pair<int64_t, int64_t> getId() {
198                 return transactionId;
199         }
200
201         public void setDead() {
202                 if (isDead) {
203                         // Already dead
204                         return;
205                 }
206
207                 // Set dead
208                 isDead = true;
209
210                 // Make all the parts of this transaction dead
211                 for (int32_t partNumber : parts.keySet()) {
212                         TransactionPart part = parts.get(partNumber);
213                         part.setDead();
214                 }
215         }
216
217         public TransactionPart getPart(int index) {
218                 return parts.get(index);
219         }
220
221         private void decodeTransactionData() {
222
223                 // Calculate the size of the data section
224                 int dataSize = 0;
225                 for (int i = 0; i < parts.keySet().size(); i++) {
226                         TransactionPart tp = parts.get(i);
227                         dataSize += tp.getDataSize();
228                 }
229
230                 char[] combinedData = new char[dataSize];
231                 int currentPosition = 0;
232
233                 // Stitch all the data sections together
234                 for (int i = 0; i < parts.keySet().size(); i++) {
235                         TransactionPart tp = parts.get(i);
236                         System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize());
237                         currentPosition += tp.getDataSize();
238                 }
239
240                 // Decoder Object
241                 ByteBuffer bbDecode = ByteBuffer.wrap(combinedData);
242
243                 // Decode how many key value pairs need to be decoded
244                 int numberOfKVGuards = bbDecode.getInt();
245                 int numberOfKVUpdates = bbDecode.getInt();
246
247                 // Decode all the guard key values
248                 for (int i = 0; i < numberOfKVGuards; i++) {
249                         KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
250                         keyValueGuardSet.add(kv);
251                 }
252
253                 // Decode all the updates key values
254                 for (int i = 0; i < numberOfKVUpdates; i++) {
255                         KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
256                         keyValueUpdateSet.add(kv);
257                 }
258         }
259
260         public bool evaluateGuard(Hashtable<IoTString, KeyValue> committedKeyValueTable, Hashtable<IoTString, KeyValue> speculatedKeyValueTable, Hashtable<IoTString, KeyValue> pendingTransactionSpeculatedKeyValueTable) {
261                 for (KeyValue kvGuard : keyValueGuardSet) {
262
263                         // First check if the key is in the speculative table, this is the value of the latest assumption
264                         KeyValue kv = NULL;
265
266                         // If we have a speculation table then use it first
267                         if (pendingTransactionSpeculatedKeyValueTable != NULL) {
268                                 kv = pendingTransactionSpeculatedKeyValueTable.get(kvGuard.getKey());
269                         }
270
271                         // If we have a speculation table then use it first
272                         if ((kv == NULL) && (speculatedKeyValueTable != NULL)) {
273                                 kv = speculatedKeyValueTable.get(kvGuard.getKey());
274                         }
275
276                         if (kv == NULL) {
277                                 // if it is not in the speculative table then check the committed table and use that
278                                 // value as our latest assumption
279                                 kv = committedKeyValueTable.get(kvGuard.getKey());
280                         }
281
282                         if (kvGuard.getValue() != NULL) {
283                                 if ((kv == NULL) || (!kvGuard.getValue().equals(kv.getValue()))) {
284
285
286                                         if (kv != NULL) {
287                                                 System.out.println(kvGuard.getValue() + "       " + kv.getValue());
288                                         } else {
289                                                 System.out.println(kvGuard.getValue() + "       " + kv);
290                                         }
291
292                                         return false;
293                                 }
294                         } else {
295                                 if (kv != NULL) {
296                                         return false;
297                                 }
298                         }
299                 }
300                 return true;
301         }
302 };
303 #endif