edits
[iotcloud.git] / version2 / src / C / Transaction.cc
1 #include "Transaction.h"
2
3 Transaction::Transaction() :
4         parts(new Hashtable<int32_t, TransactionPart>()),
5         missingParts(NULL),
6         partsPendingSend(new Vector<int32_t>()),
7         fldisComplete(false),
8         hasLastPart(false),
9         keyValueGuardSet(new HashSet<KeyValue>()),
10         keyValueUpdateSet(new HashSet<KeyValue>()),
11         isDead(false),
12         sequenceNumber(-1),
13         clientLocalSequenceNumber(-1),
14         arbitratorId(-1),
15         machineId(-1),
16         transactionId(NULL),
17         hadServerFailure(false) {
18 }
19
20 void Transaction::addPartEncode(TransactionPart *newPart) {
21         parts.put(newPart.getPartNumber(), newPart);
22         partsPendingSend.add(newPart.getPartNumber());
23
24         sequenceNumber = newPart.getSequenceNumber();
25         arbitratorId = newPart.getArbitratorId();
26         transactionId = newPart.getTransactionId();
27         clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
28         machineId = newPart.getMachineId();
29
30         fldisComplete = true;
31 }
32
33 void Transaction::addPartDecode(TransactionPart *newPart) {
34         if (isDead) {
35                 // If dead then just kill this part and move on
36                 newPart.setDead();
37                 return;
38         }
39
40         sequenceNumber = newPart.getSequenceNumber();
41         arbitratorId = newPart.getArbitratorId();
42         transactionId = newPart.getTransactionId();
43         clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
44         machineId = newPart.getMachineId();
45
46         TransactionPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart);
47
48         if (previoslySeenPart != NULL) {
49                 // Set dead the old one since the new one is a rescued version of this part
50                 previoslySeenPart.setDead();
51         } else if (newPart.isLastPart()) {
52                 missingParts = new HashSet<int32_t>();
53                 hasLastPart = true;
54
55                 for (int i = 0; i < newPart.getPartNumber(); i++) {
56                         if (parts.get(i) == NULL) {
57                                 missingParts.add(i);
58                         }
59                 }
60         }
61
62         if (!fldisComplete && hasLastPart) {
63
64                 // We have seen this part so remove it from the set of missing parts
65                 missingParts.remove(newPart.getPartNumber());
66
67                 // Check if all the parts have been seen
68                 if (missingParts.size() == 0) {
69
70                         // We have all the parts
71                         fldisComplete = true;
72
73                         // Decode all the parts and create the key value guard and update sets
74                         decodeTransactionData();
75                 }
76         }
77 }
78
79 void Transaction::addUpdateKV(KeyValue *kv) {
80         keyValueUpdateSet.add(kv);
81 }
82
83 void Transaction::addGuardKV(KeyValue *kv) {
84         keyValueGuardSet.add(kv);
85 }
86
87
88 int64_t Transaction::getSequenceNumber() {
89         return sequenceNumber;
90 }
91
92 void Transaction::setSequenceNumber(int64_t _sequenceNumber) {
93         sequenceNumber = _sequenceNumber;
94
95         for (int32_t i : parts.keySet()) {
96                 parts.get(i).setSequenceNumber(sequenceNumber);
97         }
98 }
99
100 int64_t Transaction::getClientLocalSequenceNumber() {
101         return clientLocalSequenceNumber;
102 }
103
104 Hashtable<int32_t, TransactionPart *> *Transaction::getParts() {
105         return parts;
106 }
107
108 bool Transaction::didSendAPartToServer() {
109         return flddidSendAPartToServer;
110 }
111
112 void Transaction::resetNextPartToSend() {
113         nextPartToSend = 0;
114 }
115
116 TransactionPart *Transaction::getNextPartToSend() {
117         if ((partsPendingSend.size() == 0) || (partsPendingSend.size() == nextPartToSend)) {
118                 return NULL;
119         }
120         TransactionPart part = parts.get(partsPendingSend.get(nextPartToSend));
121         nextPartToSend++;
122         return part;
123 }
124
125
126 void Transaction::setServerFailure() {
127         hadServerFailure = true;
128 }
129
130 bool Transaction::getServerFailure() {
131         return hadServerFailure;
132 }
133
134
135 void Transaction::resetServerFailure() {
136         hadServerFailure = false;
137 }
138
139
140 void Transaction::setTransactionStatus(TransactionStatus *_transactionStatus) {
141         transactionStatus = _transactionStatus;
142 }
143
144 TransactionStatus *Transaction::getTransactionStatus() {
145         return transactionStatus;
146 }
147
148 void Transaction::removeSentParts(Vector<int32_t> *sentParts) {
149         nextPartToSend = 0;
150         if (partsPendingSend.removeAll(sentParts))
151         {
152                 flddidSendAPartToServer = true;
153                 transactionStatus.setTransactionSequenceNumber(sequenceNumber);
154         }
155 }
156
157 bool Transaction::didSendAllParts() {
158         return partsPendingSend.isEmpty();
159 }
160
161 Hashset<KeyValue *> *Transaction::getKeyValueUpdateSet() {
162         return keyValueUpdateSet;
163 }
164
165 int Transaction::getNumberOfParts() {
166         return parts.size();
167 }
168
169 int64_t Transaction::getMachineId() {
170         return machineId;
171 }
172
173 int64_t Transaction::getArbitrator() {
174         return arbitratorId;
175 }
176
177 bool Transaction::isComplete() {
178         return fldisComplete;
179 }
180
181 Pair<int64_t, int64_t> *Transaction::getId() {
182         return transactionId;
183 }
184
185 void Transaction::setDead() {
186         if (isDead) {
187                 // Already dead
188                 return;
189         }
190
191         // Set dead
192         isDead = true;
193
194         // Make all the parts of this transaction dead
195         for (int32_t partNumber : parts.keySet()) {
196                 TransactionPart part = parts.get(partNumber);
197                 part.setDead();
198         }
199 }
200
201 TransactionPart *Transaction::getPart(int index) {
202         return parts.get(index);
203 }
204
205 void Transaction::decodeTransactionData() {
206
207         // Calculate the size of the data section
208         int dataSize = 0;
209         for (int i = 0; i < parts.keySet().size(); i++) {
210                 TransactionPart tp = parts.get(i);
211                 dataSize += tp.getDataSize();
212         }
213
214         Array<char> *combinedData = new char[dataSize];
215         int currentPosition = 0;
216
217         // Stitch all the data sections together
218         for (int i = 0; i < parts.keySet().size(); i++) {
219                 TransactionPart tp = parts.get(i);
220                 System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize());
221                 currentPosition += tp.getDataSize();
222         }
223
224         // Decoder Object
225         ByteBuffer bbDecode = ByteBuffer.wrap(combinedData);
226
227         // Decode how many key value pairs need to be decoded
228         int numberOfKVGuards = bbDecode.getInt();
229         int numberOfKVUpdates = bbDecode.getInt();
230
231         // Decode all the guard key values
232         for (int i = 0; i < numberOfKVGuards; i++) {
233                 KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
234                 keyValueGuardSet.add(kv);
235         }
236
237         // Decode all the updates key values
238         for (int i = 0; i < numberOfKVUpdates; i++) {
239                 KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
240                 keyValueUpdateSet.add(kv);
241         }
242 }
243
244 bool Transaction::evaluateGuard(Hashtable<IoTString *, KeyValue *> *committedKeyValueTable, Hashtable<IoTString *, KeyValue *> *speculatedKeyValueTable, Hashtable<IoTString *, KeyValue *> *pendingTransactionSpeculatedKeyValueTable) {
245         for (KeyValue *kvGuard : keyValueGuardSet) {
246
247                 // First check if the key is in the speculative table, this is the value of the latest assumption
248                 KeyValue kv = NULL;
249
250                 // If we have a speculation table then use it first
251                 if (pendingTransactionSpeculatedKeyValueTable != NULL) {
252                         kv = pendingTransactionSpeculatedKeyValueTable.get(kvGuard.getKey());
253                 }
254
255                 // If we have a speculation table then use it first
256                 if ((kv == NULL) && (speculatedKeyValueTable != NULL)) {
257                         kv = speculatedKeyValueTable.get(kvGuard.getKey());
258                 }
259
260                 if (kv == NULL) {
261                         // if it is not in the speculative table then check the committed table and use that
262                         // value as our latest assumption
263                         kv = committedKeyValueTable.get(kvGuard.getKey());
264                 }
265
266                 if (kvGuard.getValue() != NULL) {
267                         if ((kv == NULL) || (!kvGuard.getValue().equals(kv.getValue()))) {
268
269
270                                 if (kv != NULL) {
271                                         System.out.println(kvGuard.getValue() + "       " + kv.getValue());
272                                 } else {
273                                         System.out.println(kvGuard.getValue() + "       " + kv);
274                                 }
275
276                                 return false;
277                         }
278                 } else {
279                         if (kv != NULL) {
280                                 return false;
281                         }
282                 }
283         }
284         return true;
285 }
286