8dba5a3ad99fa71045bbc1141d2a73ace5f14083
[iotcloud.git] / version2 / src / C / Transaction.cc
1 #include "Transaction.h"
2 #include "TransactionPart.h"
3 #include "KeyValue.h"
4 #include "ByteBuffer.h"
5 #include "IoTString.h"
6 #include "TransactionStatus.h"
7
8 Transaction::Transaction() :
9         parts(new Hashtable<int32_t, TransactionPart *>()),
10         missingParts(NULL),
11         partsPendingSend(new Vector<int32_t>()),
12         fldisComplete(false),
13         hasLastPart(false),
14         keyValueGuardSet(new Hashset<KeyValue *>()),
15         keyValueUpdateSet(new Hashset<KeyValue *>()),
16         isDead(false),
17         sequenceNumber(-1),
18         clientLocalSequenceNumber(-1),
19         arbitratorId(-1),
20         machineId(-1),
21         transactionId(NULL),
22         hadServerFailure(false) {
23 }
24
25 void Transaction::addPartEncode(TransactionPart *newPart) {
26         parts->put(newPart->getPartNumber(), newPart);
27         partsPendingSend->add(newPart->getPartNumber());
28
29         sequenceNumber = newPart->getSequenceNumber();
30         arbitratorId = newPart->getArbitratorId();
31         transactionId = newPart->getTransactionId();
32         clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber();
33         machineId = newPart->getMachineId();
34
35         fldisComplete = true;
36 }
37
38 void Transaction::addPartDecode(TransactionPart *newPart) {
39         if (isDead) {
40                 // If dead then just kill this part and move on
41                 newPart->setDead();
42                 return;
43         }
44
45         sequenceNumber = newPart->getSequenceNumber();
46         arbitratorId = newPart->getArbitratorId();
47         transactionId = newPart->getTransactionId();
48         clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber();
49         machineId = newPart->getMachineId();
50
51         TransactionPart * previoslySeenPart = parts->put(newPart->getPartNumber(), newPart);
52
53         if (previoslySeenPart != NULL) {
54                 // Set dead the old one since the new one is a rescued version of this part
55                 previoslySeenPart->setDead();
56         } else if (newPart->isLastPart()) {
57                 missingParts = new Hashset<int32_t>();
58                 hasLastPart = true;
59
60                 for (int i = 0; i < newPart->getPartNumber(); i++) {
61                         if (parts->get(i) == NULL) {
62                                 missingParts->add(i);
63                         }
64                 }
65         }
66
67         if (!fldisComplete && hasLastPart) {
68
69                 // We have seen this part so remove it from the set of missing parts
70                 missingParts->remove(newPart->getPartNumber());
71
72                 // Check if all the parts have been seen
73                 if (missingParts->size() == 0) {
74
75                         // We have all the parts
76                         fldisComplete = true;
77
78                         // Decode all the parts and create the key value guard and update sets
79                         decodeTransactionData();
80                 }
81         }
82 }
83
84 void Transaction::addUpdateKV(KeyValue *kv) {
85         keyValueUpdateSet->add(kv);
86 }
87
88 void Transaction::addGuardKV(KeyValue *kv) {
89         keyValueGuardSet->add(kv);
90 }
91
92
93 int64_t Transaction::getSequenceNumber() {
94         return sequenceNumber;
95 }
96
97 void Transaction::setSequenceNumber(int64_t _sequenceNumber) {
98         sequenceNumber = _sequenceNumber;
99
100         for (int32_t i : parts->keySet()) {
101                 parts->get(i)->setSequenceNumber(sequenceNumber);
102         }
103 }
104
105 int64_t Transaction::getClientLocalSequenceNumber() {
106         return clientLocalSequenceNumber;
107 }
108
109 Hashtable<int32_t, TransactionPart *> *Transaction::getParts() {
110         return parts;
111 }
112
113 bool Transaction::didSendAPartToServer() {
114         return flddidSendAPartToServer;
115 }
116
117 void Transaction::resetNextPartToSend() {
118         nextPartToSend = 0;
119 }
120
121 TransactionPart *Transaction::getNextPartToSend() {
122         if ((partsPendingSend->size() == 0) || (partsPendingSend->size() == nextPartToSend)) {
123                 return NULL;
124         }
125         TransactionPart *part = parts->get(partsPendingSend->get(nextPartToSend));
126         nextPartToSend++;
127         return part;
128 }
129
130
131 void Transaction::setServerFailure() {
132         hadServerFailure = true;
133 }
134
135 bool Transaction::getServerFailure() {
136         return hadServerFailure;
137 }
138
139
140 void Transaction::resetServerFailure() {
141         hadServerFailure = false;
142 }
143
144
145 void Transaction::setTransactionStatus(TransactionStatus *_transactionStatus) {
146         transactionStatus = _transactionStatus;
147 }
148
149 TransactionStatus *Transaction::getTransactionStatus() {
150         return transactionStatus;
151 }
152
153 void Transaction::removeSentParts(Vector<int32_t> *sentParts) {
154         nextPartToSend = 0;
155         if (partsPendingSend->removeAll(sentParts))
156         {
157                 flddidSendAPartToServer = true;
158                 transactionStatus->setTransactionSequenceNumber(sequenceNumber);
159         }
160 }
161
162 bool Transaction::didSendAllParts() {
163         return partsPendingSend->isEmpty();
164 }
165
166 Hashset<KeyValue *> *Transaction::getKeyValueUpdateSet() {
167         return keyValueUpdateSet;
168 }
169
170 int Transaction::getNumberOfParts() {
171         return parts->size();
172 }
173
174 int64_t Transaction::getMachineId() {
175         return machineId;
176 }
177
178 int64_t Transaction::getArbitrator() {
179         return arbitratorId;
180 }
181
182 bool Transaction::isComplete() {
183         return fldisComplete;
184 }
185
186 Pair<int64_t, int64_t> *Transaction::getId() {
187         return transactionId;
188 }
189
190 void Transaction::setDead() {
191         if (isDead) {
192                 // Already dead
193                 return;
194         }
195
196         // Set dead
197         isDead = true;
198
199         // Make all the parts of this transaction dead
200         for (int32_t partNumber : parts->keySet()) {
201                 TransactionPart* part = parts->get(partNumber);
202                 part->setDead();
203         }
204 }
205
206 TransactionPart *Transaction::getPart(int index) {
207         return parts->get(index);
208 }
209
210 void Transaction::decodeTransactionData() {
211
212         // Calculate the size of the data section
213         int dataSize = 0;
214         for (int i = 0; i < parts->keySet()->size(); i++) {
215                 TransactionPart *tp = parts->get(i);
216                 dataSize += tp->getDataSize();
217         }
218
219         Array<char> *combinedData = new Array<char>(dataSize);
220         int currentPosition = 0;
221
222         // Stitch all the data sections together
223         for (int i = 0; i < parts->keySet()->size(); i++) {
224                 TransactionPart *tp = parts->get(i);
225                 System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
226                 currentPosition += tp->getDataSize();
227         }
228
229         // Decoder Object
230         ByteBuffer* bbDecode = ByteBuffer_wrap(combinedData);
231
232         // Decode how many key value pairs need to be decoded
233         int numberOfKVGuards = bbDecode->getInt();
234         int numberOfKVUpdates = bbDecode->getInt();
235
236         // Decode all the guard key values
237         for (int i = 0; i < numberOfKVGuards; i++) {
238                 KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
239                 keyValueGuardSet->add(kv);
240         }
241
242         // Decode all the updates key values
243         for (int i = 0; i < numberOfKVUpdates; i++) {
244                 KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
245                 keyValueUpdateSet->add(kv);
246         }
247 }
248
249 bool Transaction::evaluateGuard(Hashtable<IoTString *, KeyValue *> *committedKeyValueTable, Hashtable<IoTString *, KeyValue *> *speculatedKeyValueTable, Hashtable<IoTString *, KeyValue *> *pendingTransactionSpeculatedKeyValueTable) {
250         for (KeyValue *kvGuard : keyValueGuardSet) {
251
252                 // First check if the key is in the speculative table, this is the value of the latest assumption
253                 KeyValue *kv = NULL;
254
255                 // If we have a speculation table then use it first
256                 if (pendingTransactionSpeculatedKeyValueTable != NULL) {
257                         kv = pendingTransactionSpeculatedKeyValueTable->get(kvGuard->getKey());
258                 }
259
260                 // If we have a speculation table then use it first
261                 if ((kv == NULL) && (speculatedKeyValueTable != NULL)) {
262                         kv = speculatedKeyValueTable->get(kvGuard->getKey());
263                 }
264
265                 if (kv == NULL) {
266                         // if it is not in the speculative table then check the committed table and use that
267                         // value as our latest assumption
268                         kv = committedKeyValueTable->get(kvGuard->getKey());
269                 }
270
271                 if (kvGuard->getValue() != NULL) {
272                         if ((kv == NULL) || (!kvGuard->getValue()->equals(kv->getValue()))) {
273
274
275                                 if (kv != NULL) {
276                                         printf("%s      %s\n", kvGuard->getKey()->internalBytes()->internalArray(), kv->getValue()->internalBytes()->internalArray());
277                                 } else {
278                                         printf("%s      null\n", kvGuard->getValue()->internalBytes()->internalArray());
279                                 }
280
281                                 return false;
282                         }
283                 } else {
284                         if (kv != NULL) {
285                                 return false;
286                         }
287                 }
288         }
289         return true;
290 }
291