d316aa10985723556b2c7c2ccc3c9ab10696d854
[iotcloud.git] / version2 / src / C / Transaction.cc
1 #include "Transaction.h"
2 #include "TransactionPart.h"
3 #include "KeyValue.h"
4 #include "ByteBuffer.h"
5 #include "IoTString.h"
6 #include "TransactionStatus.h"
7
8 Transaction::Transaction() :
9         parts(new Vector<TransactionPart *>()),
10         partCount(0),
11         missingParts(NULL),
12         partsPendingSend(new Vector<int32_t>()),
13         fldisComplete(false),
14         hasLastPart(false),
15         keyValueGuardSet(new Hashset<KeyValue *>()),
16         keyValueUpdateSet(new Hashset<KeyValue *>()),
17         isDead(false),
18         sequenceNumber(-1),
19         clientLocalSequenceNumber(-1),
20         arbitratorId(-1),
21         machineId(-1),
22         transactionId(Pair<int64_t, int64_t>(0,0)),
23         nextPartToSend(0),
24         flddidSendAPartToServer(false),
25         transactionStatus(NULL),
26         hadServerFailure(false) {
27 }
28
29 void Transaction::addPartEncode(TransactionPart *newPart) {
30         TransactionPart *old = parts->setExpand(newPart->getPartNumber(), newPart);
31         if (old == NULL)
32                 partCount++;
33         partsPendingSend->add(newPart->getPartNumber());
34
35         sequenceNumber = newPart->getSequenceNumber();
36         arbitratorId = newPart->getArbitratorId();
37         transactionId = newPart->getTransactionId();
38         clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber();
39         machineId = newPart->getMachineId();
40         fldisComplete = true;
41 }
42
43 void Transaction::addPartDecode(TransactionPart *newPart) {
44         if (isDead) {
45                 // If dead then just kill this part and move on
46                 newPart->setDead();
47                 return;
48         }
49
50         sequenceNumber = newPart->getSequenceNumber();
51         arbitratorId = newPart->getArbitratorId();
52         transactionId = newPart->getTransactionId();
53         clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber();
54         machineId = newPart->getMachineId();
55
56         TransactionPart *previouslySeenPart = parts->setExpand(newPart->getPartNumber(), newPart);
57         if (previouslySeenPart == NULL)
58                 partCount++;
59
60         if (previouslySeenPart != NULL) {
61                 // Set dead the old one since the new one is a rescued version of this part
62                 previouslySeenPart->setDead();
63         } else if (newPart->isLastPart()) {
64                 missingParts = new Hashset<int32_t>();
65                 hasLastPart = true;
66
67                 for (int i = 0; i < newPart->getPartNumber(); i++) {
68                         if (parts->get(i) == NULL) {
69                                 missingParts->add(i);
70                         }
71                 }
72         }
73
74         if (!fldisComplete && hasLastPart) {
75
76                 // We have seen this part so remove it from the set of missing parts
77                 missingParts->remove(newPart->getPartNumber());
78
79                 // Check if all the parts have been seen
80                 if (missingParts->size() == 0) {
81
82                         // We have all the parts
83                         fldisComplete = true;
84
85                         // Decode all the parts and create the key value guard and update sets
86                         decodeTransactionData();
87                 }
88         }
89 }
90
91 void Transaction::addUpdateKV(KeyValue *kv) {
92         keyValueUpdateSet->add(kv);
93 }
94
95 void Transaction::addGuardKV(KeyValue *kv) {
96         keyValueGuardSet->add(kv);
97 }
98
99
100 int64_t Transaction::getSequenceNumber() {
101         return sequenceNumber;
102 }
103
104 void Transaction::setSequenceNumber(int64_t _sequenceNumber) {
105         sequenceNumber = _sequenceNumber;
106
107         for (uint32_t i = 0; i < parts->size(); i++) {
108                 TransactionPart *tp = parts->get(i);
109                 if (tp != NULL)
110                         tp->setSequenceNumber(sequenceNumber);
111         }
112 }
113
114 int64_t Transaction::getClientLocalSequenceNumber() {
115         return clientLocalSequenceNumber;
116 }
117
118 Vector<TransactionPart *> *Transaction::getParts() {
119         return parts;
120 }
121
122 bool Transaction::didSendAPartToServer() {
123         return flddidSendAPartToServer;
124 }
125
126 void Transaction::resetNextPartToSend() {
127         nextPartToSend = 0;
128 }
129
130 TransactionPart *Transaction::getNextPartToSend() {
131         if ((partsPendingSend->size() == 0) || (partsPendingSend->size() == nextPartToSend)) {
132                 return NULL;
133         }
134         TransactionPart *part = parts->get(partsPendingSend->get(nextPartToSend));
135         nextPartToSend++;
136         return part;
137 }
138
139
140 void Transaction::setServerFailure() {
141         hadServerFailure = true;
142 }
143
144 bool Transaction::getServerFailure() {
145         return hadServerFailure;
146 }
147
148
149 void Transaction::resetServerFailure() {
150         hadServerFailure = false;
151 }
152
153
154 void Transaction::setTransactionStatus(TransactionStatus *_transactionStatus) {
155         transactionStatus = _transactionStatus;
156 }
157
158 TransactionStatus *Transaction::getTransactionStatus() {
159         return transactionStatus;
160 }
161
162 void Transaction::removeSentParts(Vector<int32_t> *sentParts) {
163         nextPartToSend = 0;
164         bool changed = false;
165         uint lastusedindex = 0;
166         for (uint i = 0; i < partsPendingSend->size(); i++) {
167                 int32_t parti = partsPendingSend->get(i);
168                 for (uint j = 0; j < sentParts->size(); j++) {
169                         int32_t partj = sentParts->get(j);
170                         if (parti == partj) {
171                                 changed = true;
172                                 goto NextElement;
173                         }
174                 }
175                 partsPendingSend->set(lastusedindex++, parti);
176 NextElement:
177                 ;
178         }
179         if (changed) {
180                 partsPendingSend->setSize(lastusedindex);
181                 flddidSendAPartToServer = true;
182                 transactionStatus->setTransactionSequenceNumber(sequenceNumber);
183         }
184 }
185
186 bool Transaction::didSendAllParts() {
187         return partsPendingSend->isEmpty();
188 }
189
190 Hashset<KeyValue *> *Transaction::getKeyValueUpdateSet() {
191         return keyValueUpdateSet;
192 }
193
194 int Transaction::getNumberOfParts() {
195         return partCount;
196 }
197
198 int64_t Transaction::getMachineId() {
199         return machineId;
200 }
201
202 int64_t Transaction::getArbitrator() {
203         return arbitratorId;
204 }
205
206 bool Transaction::isComplete() {
207         return fldisComplete;
208 }
209
210 Pair<int64_t, int64_t> *Transaction::getId() {
211         return &transactionId;
212 }
213
214 void Transaction::setDead() {
215         if (!isDead) {
216                 // Set dead
217                 isDead = true;
218                 // Make all the parts of this transaction dead
219                 for (uint32_t partNumber = 0; partNumber < parts->size(); partNumber++) {
220                         TransactionPart *part = parts->get(partNumber);
221                         if (part != NULL)
222                                 part->setDead();
223                 }
224         }
225 }
226
227 TransactionPart *Transaction::getPart(int index) {
228         return parts->get(index);
229 }
230
231 void Transaction::decodeTransactionData() {
232         // Calculate the size of the data section
233         int dataSize = 0;
234         for (uint i = 0; i < parts->size(); i++) {
235                 TransactionPart *tp = parts->get(i);
236                 dataSize += tp->getDataSize();
237         }
238
239         Array<char> *combinedData = new Array<char>(dataSize);
240         int currentPosition = 0;
241
242         // Stitch all the data sections together
243         for (uint i = 0; i < parts->size(); i++) {
244                 TransactionPart *tp = parts->get(i);
245                 System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
246                 currentPosition += tp->getDataSize();
247         }
248
249         // Decoder Object
250         ByteBuffer *bbDecode = ByteBuffer_wrap(combinedData);
251
252         // Decode how many key value pairs need to be decoded
253         int numberOfKVGuards = bbDecode->getInt();
254         int numberOfKVUpdates = bbDecode->getInt();
255
256         // Decode all the guard key values
257         for (int i = 0; i < numberOfKVGuards; i++) {
258                 KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
259                 keyValueGuardSet->add(kv);
260         }
261
262         // Decode all the updates key values
263         for (int i = 0; i < numberOfKVUpdates; i++) {
264                 KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
265                 keyValueUpdateSet->add(kv);
266         }
267 }
268
269 bool Transaction::evaluateGuard(Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *committedKeyValueTable, Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *speculatedKeyValueTable, Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *pendingTransactionSpeculatedKeyValueTable) {
270         SetIterator<KeyValue *, KeyValue *> *kvit = keyValueGuardSet->iterator();
271         while (kvit->hasNext()) {
272                 KeyValue *kvGuard = kvit->next();
273                 // First check if the key is in the speculative table, this is the value of the latest assumption
274                 KeyValue *kv = NULL;
275
276                 // If we have a speculation table then use it first
277                 if (pendingTransactionSpeculatedKeyValueTable != NULL) {
278                         kv = pendingTransactionSpeculatedKeyValueTable->get(kvGuard->getKey());
279                 }
280
281                 // If we have a speculation table then use it first
282                 if ((kv == NULL) && (speculatedKeyValueTable != NULL)) {
283                         kv = speculatedKeyValueTable->get(kvGuard->getKey());
284                 }
285
286                 if (kv == NULL) {
287                         // if it is not in the speculative table then check the committed table and use that
288                         // value as our latest assumption
289                         kv = committedKeyValueTable->get(kvGuard->getKey());
290                 }
291
292                 if (kvGuard->getValue() != NULL) {
293                         if ((kv == NULL) || (!kvGuard->getValue()->equals(kv->getValue()))) {
294
295
296                                 if (kv != NULL) {
297                                         printf("%s      %s\n", kvGuard->getKey()->internalBytes()->internalArray(), kv->getValue()->internalBytes()->internalArray());
298                                 } else {
299                                         printf("%s      null\n", kvGuard->getValue()->internalBytes()->internalArray());
300                                 }
301                                 delete kvit;
302                                 return false;
303                         }
304                 } else {
305                         if (kv != NULL) {
306                                 delete kvit;
307                                 return false;
308                         }
309                 }
310         }
311         delete kvit;
312         return true;
313 }
314