rename files
[iotcloud.git] / version2 / src / C / Transaction.cpp
1 #include "Transaction.h"
2 #include "TransactionPart.h"
3 #include "KeyValue.h"
4 #include "ByteBuffer.h"
5 #include "IoTString.h"
6 #include "TransactionStatus.h"
7
8 Transaction::Transaction() :
9         parts(new Vector<TransactionPart *>()),
10         partCount(0),
11         missingParts(NULL),
12         partsPendingSend(new Vector<int32_t>()),
13         fldisComplete(false),
14         hasLastPart(false),
15         keyValueGuardSet(new Hashset<KeyValue *>()),
16         keyValueUpdateSet(new Hashset<KeyValue *>()),
17         isDead(false),
18         sequenceNumber(-1),
19         clientLocalSequenceNumber(-1),
20         arbitratorId(-1),
21         machineId(-1),
22         transactionId(Pair<int64_t, int64_t>(0,0)),
23         nextPartToSend(0),
24         flddidSendAPartToServer(false),
25         transactionStatus(NULL),
26         hadServerFailure(false) {
27 }
28
29 Transaction::~Transaction() {
30         if (missingParts)
31                 delete missingParts;
32         {
33                 uint Size = parts->size();
34                 for(uint i=0; i<Size; i++)
35                         parts->get(i)->releaseRef();
36                 delete parts;
37         }
38         {
39                 SetIterator<KeyValue *, KeyValue *> *kvit = keyValueGuardSet->iterator();
40                 while (kvit->hasNext()) {
41                         KeyValue *kvGuard = kvit->next();
42                         delete kvGuard;
43                 }
44                 delete kvit;
45                 delete keyValueGuardSet;
46         }
47         {
48                 SetIterator<KeyValue *, KeyValue *> *kvit = keyValueUpdateSet->iterator();
49                 while (kvit->hasNext()) {
50                         KeyValue *kvUpdate = kvit->next();
51                         delete kvUpdate;
52                 }
53                 delete kvit;
54                 delete keyValueUpdateSet;
55         }
56         delete partsPendingSend;
57 }
58
59 void Transaction::addPartEncode(TransactionPart *newPart) {
60         newPart->acquireRef();
61         printf("Add part %d\n", newPart->getPartNumber());
62         TransactionPart *old = parts->setExpand(newPart->getPartNumber(), newPart);
63         if (old == NULL) {
64                 partCount++;
65         } else {
66                 old->releaseRef();
67         }
68         partsPendingSend->add(newPart->getPartNumber());
69
70         sequenceNumber = newPart->getSequenceNumber();
71         arbitratorId = newPart->getArbitratorId();
72         transactionId = newPart->getTransactionId();
73         clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber();
74         machineId = newPart->getMachineId();
75         fldisComplete = true;
76 }
77
78 void Transaction::addPartDecode(TransactionPart *newPart) {
79         if (isDead) {
80                 // If dead then just kill this part and move on
81                 newPart->setDead();
82                 return;
83         }
84         newPart->acquireRef();
85         sequenceNumber = newPart->getSequenceNumber();
86         arbitratorId = newPart->getArbitratorId();
87         transactionId = newPart->getTransactionId();
88         clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber();
89         machineId = newPart->getMachineId();
90
91         TransactionPart *previouslySeenPart = parts->setExpand(newPart->getPartNumber(), newPart);
92         if (previouslySeenPart == NULL)
93                 partCount++;
94
95         if (previouslySeenPart != NULL) {
96                 // Set dead the old one since the new one is a rescued version of this part
97                 previouslySeenPart->releaseRef();
98                 previouslySeenPart->setDead();
99         } else if (newPart->isLastPart()) {
100                 missingParts = new Hashset<int32_t>();
101                 hasLastPart = true;
102
103                 for (int i = 0; i < newPart->getPartNumber(); i++) {
104                         if (parts->get(i) == NULL) {
105                                 missingParts->add(i);
106                         }
107                 }
108         }
109
110         if (!fldisComplete && hasLastPart) {
111
112                 // We have seen this part so remove it from the set of missing parts
113                 missingParts->remove(newPart->getPartNumber());
114
115                 // Check if all the parts have been seen
116                 if (missingParts->size() == 0) {
117
118                         // We have all the parts
119                         fldisComplete = true;
120
121                         // Decode all the parts and create the key value guard and update sets
122                         decodeTransactionData();
123                 }
124         }
125 }
126
127 void Transaction::addUpdateKV(KeyValue *kv) {
128         keyValueUpdateSet->add(kv);
129 }
130
131 void Transaction::addGuardKV(KeyValue *kv) {
132         keyValueGuardSet->add(kv);
133 }
134
135
136 int64_t Transaction::getSequenceNumber() {
137         return sequenceNumber;
138 }
139
140 void Transaction::setSequenceNumber(int64_t _sequenceNumber) {
141         sequenceNumber = _sequenceNumber;
142
143         for (uint32_t i = 0; i < parts->size(); i++) {
144                 TransactionPart *tp = parts->get(i);
145                 if (tp != NULL)
146                         tp->setSequenceNumber(sequenceNumber);
147         }
148 }
149
150 int64_t Transaction::getClientLocalSequenceNumber() {
151         return clientLocalSequenceNumber;
152 }
153
154 Vector<TransactionPart *> *Transaction::getParts() {
155         return parts;
156 }
157
158 bool Transaction::didSendAPartToServer() {
159         return flddidSendAPartToServer;
160 }
161
162 void Transaction::resetNextPartToSend() {
163         nextPartToSend = 0;
164 }
165
166 TransactionPart *Transaction::getNextPartToSend() {
167         if ((partsPendingSend->size() == 0) || (partsPendingSend->size() == nextPartToSend)) {
168                 return NULL;
169         }
170         TransactionPart *part = parts->get(partsPendingSend->get(nextPartToSend));
171         nextPartToSend++;
172         return part;
173 }
174
175
176 void Transaction::setServerFailure() {
177         hadServerFailure = true;
178 }
179
180 bool Transaction::getServerFailure() {
181         return hadServerFailure;
182 }
183
184
185 void Transaction::resetServerFailure() {
186         hadServerFailure = false;
187 }
188
189
190 void Transaction::setTransactionStatus(TransactionStatus *_transactionStatus) {
191         transactionStatus = _transactionStatus;
192 }
193
194 TransactionStatus *Transaction::getTransactionStatus() {
195         return transactionStatus;
196 }
197
198 void Transaction::removeSentParts(Vector<int32_t> *sentParts) {
199         nextPartToSend = 0;
200         bool changed = false;
201         uint lastusedindex = 0;
202         for (uint i = 0; i < partsPendingSend->size(); i++) {
203                 int32_t parti = partsPendingSend->get(i);
204                 for (uint j = 0; j < sentParts->size(); j++) {
205                         int32_t partj = sentParts->get(j);
206                         if (parti == partj) {
207                                 changed = true;
208                                 goto NextElement;
209                         }
210                 }
211                 partsPendingSend->set(lastusedindex++, parti);
212 NextElement:
213                 ;
214         }
215         if (changed) {
216                 partsPendingSend->setSize(lastusedindex);
217                 flddidSendAPartToServer = true;
218                 transactionStatus->setTransactionSequenceNumber(sequenceNumber);
219         }
220 }
221
222 bool Transaction::didSendAllParts() {
223         return partsPendingSend->isEmpty();
224 }
225
226 Hashset<KeyValue *> *Transaction::getKeyValueUpdateSet() {
227         return keyValueUpdateSet;
228 }
229
230 int Transaction::getNumberOfParts() {
231         return partCount;
232 }
233
234 int64_t Transaction::getMachineId() {
235         return machineId;
236 }
237
238 int64_t Transaction::getArbitrator() {
239         return arbitratorId;
240 }
241
242 bool Transaction::isComplete() {
243         return fldisComplete;
244 }
245
246 Pair<int64_t, int64_t> *Transaction::getId() {
247         return &transactionId;
248 }
249
250 void Transaction::setDead() {
251         if (!isDead) {
252                 // Set dead
253                 isDead = true;
254                 // Make all the parts of this transaction dead
255                 for (uint32_t partNumber = 0; partNumber < parts->size(); partNumber++) {
256                         TransactionPart *part = parts->get(partNumber);
257                         if (part != NULL)
258                                 part->setDead();
259                 }
260         }
261 }
262
263 void Transaction::decodeTransactionData() {
264         // Calculate the size of the data section
265         int dataSize = 0;
266         for (uint i = 0; i < parts->size(); i++) {
267                 TransactionPart *tp = parts->get(i);
268                 dataSize += tp->getDataSize();
269         }
270
271         Array<char> *combinedData = new Array<char>(dataSize);
272         int currentPosition = 0;
273
274         // Stitch all the data sections together
275         for (uint i = 0; i < parts->size(); i++) {
276                 TransactionPart *tp = parts->get(i);
277                 System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
278                 currentPosition += tp->getDataSize();
279         }
280
281         // Decoder Object
282         ByteBuffer *bbDecode = ByteBuffer_wrap(combinedData);
283
284         // Decode how many key value pairs need to be decoded
285         int numberOfKVGuards = bbDecode->getInt();
286         int numberOfKVUpdates = bbDecode->getInt();
287
288         // Decode all the guard key values
289         for (int i = 0; i < numberOfKVGuards; i++) {
290                 KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
291                 keyValueGuardSet->add(kv);
292         }
293
294         // Decode all the updates key values
295         for (int i = 0; i < numberOfKVUpdates; i++) {
296                 KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
297                 keyValueUpdateSet->add(kv);
298         }
299         delete bbDecode;
300 }
301
302 bool Transaction::evaluateGuard(Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *committedKeyValueTable, Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *speculatedKeyValueTable, Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *pendingTransactionSpeculatedKeyValueTable) {
303         SetIterator<KeyValue *, KeyValue *> *kvit = keyValueGuardSet->iterator();
304         while (kvit->hasNext()) {
305                 KeyValue *kvGuard = kvit->next();
306                 // First check if the key is in the speculative table, this is the value of the latest assumption
307                 KeyValue *kv = NULL;
308
309                 // If we have a speculation table then use it first
310                 if (pendingTransactionSpeculatedKeyValueTable != NULL) {
311                         kv = pendingTransactionSpeculatedKeyValueTable->get(kvGuard->getKey());
312                 }
313
314                 // If we have a speculation table then use it first
315                 if ((kv == NULL) && (speculatedKeyValueTable != NULL)) {
316                         kv = speculatedKeyValueTable->get(kvGuard->getKey());
317                 }
318
319                 if (kv == NULL) {
320                         // if it is not in the speculative table then check the committed table and use that
321                         // value as our latest assumption
322                         kv = committedKeyValueTable->get(kvGuard->getKey());
323                 }
324
325                 if (kvGuard->getValue() != NULL) {
326                         if ((kv == NULL) || (!kvGuard->getValue()->equals(kv->getValue()))) {
327
328
329                                 if (kv != NULL) {
330                                         printf("%s      %s\n", kvGuard->getKey()->internalBytes()->internalArray(), kv->getValue()->internalBytes()->internalArray());
331                                 } else {
332                                         printf("%s      null\n", kvGuard->getValue()->internalBytes()->internalArray());
333                                 }
334                                 delete kvit;
335                                 return false;
336                         }
337                 } else {
338                         if (kv != NULL) {
339                                 delete kvit;
340                                 return false;
341                         }
342                 }
343         }
344         delete kvit;
345         return true;
346 }
347