Another file
[iotcloud.git] / version2 / src / C / Transaction.cc
1 #include "Transaction.h"
2 #include "TransactionPart.h"
3 #include "KeyValue.h"
4 #include "ByteBuffer.h"
5 #include "IoTString.h"
6 #include "TransactionStatus.h"
7
8 Transaction::Transaction() :
9         parts(new Vector<TransactionPart *>()),
10         partCount(0),
11         missingParts(NULL),
12         partsPendingSend(new Vector<int32_t>()),
13         fldisComplete(false),
14         hasLastPart(false),
15         keyValueGuardSet(new Hashset<KeyValue *>()),
16         keyValueUpdateSet(new Hashset<KeyValue *>()),
17         isDead(false),
18         sequenceNumber(-1),
19         clientLocalSequenceNumber(-1),
20         arbitratorId(-1),
21         machineId(-1),
22         transactionId(Pair<int64_t, int64_t>(0,0)),
23         hadServerFailure(false) {
24 }
25
26 void Transaction::addPartEncode(TransactionPart *newPart) {
27         TransactionPart * old=parts->setExpand(newPart->getPartNumber(), newPart);
28         if (old == NULL)
29                 partCount++;
30         partsPendingSend->add(newPart->getPartNumber());
31
32         sequenceNumber = newPart->getSequenceNumber();
33         arbitratorId = newPart->getArbitratorId();
34         transactionId = newPart->getTransactionId();
35         clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber();
36         machineId = newPart->getMachineId();
37
38         fldisComplete = true;
39 }
40
41 void Transaction::addPartDecode(TransactionPart *newPart) {
42         if (isDead) {
43                 // If dead then just kill this part and move on
44                 newPart->setDead();
45                 return;
46         }
47
48         sequenceNumber = newPart->getSequenceNumber();
49         arbitratorId = newPart->getArbitratorId();
50         transactionId = newPart->getTransactionId();
51         clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber();
52         machineId = newPart->getMachineId();
53
54         TransactionPart *previouslySeenPart = parts->setExpand(newPart->getPartNumber(), newPart);
55         if (previouslySeenPart == NULL)
56                 partCount++;
57         
58         if (previouslySeenPart != NULL) {
59                 // Set dead the old one since the new one is a rescued version of this part
60                 previouslySeenPart->setDead();
61         } else if (newPart->isLastPart()) {
62                 missingParts = new Hashset<int32_t>();
63                 hasLastPart = true;
64
65                 for (int i = 0; i < newPart->getPartNumber(); i++) {
66                         if (parts->get(i) == NULL) {
67                                 missingParts->add(i);
68                         }
69                 }
70         }
71
72         if (!fldisComplete && hasLastPart) {
73
74                 // We have seen this part so remove it from the set of missing parts
75                 missingParts->remove(newPart->getPartNumber());
76
77                 // Check if all the parts have been seen
78                 if (missingParts->size() == 0) {
79
80                         // We have all the parts
81                         fldisComplete = true;
82
83                         // Decode all the parts and create the key value guard and update sets
84                         decodeTransactionData();
85                 }
86         }
87 }
88
89 void Transaction::addUpdateKV(KeyValue *kv) {
90         keyValueUpdateSet->add(kv);
91 }
92
93 void Transaction::addGuardKV(KeyValue *kv) {
94         keyValueGuardSet->add(kv);
95 }
96
97
98 int64_t Transaction::getSequenceNumber() {
99         return sequenceNumber;
100 }
101
102 void Transaction::setSequenceNumber(int64_t _sequenceNumber) {
103         sequenceNumber = _sequenceNumber;
104         
105         for (int32_t i = 0; i < parts->size(); i++) {
106                 TransactionPart * tp = parts->get(i);
107                 if (tp !=NULL)
108                         tp->setSequenceNumber(sequenceNumber);
109         }
110 }
111
112 int64_t Transaction::getClientLocalSequenceNumber() {
113         return clientLocalSequenceNumber;
114 }
115
116 Vector<TransactionPart *> *Transaction::getParts() {
117         return parts;
118 }
119
120 bool Transaction::didSendAPartToServer() {
121         return flddidSendAPartToServer;
122 }
123
124 void Transaction::resetNextPartToSend() {
125         nextPartToSend = 0;
126 }
127
128 TransactionPart *Transaction::getNextPartToSend() {
129         if ((partsPendingSend->size() == 0) || (partsPendingSend->size() == nextPartToSend)) {
130                 return NULL;
131         }
132         TransactionPart *part = parts->get(partsPendingSend->get(nextPartToSend));
133         nextPartToSend++;
134         return part;
135 }
136
137
138 void Transaction::setServerFailure() {
139         hadServerFailure = true;
140 }
141
142 bool Transaction::getServerFailure() {
143         return hadServerFailure;
144 }
145
146
147 void Transaction::resetServerFailure() {
148         hadServerFailure = false;
149 }
150
151
152 void Transaction::setTransactionStatus(TransactionStatus *_transactionStatus) {
153         transactionStatus = _transactionStatus;
154 }
155
156 TransactionStatus *Transaction::getTransactionStatus() {
157         return transactionStatus;
158 }
159
160 void Transaction::removeSentParts(Vector<int32_t> *sentParts) {
161         nextPartToSend = 0;
162         bool changed = false;
163         uint lastusedindex = 0;
164         for(uint i=0; i < partsPendingSend->size(); i++) {
165                 int32_t parti = partsPendingSend->get(i);
166                 for(uint j=0; j < sentParts->size(); j++) {
167                         int32_t partj = sentParts->get(j);
168                         if (parti == partj) {
169                                 changed = true;
170                                 goto NextElement;
171                         }
172                 }
173                 partsPendingSend->set(lastusedindex++, parti);
174         NextElement:
175                 ;
176         }
177         if (changed) {
178                 partsPendingSend->setSize(lastusedindex);
179                 flddidSendAPartToServer = true;
180                 transactionStatus->setTransactionSequenceNumber(sequenceNumber);
181         }
182 }
183
184 bool Transaction::didSendAllParts() {
185         return partsPendingSend->isEmpty();
186 }
187
188 Hashset<KeyValue *> *Transaction::getKeyValueUpdateSet() {
189         return keyValueUpdateSet;
190 }
191
192 int Transaction::getNumberOfParts() {
193         return partCount;
194 }
195
196 int64_t Transaction::getMachineId() {
197         return machineId;
198 }
199
200 int64_t Transaction::getArbitrator() {
201         return arbitratorId;
202 }
203
204 bool Transaction::isComplete() {
205         return fldisComplete;
206 }
207
208 Pair<int64_t, int64_t> Transaction::getId() {
209         return transactionId;
210 }
211
212 void Transaction::setDead() {
213         if (isDead) {
214                 // Already dead
215                 return;
216         }
217
218         // Set dead
219         isDead = true;
220
221         // Make all the parts of this transaction dead
222         for (int32_t partNumber = 0; partNumber < parts->size(); partNumber ++) {
223                 TransactionPart *part = parts->get(partNumber);
224                 if (part != NULL)
225                         part->setDead();
226         }
227 }
228
229 TransactionPart *Transaction::getPart(int index) {
230         return parts->get(index);
231 }
232
233 void Transaction::decodeTransactionData() {
234         // Calculate the size of the data section
235         int dataSize = 0;
236         for (int i = 0; i < parts->size(); i++) {
237                 TransactionPart *tp = parts->get(i);
238                 dataSize += tp->getDataSize();
239         }
240
241         Array<char> *combinedData = new Array<char>(dataSize);
242         int currentPosition = 0;
243
244         // Stitch all the data sections together
245         for (int i = 0; i < parts->size(); i++) {
246                 TransactionPart *tp = parts->get(i);
247                 System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
248                 currentPosition += tp->getDataSize();
249         }
250
251         // Decoder Object
252         ByteBuffer *bbDecode = ByteBuffer_wrap(combinedData);
253
254         // Decode how many key value pairs need to be decoded
255         int numberOfKVGuards = bbDecode->getInt();
256         int numberOfKVUpdates = bbDecode->getInt();
257
258         // Decode all the guard key values
259         for (int i = 0; i < numberOfKVGuards; i++) {
260                 KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
261                 keyValueGuardSet->add(kv);
262         }
263
264         // Decode all the updates key values
265         for (int i = 0; i < numberOfKVUpdates; i++) {
266                 KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
267                 keyValueUpdateSet->add(kv);
268         }
269 }
270
271 bool Transaction::evaluateGuard(Hashtable<IoTString *, KeyValue *> *committedKeyValueTable, Hashtable<IoTString *, KeyValue *> *speculatedKeyValueTable, Hashtable<IoTString *, KeyValue *> *pendingTransactionSpeculatedKeyValueTable) {
272         SetIterator<KeyValue *>* kvit=keyValueGuardSet->iterator();
273         while(kvit->hasNext()) {
274                 KeyValue *kvGuard = kvit->next();
275                 // First check if the key is in the speculative table, this is the value of the latest assumption
276                 KeyValue *kv = NULL;
277
278                 // If we have a speculation table then use it first
279                 if (pendingTransactionSpeculatedKeyValueTable != NULL) {
280                         kv = pendingTransactionSpeculatedKeyValueTable->get(kvGuard->getKey());
281                 }
282
283                 // If we have a speculation table then use it first
284                 if ((kv == NULL) && (speculatedKeyValueTable != NULL)) {
285                         kv = speculatedKeyValueTable->get(kvGuard->getKey());
286                 }
287
288                 if (kv == NULL) {
289                         // if it is not in the speculative table then check the committed table and use that
290                         // value as our latest assumption
291                         kv = committedKeyValueTable->get(kvGuard->getKey());
292                 }
293
294                 if (kvGuard->getValue() != NULL) {
295                         if ((kv == NULL) || (!kvGuard->getValue()->equals(kv->getValue()))) {
296
297
298                                 if (kv != NULL) {
299                                         printf("%s      %s\n", kvGuard->getKey()->internalBytes()->internalArray(), kv->getValue()->internalBytes()->internalArray());
300                                 } else {
301                                         printf("%s      null\n", kvGuard->getValue()->internalBytes()->internalArray());
302                                 }
303                                 delete kvit;
304                                 return false;
305                         }
306                 } else {
307                         if (kv != NULL) {
308                                 delete kvit;
309                                 return false;
310                         }
311                 }
312         }
313         delete kvit;
314         return true;
315 }
316