edits
[iotcloud.git] / version2 / src / C / Transaction.cc
1 #include "Transaction.h"
2 #include "TransactionPart.h"
3 #include "KeyValue.h"
4 #include "ByteBuffer.h"
5 #include "IoTString.h"
6 #include "TransactionStatus.h"
7
8 Transaction::Transaction() :
9         parts(new Vector<TransactionPart *>()),
10         partCount(0),
11         missingParts(NULL),
12         partsPendingSend(new Vector<int32_t>()),
13         fldisComplete(false),
14         hasLastPart(false),
15         keyValueGuardSet(new Hashset<KeyValue *>()),
16         keyValueUpdateSet(new Hashset<KeyValue *>()),
17         isDead(false),
18         sequenceNumber(-1),
19         clientLocalSequenceNumber(-1),
20         arbitratorId(-1),
21         machineId(-1),
22         transactionId(Pair<int64_t, int64_t>(0,0)),
23         nextPartToSend(0),
24         flddidSendAPartToServer(false),
25         transactionStatus(NULL),
26         hadServerFailure(false) {
27 }
28
29 Transaction::~Transaction() {
30         delete parts;
31         delete keyValueGuardSet;
32         delete keyValueUpdateSet;
33 }
34
35 void Transaction::addPartEncode(TransactionPart *newPart) {
36         TransactionPart *old = parts->setExpand(newPart->getPartNumber(), newPart);
37         if (old == NULL)
38                 partCount++;
39         partsPendingSend->add(newPart->getPartNumber());
40
41         sequenceNumber = newPart->getSequenceNumber();
42         arbitratorId = newPart->getArbitratorId();
43         transactionId = newPart->getTransactionId();
44         clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber();
45         machineId = newPart->getMachineId();
46         fldisComplete = true;
47 }
48
49 void Transaction::addPartDecode(TransactionPart *newPart) {
50         if (isDead) {
51                 // If dead then just kill this part and move on
52                 newPart->setDead();
53                 return;
54         }
55
56         sequenceNumber = newPart->getSequenceNumber();
57         arbitratorId = newPart->getArbitratorId();
58         transactionId = newPart->getTransactionId();
59         clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber();
60         machineId = newPart->getMachineId();
61
62         TransactionPart *previouslySeenPart = parts->setExpand(newPart->getPartNumber(), newPart);
63         if (previouslySeenPart == NULL)
64                 partCount++;
65
66         if (previouslySeenPart != NULL) {
67                 // Set dead the old one since the new one is a rescued version of this part
68                 previouslySeenPart->setDead();
69         } else if (newPart->isLastPart()) {
70                 missingParts = new Hashset<int32_t>();
71                 hasLastPart = true;
72
73                 for (int i = 0; i < newPart->getPartNumber(); i++) {
74                         if (parts->get(i) == NULL) {
75                                 missingParts->add(i);
76                         }
77                 }
78         }
79
80         if (!fldisComplete && hasLastPart) {
81
82                 // We have seen this part so remove it from the set of missing parts
83                 missingParts->remove(newPart->getPartNumber());
84
85                 // Check if all the parts have been seen
86                 if (missingParts->size() == 0) {
87
88                         // We have all the parts
89                         fldisComplete = true;
90
91                         // Decode all the parts and create the key value guard and update sets
92                         decodeTransactionData();
93                 }
94         }
95 }
96
97 void Transaction::addUpdateKV(KeyValue *kv) {
98         keyValueUpdateSet->add(kv);
99 }
100
101 void Transaction::addGuardKV(KeyValue *kv) {
102         keyValueGuardSet->add(kv);
103 }
104
105
106 int64_t Transaction::getSequenceNumber() {
107         return sequenceNumber;
108 }
109
110 void Transaction::setSequenceNumber(int64_t _sequenceNumber) {
111         sequenceNumber = _sequenceNumber;
112
113         for (uint32_t i = 0; i < parts->size(); i++) {
114                 TransactionPart *tp = parts->get(i);
115                 if (tp != NULL)
116                         tp->setSequenceNumber(sequenceNumber);
117         }
118 }
119
120 int64_t Transaction::getClientLocalSequenceNumber() {
121         return clientLocalSequenceNumber;
122 }
123
124 Vector<TransactionPart *> *Transaction::getParts() {
125         return parts;
126 }
127
128 bool Transaction::didSendAPartToServer() {
129         return flddidSendAPartToServer;
130 }
131
132 void Transaction::resetNextPartToSend() {
133         nextPartToSend = 0;
134 }
135
136 TransactionPart *Transaction::getNextPartToSend() {
137         if ((partsPendingSend->size() == 0) || (partsPendingSend->size() == nextPartToSend)) {
138                 return NULL;
139         }
140         TransactionPart *part = parts->get(partsPendingSend->get(nextPartToSend));
141         nextPartToSend++;
142         return part;
143 }
144
145
146 void Transaction::setServerFailure() {
147         hadServerFailure = true;
148 }
149
150 bool Transaction::getServerFailure() {
151         return hadServerFailure;
152 }
153
154
155 void Transaction::resetServerFailure() {
156         hadServerFailure = false;
157 }
158
159
160 void Transaction::setTransactionStatus(TransactionStatus *_transactionStatus) {
161         transactionStatus = _transactionStatus;
162 }
163
164 TransactionStatus *Transaction::getTransactionStatus() {
165         return transactionStatus;
166 }
167
168 void Transaction::removeSentParts(Vector<int32_t> *sentParts) {
169         nextPartToSend = 0;
170         bool changed = false;
171         uint lastusedindex = 0;
172         for (uint i = 0; i < partsPendingSend->size(); i++) {
173                 int32_t parti = partsPendingSend->get(i);
174                 for (uint j = 0; j < sentParts->size(); j++) {
175                         int32_t partj = sentParts->get(j);
176                         if (parti == partj) {
177                                 changed = true;
178                                 goto NextElement;
179                         }
180                 }
181                 partsPendingSend->set(lastusedindex++, parti);
182 NextElement:
183                 ;
184         }
185         if (changed) {
186                 partsPendingSend->setSize(lastusedindex);
187                 flddidSendAPartToServer = true;
188                 transactionStatus->setTransactionSequenceNumber(sequenceNumber);
189         }
190 }
191
192 bool Transaction::didSendAllParts() {
193         return partsPendingSend->isEmpty();
194 }
195
196 Hashset<KeyValue *> *Transaction::getKeyValueUpdateSet() {
197         return keyValueUpdateSet;
198 }
199
200 int Transaction::getNumberOfParts() {
201         return partCount;
202 }
203
204 int64_t Transaction::getMachineId() {
205         return machineId;
206 }
207
208 int64_t Transaction::getArbitrator() {
209         return arbitratorId;
210 }
211
212 bool Transaction::isComplete() {
213         return fldisComplete;
214 }
215
216 Pair<int64_t, int64_t> *Transaction::getId() {
217         return &transactionId;
218 }
219
220 void Transaction::setDead() {
221         if (!isDead) {
222                 // Set dead
223                 isDead = true;
224                 // Make all the parts of this transaction dead
225                 for (uint32_t partNumber = 0; partNumber < parts->size(); partNumber++) {
226                         TransactionPart *part = parts->get(partNumber);
227                         if (part != NULL)
228                                 part->setDead();
229                 }
230         }
231 }
232
233 TransactionPart *Transaction::getPart(int index) {
234         return parts->get(index);
235 }
236
237 void Transaction::decodeTransactionData() {
238         // Calculate the size of the data section
239         int dataSize = 0;
240         for (uint i = 0; i < parts->size(); i++) {
241                 TransactionPart *tp = parts->get(i);
242                 dataSize += tp->getDataSize();
243         }
244
245         Array<char> *combinedData = new Array<char>(dataSize);
246         int currentPosition = 0;
247
248         // Stitch all the data sections together
249         for (uint i = 0; i < parts->size(); i++) {
250                 TransactionPart *tp = parts->get(i);
251                 System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
252                 currentPosition += tp->getDataSize();
253         }
254
255         // Decoder Object
256         ByteBuffer *bbDecode = ByteBuffer_wrap(combinedData);
257
258         // Decode how many key value pairs need to be decoded
259         int numberOfKVGuards = bbDecode->getInt();
260         int numberOfKVUpdates = bbDecode->getInt();
261
262         // Decode all the guard key values
263         for (int i = 0; i < numberOfKVGuards; i++) {
264                 KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
265                 keyValueGuardSet->add(kv);
266         }
267
268         // Decode all the updates key values
269         for (int i = 0; i < numberOfKVUpdates; i++) {
270                 KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
271                 keyValueUpdateSet->add(kv);
272         }
273 }
274
275 bool Transaction::evaluateGuard(Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *committedKeyValueTable, Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *speculatedKeyValueTable, Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *pendingTransactionSpeculatedKeyValueTable) {
276         SetIterator<KeyValue *, KeyValue *> *kvit = keyValueGuardSet->iterator();
277         while (kvit->hasNext()) {
278                 KeyValue *kvGuard = kvit->next();
279                 // First check if the key is in the speculative table, this is the value of the latest assumption
280                 KeyValue *kv = NULL;
281
282                 // If we have a speculation table then use it first
283                 if (pendingTransactionSpeculatedKeyValueTable != NULL) {
284                         kv = pendingTransactionSpeculatedKeyValueTable->get(kvGuard->getKey());
285                 }
286
287                 // If we have a speculation table then use it first
288                 if ((kv == NULL) && (speculatedKeyValueTable != NULL)) {
289                         kv = speculatedKeyValueTable->get(kvGuard->getKey());
290                 }
291
292                 if (kv == NULL) {
293                         // if it is not in the speculative table then check the committed table and use that
294                         // value as our latest assumption
295                         kv = committedKeyValueTable->get(kvGuard->getKey());
296                 }
297
298                 if (kvGuard->getValue() != NULL) {
299                         if ((kv == NULL) || (!kvGuard->getValue()->equals(kv->getValue()))) {
300
301
302                                 if (kv != NULL) {
303                                         printf("%s      %s\n", kvGuard->getKey()->internalBytes()->internalArray(), kv->getValue()->internalBytes()->internalArray());
304                                 } else {
305                                         printf("%s      null\n", kvGuard->getValue()->internalBytes()->internalArray());
306                                 }
307                                 delete kvit;
308                                 return false;
309                         }
310                 } else {
311                         if (kv != NULL) {
312                                 delete kvit;
313                                 return false;
314                         }
315                 }
316         }
317         delete kvit;
318         return true;
319 }
320