edits
[iotcloud.git] / version2 / src / C / Transaction.cc
1
2
3 class Transaction {
4
5         Hashtable<int32_t, TransactionPart> parts = NULL;
6         Set<int32_t> missingParts = NULL;
7         Vector<int32_t> partsPendingSend = NULL;
8         bool isComplete = false;
9         bool hasLastPart = false;
10         Set<KeyValue> keyValueGuardSet = NULL;
11         Set<KeyValue> keyValueUpdateSet = NULL;
12         bool isDead = false;
13         int64_t sequenceNumber = -1;
14         int64_t clientLocalSequenceNumber = -1;
15         int64_t arbitratorId = -1;
16         int64_t machineId = -1;
17         Pair<int64_t, int64_t> transactionId = NULL;
18
19         int nextPartToSend = 0;
20         bool didSendAPartToServer = false;
21
22         TransactionStatus transactionStatus = NULL;
23
24         bool hadServerFailure = false;
25
26         Transaction() {
27                 parts = new Hashtable<int32_t, TransactionPart>();
28                 keyValueGuardSet = new HashSet<KeyValue>();
29                 keyValueUpdateSet = new HashSet<KeyValue>();
30                 partsPendingSend = new Vector<int32_t>();
31         }
32
33         void addPartEncode(TransactionPart newPart) {
34                 parts.put(newPart.getPartNumber(), newPart);
35                 partsPendingSend.add(newPart.getPartNumber());
36
37                 sequenceNumber = newPart.getSequenceNumber();
38                 arbitratorId = newPart.getArbitratorId();
39                 transactionId = newPart.getTransactionId();
40                 clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
41                 machineId = newPart.getMachineId();
42
43                 isComplete = true;
44         }
45
46         void addPartDecode(TransactionPart newPart) {
47
48                 if (isDead) {
49                         // If dead then just kill this part and move on
50                         newPart.setDead();
51                         return;
52                 }
53
54                 sequenceNumber = newPart.getSequenceNumber();
55                 arbitratorId = newPart.getArbitratorId();
56                 transactionId = newPart.getTransactionId();
57                 clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
58                 machineId = newPart.getMachineId();
59
60                 TransactionPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart);
61
62                 if (previoslySeenPart != NULL) {
63                         // Set dead the old one since the new one is a rescued version of this part
64                         previoslySeenPart.setDead();
65                 } else if (newPart.isLastPart()) {
66                         missingParts = new HashSet<int32_t>();
67                         hasLastPart = true;
68
69                         for (int i = 0; i < newPart.getPartNumber(); i++) {
70                                 if (parts.get(i) == NULL) {
71                                         missingParts.add(i);
72                                 }
73                         }
74                 }
75
76                 if (!isComplete && hasLastPart) {
77
78                         // We have seen this part so remove it from the set of missing parts
79                         missingParts.remove(newPart.getPartNumber());
80
81                         // Check if all the parts have been seen
82                         if (missingParts.size() == 0) {
83
84                                 // We have all the parts
85                                 isComplete = true;
86
87                                 // Decode all the parts and create the key value guard and update sets
88                                 decodeTransactionData();
89                         }
90                 }
91         }
92
93         void addUpdateKV(KeyValue kv) {
94                 keyValueUpdateSet.add(kv);
95         }
96
97         void addGuardKV(KeyValue kv) {
98                 keyValueGuardSet.add(kv);
99         }
100
101
102         int64_t getSequenceNumber() {
103                 return sequenceNumber;
104         }
105
106         void setSequenceNumber(int64_t _sequenceNumber) {
107                 sequenceNumber = _sequenceNumber;
108
109                 for (int32_t i : parts.keySet()) {
110                         parts.get(i).setSequenceNumber(sequenceNumber);
111                 }
112         }
113
114         int64_t getClientLocalSequenceNumber() {
115                 return clientLocalSequenceNumber;
116         }
117
118         Hashtable<int32_t, TransactionPart> getParts() {
119                 return parts;
120         }
121
122         bool didSendAPartToServer() {
123                 return didSendAPartToServer;
124         }
125
126         void resetNextPartToSend() {
127                 nextPartToSend = 0;
128         }
129
130         TransactionPart getNextPartToSend() {
131                 if ((partsPendingSend.size() == 0) || (partsPendingSend.size() == nextPartToSend)) {
132                         return NULL;
133                 }
134                 TransactionPart part = parts.get(partsPendingSend.get(nextPartToSend));
135                 nextPartToSend++;
136                 return part;
137         }
138
139
140         void setServerFailure() {
141                 hadServerFailure = true;
142         }
143
144         bool getServerFailure() {
145                 return hadServerFailure;
146         }
147
148
149         void resetServerFailure() {
150                 hadServerFailure = false;
151         }
152
153
154         void setTransactionStatus(TransactionStatus _transactionStatus) {
155                 transactionStatus = _transactionStatus;
156         }
157
158         TransactionStatus getTransactionStatus() {
159                 return transactionStatus;
160         }
161
162         void removeSentParts(Vector<int32_t> sentParts) {
163                 nextPartToSend = 0;
164                 if (partsPendingSend.removeAll(sentParts))
165                 {
166                         didSendAPartToServer = true;
167                         transactionStatus.setTransactionSequenceNumber(sequenceNumber);
168                 }
169         }
170
171         bool didSendAllParts() {
172                 return partsPendingSend.isEmpty();
173         }
174
175         Set<KeyValue> getKeyValueUpdateSet() {
176                 return keyValueUpdateSet;
177         }
178
179         int getNumberOfParts() {
180                 return parts.size();
181         }
182
183         int64_t getMachineId() {
184                 return machineId;
185         }
186
187         int64_t getArbitrator() {
188                 return arbitratorId;
189         }
190
191         bool isComplete() {
192                 return isComplete;
193         }
194
195         Pair<int64_t, int64_t> getId() {
196                 return transactionId;
197         }
198
199         void setDead() {
200                 if (isDead) {
201                         // Already dead
202                         return;
203                 }
204
205                 // Set dead
206                 isDead = true;
207
208                 // Make all the parts of this transaction dead
209                 for (int32_t partNumber : parts.keySet()) {
210                         TransactionPart part = parts.get(partNumber);
211                         part.setDead();
212                 }
213         }
214
215         TransactionPart getPart(int index) {
216                 return parts.get(index);
217         }
218
219         void decodeTransactionData() {
220
221                 // Calculate the size of the data section
222                 int dataSize = 0;
223                 for (int i = 0; i < parts.keySet().size(); i++) {
224                         TransactionPart tp = parts.get(i);
225                         dataSize += tp.getDataSize();
226                 }
227
228                 Array<char> *combinedData = new char[dataSize];
229                 int currentPosition = 0;
230
231                 // Stitch all the data sections together
232                 for (int i = 0; i < parts.keySet().size(); i++) {
233                         TransactionPart tp = parts.get(i);
234                         System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize());
235                         currentPosition += tp.getDataSize();
236                 }
237
238                 // Decoder Object
239                 ByteBuffer bbDecode = ByteBuffer.wrap(combinedData);
240
241                 // Decode how many key value pairs need to be decoded
242                 int numberOfKVGuards = bbDecode.getInt();
243                 int numberOfKVUpdates = bbDecode.getInt();
244
245                 // Decode all the guard key values
246                 for (int i = 0; i < numberOfKVGuards; i++) {
247                         KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
248                         keyValueGuardSet.add(kv);
249                 }
250
251                 // Decode all the updates key values
252                 for (int i = 0; i < numberOfKVUpdates; i++) {
253                         KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
254                         keyValueUpdateSet.add(kv);
255                 }
256         }
257
258         bool evaluateGuard(Hashtable<IoTString, KeyValue> committedKeyValueTable, Hashtable<IoTString, KeyValue> speculatedKeyValueTable, Hashtable<IoTString, KeyValue> pendingTransactionSpeculatedKeyValueTable) {
259                 for (KeyValue kvGuard : keyValueGuardSet) {
260
261                         // First check if the key is in the speculative table, this is the value of the latest assumption
262                         KeyValue kv = NULL;
263
264                         // If we have a speculation table then use it first
265                         if (pendingTransactionSpeculatedKeyValueTable != NULL) {
266                                 kv = pendingTransactionSpeculatedKeyValueTable.get(kvGuard.getKey());
267                         }
268
269                         // If we have a speculation table then use it first
270                         if ((kv == NULL) && (speculatedKeyValueTable != NULL)) {
271                                 kv = speculatedKeyValueTable.get(kvGuard.getKey());
272                         }
273
274                         if (kv == NULL) {
275                                 // if it is not in the speculative table then check the committed table and use that
276                                 // value as our latest assumption
277                                 kv = committedKeyValueTable.get(kvGuard.getKey());
278                         }
279
280                         if (kvGuard.getValue() != NULL) {
281                                 if ((kv == NULL) || (!kvGuard.getValue().equals(kv.getValue()))) {
282
283
284                                         if (kv != NULL) {
285                                                 System.out.println(kvGuard.getValue() + "       " + kv.getValue());
286                                         } else {
287                                                 System.out.println(kvGuard.getValue() + "       " + kv);
288                                         }
289
290                                         return false;
291                                 }
292                         } else {
293                                 if (kv != NULL) {
294                                         return false;
295                                 }
296                         }
297                 }
298                 return true;
299         }
300 }