Edit
[iotcloud.git] / version2 / src / C / Transaction.cc
1
2
3 class Transaction {
4
5     Map<Integer, TransactionPart> parts = NULL;
6     Set<Integer> missingParts = NULL;
7     List<Integer> partsPendingSend = NULL;
8     bool isComplete = false;
9     bool hasLastPart = false;
10     Set<KeyValue> keyValueGuardSet = NULL;
11     Set<KeyValue> keyValueUpdateSet = NULL;
12     bool isDead = false;
13     int64_t sequenceNumber = -1;
14     int64_t clientLocalSequenceNumber = -1;
15     int64_t arbitratorId = -1;
16     int64_t machineId = -1;
17     Pair<Long, Long> transactionId = NULL;
18
19     int nextPartToSend = 0;
20     bool didSendAPartToServer = false;
21
22     TransactionStatus transactionStatus = NULL;
23
24     bool hadServerFailure = false;
25
26     Transaction() {
27         parts = new HashMap<Integer, TransactionPart>();
28         keyValueGuardSet = new HashSet<KeyValue>();
29         keyValueUpdateSet = new HashSet<KeyValue>();
30         partsPendingSend = new ArrayList<Integer>();
31     }
32
33     void addPartEncode(TransactionPart newPart) {
34         parts.put(newPart.getPartNumber(), newPart);
35         partsPendingSend.add(newPart.getPartNumber());
36
37         sequenceNumber = newPart.getSequenceNumber();
38         arbitratorId = newPart.getArbitratorId();
39         transactionId = newPart.getTransactionId();
40         clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
41         machineId = newPart.getMachineId();
42
43         isComplete = true;
44     }
45
46     void addPartDecode(TransactionPart newPart) {
47
48         if (isDead) {
49             // If dead then just kill this part and move on
50             newPart.setDead();
51             return;
52         }
53
54         sequenceNumber = newPart.getSequenceNumber();
55         arbitratorId = newPart.getArbitratorId();
56         transactionId = newPart.getTransactionId();
57         clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
58         machineId = newPart.getMachineId();
59
60         TransactionPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart);
61
62         if (previoslySeenPart != NULL) {
63             // Set dead the old one since the new one is a rescued version of this part
64             previoslySeenPart.setDead();
65         } else if (newPart.isLastPart()) {
66             missingParts = new HashSet<Integer>();
67             hasLastPart = true;
68
69             for (int i = 0; i < newPart.getPartNumber(); i++) {
70                 if (parts.get(i) == NULL) {
71                     missingParts.add(i);
72                 }
73             }
74         }
75
76         if (!isComplete && hasLastPart) {
77
78             // We have seen this part so remove it from the set of missing parts
79             missingParts.remove(newPart.getPartNumber());
80
81             // Check if all the parts have been seen
82             if (missingParts.size() == 0) {
83
84                 // We have all the parts
85                 isComplete = true;
86
87                 // Decode all the parts and create the key value guard and update sets
88                 decodeTransactionData();
89             }
90         }
91     }
92
93     void addUpdateKV(KeyValue kv) {
94         keyValueUpdateSet.add(kv);
95     }
96
97     void addGuardKV(KeyValue kv) {
98         keyValueGuardSet.add(kv);
99     }
100
101
102     int64_t getSequenceNumber() {
103         return sequenceNumber;
104     }
105
106     void setSequenceNumber(int64_t _sequenceNumber) {
107         sequenceNumber = _sequenceNumber;
108
109         for (Integer i : parts.keySet()) {
110             parts.get(i).setSequenceNumber(sequenceNumber);
111         }
112     }
113
114     int64_t getClientLocalSequenceNumber() {
115         return clientLocalSequenceNumber;
116     }
117
118     Map<Integer, TransactionPart> getParts() {
119         return parts;
120     }
121
122     bool didSendAPartToServer() {
123         return didSendAPartToServer;
124     }
125
126     void resetNextPartToSend() {
127         nextPartToSend = 0;
128     }
129
130     TransactionPart getNextPartToSend() {
131         if ((partsPendingSend.size() == 0) || (partsPendingSend.size() == nextPartToSend)) {
132             return NULL;
133         }
134         TransactionPart part = parts.get(partsPendingSend.get(nextPartToSend));
135         nextPartToSend++;
136         return part;
137     }
138
139
140     void setServerFailure() {
141         hadServerFailure = true;
142     }
143
144     bool getServerFailure() {
145         return hadServerFailure;
146     }
147
148
149     void resetServerFailure() {
150         hadServerFailure = false;
151     }
152
153
154     void setTransactionStatus(TransactionStatus _transactionStatus) {
155         transactionStatus = _transactionStatus;
156     }
157
158     TransactionStatus getTransactionStatus() {
159         return transactionStatus;
160     }
161
162     void removeSentParts(List<Integer> sentParts) {
163         nextPartToSend = 0;
164         if(partsPendingSend.removeAll(sentParts))
165         {
166             didSendAPartToServer = true;
167             transactionStatus.setTransactionSequenceNumber(sequenceNumber);
168         }
169     }
170
171     bool didSendAllParts() {
172         return partsPendingSend.isEmpty();
173     }
174
175     Set<KeyValue> getKeyValueUpdateSet() {
176         return keyValueUpdateSet;
177     }
178
179     int getNumberOfParts() {
180         return parts.size();
181     }
182
183     int64_t getMachineId() {
184         return machineId;
185     }
186
187     int64_t getArbitrator() {
188         return arbitratorId;
189     }
190
191     bool isComplete() {
192         return isComplete;
193     }
194
195     Pair<Long, Long> getId() {
196         return transactionId;
197     }
198
199     void setDead() {
200         if (isDead) {
201             // Already dead
202             return;
203         }
204
205         // Set dead
206         isDead = true;
207
208         // Make all the parts of this transaction dead
209         for (Integer partNumber : parts.keySet()) {
210             TransactionPart part = parts.get(partNumber);
211             part.setDead();
212         }
213     }
214
215     TransactionPart getPart(int index) {
216         return parts.get(index);
217     }
218
219     void decodeTransactionData() {
220
221         // Calculate the size of the data section
222         int dataSize = 0;
223         for (int i = 0; i < parts.keySet().size(); i++) {
224             TransactionPart tp = parts.get(i);
225             dataSize += tp.getDataSize();
226         }
227
228         char[] combinedData = new char[dataSize];
229         int currentPosition = 0;
230
231         // Stitch all the data sections together
232         for (int i = 0; i < parts.keySet().size(); i++) {
233             TransactionPart tp = parts.get(i);
234             System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize());
235             currentPosition += tp.getDataSize();
236         }
237
238         // Decoder Object
239         ByteBuffer bbDecode = ByteBuffer.wrap(combinedData);
240
241         // Decode how many key value pairs need to be decoded
242         int numberOfKVGuards = bbDecode.getInt();
243         int numberOfKVUpdates = bbDecode.getInt();
244
245         // Decode all the guard key values
246         for (int i = 0; i < numberOfKVGuards; i++) {
247             KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
248             keyValueGuardSet.add(kv);
249         }
250
251         // Decode all the updates key values
252         for (int i = 0; i < numberOfKVUpdates; i++) {
253             KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
254             keyValueUpdateSet.add(kv);
255         }
256     }
257
258     bool evaluateGuard(Map<IoTString, KeyValue> committedKeyValueTable, Map<IoTString, KeyValue> speculatedKeyValueTable, Map<IoTString, KeyValue> pendingTransactionSpeculatedKeyValueTable) {
259         for (KeyValue kvGuard : keyValueGuardSet) {
260
261             // First check if the key is in the speculative table, this is the value of the latest assumption
262             KeyValue kv = NULL;
263
264             // If we have a speculation table then use it first
265             if (pendingTransactionSpeculatedKeyValueTable != NULL) {
266                 kv = pendingTransactionSpeculatedKeyValueTable.get(kvGuard.getKey());
267             }
268
269             // If we have a speculation table then use it first
270             if ((kv == NULL) && (speculatedKeyValueTable != NULL)) {
271                 kv = speculatedKeyValueTable.get(kvGuard.getKey());
272             }
273
274             if (kv == NULL) {
275                 // if it is not in the speculative table then check the committed table and use that
276                 // value as our latest assumption
277                 kv = committedKeyValueTable.get(kvGuard.getKey());
278             }
279
280             if (kvGuard.getValue() != NULL) {
281                 if ((kv == NULL) || (!kvGuard.getValue().equals(kv.getValue()))) {
282
283
284                     if (kv != NULL) {
285                         System.out.println(kvGuard.getValue() + "       " + kv.getValue());
286                     } else {
287                         System.out.println(kvGuard.getValue() + "       " + kv);
288                     }
289
290                     return false;
291                 }
292             } else {
293                 if (kv != NULL) {
294                     return false;
295                 }
296             }
297         }
298         return true;
299     }
300 }