f5a5bfdc2c3ceef1402c5cc2e415648b83f29346
[iotcloud.git] / version2 / src / C / Transaction.h
1 #ifndef TRANSACTION_H
2 #define TRANSACTION_H
3 #include "common.h"
4
5 class Transaction {
6  private:
7         Hashtable<int32_t, TransactionPart *> parts = NULL;
8         Set<int32_t> missingParts = NULL;
9         List<int32_t> partsPendingSend = NULL;
10   bool isComplete = false;
11   bool hasLastPart = false;
12         Hashset<KeyValue *> * keyValueGuardSet = NULL;
13         Hashset<KeyValue *> * keyValueUpdateSet = NULL;
14         bool isDead = false;
15   int64_t sequenceNumber = -1;
16         int64_t clientLocalSequenceNumber = -1;
17         int64_t arbitratorId = -1;
18         int64_t machineId = -1;
19         Pair<uint64_t, uint64_t> transactionId = NULL;
20         
21   int nextPartToSend = 0;
22         bool didSendAPartToServer = false;
23         
24   TransactionStatus transactionStatus = NULL;
25
26  bool hadServerFailure = false;
27
28     public Transaction() {
29         parts = new HashMap<Integer, TransactionPart>();
30         keyValueGuardSet = new HashSet<KeyValue>();
31         keyValueUpdateSet = new HashSet<KeyValue>();
32         partsPendingSend = new ArrayList<Integer>();
33     }
34
35     public void addPartEncode(TransactionPart newPart) {
36         parts.put(newPart.getPartNumber(), newPart);
37         partsPendingSend.add(newPart.getPartNumber());
38
39         sequenceNumber = newPart.getSequenceNumber();
40         arbitratorId = newPart.getArbitratorId();
41         transactionId = newPart.getTransactionId();
42         clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
43         machineId = newPart.getMachineId();
44
45         isComplete = true;
46     }
47
48     public void addPartDecode(TransactionPart newPart) {
49
50         if (isDead) {
51             // If dead then just kill this part and move on
52             newPart.setDead();
53             return;
54         }
55
56         sequenceNumber = newPart.getSequenceNumber();
57         arbitratorId = newPart.getArbitratorId();
58         transactionId = newPart.getTransactionId();
59         clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
60         machineId = newPart.getMachineId();
61
62         TransactionPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart);
63
64         if (previoslySeenPart != NULL) {
65             // Set dead the old one since the new one is a rescued version of this part
66             previoslySeenPart.setDead();
67         } else if (newPart.isLastPart()) {
68             missingParts = new HashSet<Integer>();
69             hasLastPart = true;
70
71             for (int i = 0; i < newPart.getPartNumber(); i++) {
72                 if (parts.get(i) == NULL) {
73                     missingParts.add(i);
74                 }
75             }
76         }
77
78         if (!isComplete && hasLastPart) {
79
80             // We have seen this part so remove it from the set of missing parts
81             missingParts.remove(newPart.getPartNumber());
82
83             // Check if all the parts have been seen
84             if (missingParts.size() == 0) {
85
86                 // We have all the parts
87                 isComplete = true;
88
89                 // Decode all the parts and create the key value guard and update sets
90                 decodeTransactionData();
91             }
92         }
93     }
94
95     public void addUpdateKV(KeyValue kv) {
96         keyValueUpdateSet.add(kv);
97     }
98
99     public void addGuardKV(KeyValue kv) {
100         keyValueGuardSet.add(kv);
101     }
102
103
104     public int64_t getSequenceNumber() {
105         return sequenceNumber;
106     }
107
108     public void setSequenceNumber(int64_t _sequenceNumber) {
109         sequenceNumber = _sequenceNumber;
110
111         for (Integer i : parts.keySet()) {
112             parts.get(i).setSequenceNumber(sequenceNumber);
113         }
114     }
115
116     public int64_t getClientLocalSequenceNumber() {
117         return clientLocalSequenceNumber;
118     }
119
120     public Map<Integer, TransactionPart> getParts() {
121         return parts;
122     }
123
124     public bool didSendAPartToServer() {
125         return didSendAPartToServer;
126     }
127
128     public void resetNextPartToSend() {
129         nextPartToSend = 0;
130     }
131
132     public TransactionPart getNextPartToSend() {
133         if ((partsPendingSend.size() == 0) || (partsPendingSend.size() == nextPartToSend)) {
134             return NULL;
135         }
136         TransactionPart part = parts.get(partsPendingSend.get(nextPartToSend));
137         nextPartToSend++;
138         return part;
139     }
140
141
142     public void setServerFailure() {
143         hadServerFailure = true;
144     }
145
146     public bool getServerFailure() {
147         return hadServerFailure;
148     }
149
150
151     public void resetServerFailure() {
152         hadServerFailure = false;
153     }
154
155
156     public void setTransactionStatus(TransactionStatus _transactionStatus) {
157         transactionStatus = _transactionStatus;
158     }
159
160     public TransactionStatus getTransactionStatus() {
161         return transactionStatus;
162     }
163
164     public void removeSentParts(List<Integer> sentParts) {
165         nextPartToSend = 0;
166         if(partsPendingSend.removeAll(sentParts))
167         {
168             didSendAPartToServer = true;
169             transactionStatus.setTransactionSequenceNumber(sequenceNumber);
170         }
171     }
172
173     public bool didSendAllParts() {
174         return partsPendingSend.isEmpty();
175     }
176
177     public Set<KeyValue> getKeyValueUpdateSet() {
178         return keyValueUpdateSet;
179     }
180
181     public int getNumberOfParts() {
182         return parts.size();
183     }
184
185     public int64_t getMachineId() {
186         return machineId;
187     }
188
189     public int64_t getArbitrator() {
190         return arbitratorId;
191     }
192
193     public bool isComplete() {
194         return isComplete;
195     }
196
197     public Pair<int64_t, int64_t> getId() {
198         return transactionId;
199     }
200
201     public void setDead() {
202         if (isDead) {
203             // Already dead
204             return;
205         }
206
207         // Set dead
208         isDead = true;
209
210         // Make all the parts of this transaction dead
211         for (Integer partNumber : parts.keySet()) {
212             TransactionPart part = parts.get(partNumber);
213             part.setDead();
214         }
215     }
216
217     public TransactionPart getPart(int index) {
218         return parts.get(index);
219     }
220
221     private void decodeTransactionData() {
222
223         // Calculate the size of the data section
224         int dataSize = 0;
225         for (int i = 0; i < parts.keySet().size(); i++) {
226             TransactionPart tp = parts.get(i);
227             dataSize += tp.getDataSize();
228         }
229
230         char[] combinedData = new char[dataSize];
231         int currentPosition = 0;
232
233         // Stitch all the data sections together
234         for (int i = 0; i < parts.keySet().size(); i++) {
235             TransactionPart tp = parts.get(i);
236             System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize());
237             currentPosition += tp.getDataSize();
238         }
239
240         // Decoder Object
241         ByteBuffer bbDecode = ByteBuffer.wrap(combinedData);
242
243         // Decode how many key value pairs need to be decoded
244         int numberOfKVGuards = bbDecode.getInt();
245         int numberOfKVUpdates = bbDecode.getInt();
246
247         // Decode all the guard key values
248         for (int i = 0; i < numberOfKVGuards; i++) {
249             KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
250             keyValueGuardSet.add(kv);
251         }
252
253         // Decode all the updates key values
254         for (int i = 0; i < numberOfKVUpdates; i++) {
255             KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
256             keyValueUpdateSet.add(kv);
257         }
258     }
259
260     public bool evaluateGuard(Map<IoTString, KeyValue> committedKeyValueTable, Map<IoTString, KeyValue> speculatedKeyValueTable, Map<IoTString, KeyValue> pendingTransactionSpeculatedKeyValueTable) {
261         for (KeyValue kvGuard : keyValueGuardSet) {
262
263             // First check if the key is in the speculative table, this is the value of the latest assumption
264             KeyValue kv = NULL;
265
266             // If we have a speculation table then use it first
267             if (pendingTransactionSpeculatedKeyValueTable != NULL) {
268                 kv = pendingTransactionSpeculatedKeyValueTable.get(kvGuard.getKey());
269             }
270
271             // If we have a speculation table then use it first
272             if ((kv == NULL) && (speculatedKeyValueTable != NULL)) {
273                 kv = speculatedKeyValueTable.get(kvGuard.getKey());
274             }
275
276             if (kv == NULL) {
277                 // if it is not in the speculative table then check the committed table and use that
278                 // value as our latest assumption
279                 kv = committedKeyValueTable.get(kvGuard.getKey());
280             }
281
282             if (kvGuard.getValue() != NULL) {
283                 if ((kv == NULL) || (!kvGuard.getValue().equals(kv.getValue()))) {
284
285
286                     if (kv != NULL) {
287                         System.out.println(kvGuard.getValue() + "       " + kv.getValue());
288                     } else {
289                         System.out.println(kvGuard.getValue() + "       " + kv);
290                     }
291
292                     return false;
293                 }
294             } else {
295                 if (kv != NULL) {
296                     return false;
297                 }
298             }
299         }
300         return true;
301     }
302 };
303 #endif