526c0ba8dba74862cdf0ba608863eabc8818ebe0
[iotcloud.git] / version2 / src / C / Transaction.cc
1 #include "Transaction.h"
2
3 Transaction::Transaction() {
4         parts = new Hashtable<int32_t, TransactionPart>();
5         keyValueGuardSet = new HashSet<KeyValue>();
6         keyValueUpdateSet = new HashSet<KeyValue>();
7         partsPendingSend = new Vector<int32_t>();
8 }
9
10 void Transaction::addPartEncode(TransactionPart *newPart) {
11         parts.put(newPart.getPartNumber(), newPart);
12         partsPendingSend.add(newPart.getPartNumber());
13
14         sequenceNumber = newPart.getSequenceNumber();
15         arbitratorId = newPart.getArbitratorId();
16         transactionId = newPart.getTransactionId();
17         clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
18         machineId = newPart.getMachineId();
19
20         isComplete = true;
21 }
22
23 void Transaction::addPartDecode(TransactionPart *newPart) {
24         if (isDead) {
25                 // If dead then just kill this part and move on
26                 newPart.setDead();
27                 return;
28         }
29
30         sequenceNumber = newPart.getSequenceNumber();
31         arbitratorId = newPart.getArbitratorId();
32         transactionId = newPart.getTransactionId();
33         clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
34         machineId = newPart.getMachineId();
35
36         TransactionPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart);
37
38         if (previoslySeenPart != NULL) {
39                 // Set dead the old one since the new one is a rescued version of this part
40                 previoslySeenPart.setDead();
41         } else if (newPart.isLastPart()) {
42                 missingParts = new HashSet<int32_t>();
43                 hasLastPart = true;
44
45                 for (int i = 0; i < newPart.getPartNumber(); i++) {
46                         if (parts.get(i) == NULL) {
47                                 missingParts.add(i);
48                         }
49                 }
50         }
51
52         if (!isComplete && hasLastPart) {
53
54                 // We have seen this part so remove it from the set of missing parts
55                 missingParts.remove(newPart.getPartNumber());
56
57                 // Check if all the parts have been seen
58                 if (missingParts.size() == 0) {
59
60                         // We have all the parts
61                         isComplete = true;
62
63                         // Decode all the parts and create the key value guard and update sets
64                         decodeTransactionData();
65                 }
66         }
67 }
68
69 void Transaction::addUpdateKV(KeyValue *kv) {
70         keyValueUpdateSet.add(kv);
71 }
72
73 void Transaction::addGuardKV(KeyValue *kv) {
74         keyValueGuardSet.add(kv);
75 }
76
77
78 int64_t Transaction::getSequenceNumber() {
79         return sequenceNumber;
80 }
81
82 void Transaction::setSequenceNumber(int64_t _sequenceNumber) {
83         sequenceNumber = _sequenceNumber;
84
85         for (int32_t i : parts.keySet()) {
86                 parts.get(i).setSequenceNumber(sequenceNumber);
87         }
88 }
89
90 int64_t Transaction::getClientLocalSequenceNumber() {
91         return clientLocalSequenceNumber;
92 }
93
94 Hashtable<int32_t, TransactionPart *> *Transaction::getParts() {
95         return parts;
96 }
97
98 bool Transaction::didSendAPartToServer() {
99         return didSendAPartToServer;
100 }
101
102 void Transaction::resetNextPartToSend() {
103         nextPartToSend = 0;
104 }
105
106 TransactionPart *Transaction::getNextPartToSend() {
107         if ((partsPendingSend.size() == 0) || (partsPendingSend.size() == nextPartToSend)) {
108                 return NULL;
109         }
110         TransactionPart part = parts.get(partsPendingSend.get(nextPartToSend));
111         nextPartToSend++;
112         return part;
113 }
114
115
116 void Transaction::setServerFailure() {
117         hadServerFailure = true;
118 }
119
120 bool Transaction::getServerFailure() {
121         return hadServerFailure;
122 }
123
124
125 void Transaction::resetServerFailure() {
126         hadServerFailure = false;
127 }
128
129
130 void Transaction::setTransactionStatus(TransactionStatus *_transactionStatus) {
131         transactionStatus = _transactionStatus;
132 }
133
134 TransactionStatus *Transaction::getTransactionStatus() {
135         return transactionStatus;
136 }
137
138 void Transaction::removeSentParts(Vector<int32_t> *sentParts) {
139         nextPartToSend = 0;
140         if (partsPendingSend.removeAll(sentParts))
141         {
142                 didSendAPartToServer = true;
143                 transactionStatus.setTransactionSequenceNumber(sequenceNumber);
144         }
145 }
146
147 bool Transaction::didSendAllParts() {
148         return partsPendingSend.isEmpty();
149 }
150
151 Hashset<KeyValue *> *Transaction::getKeyValueUpdateSet() {
152         return keyValueUpdateSet;
153 }
154
155 int Transaction::getNumberOfParts() {
156         return parts.size();
157 }
158
159 int64_t Transaction::getMachineId() {
160         return machineId;
161 }
162
163 int64_t Transaction::getArbitrator() {
164         return arbitratorId;
165 }
166
167 bool Transaction::isComplete() {
168         return isComplete;
169 }
170
171 Pair<int64_t, int64_t> *Transaction::getId() {
172         return transactionId;
173 }
174
175 void Transaction::setDead() {
176         if (isDead) {
177                 // Already dead
178                 return;
179         }
180
181         // Set dead
182         isDead = true;
183
184         // Make all the parts of this transaction dead
185         for (int32_t partNumber : parts.keySet()) {
186                 TransactionPart part = parts.get(partNumber);
187                 part.setDead();
188         }
189 }
190
191 TransactionPart *Transaction::getPart(int index) {
192         return parts.get(index);
193 }
194
195 void Transaction::decodeTransactionData() {
196
197         // Calculate the size of the data section
198         int dataSize = 0;
199         for (int i = 0; i < parts.keySet().size(); i++) {
200                 TransactionPart tp = parts.get(i);
201                 dataSize += tp.getDataSize();
202         }
203
204         Array<char> *combinedData = new char[dataSize];
205         int currentPosition = 0;
206
207         // Stitch all the data sections together
208         for (int i = 0; i < parts.keySet().size(); i++) {
209                 TransactionPart tp = parts.get(i);
210                 System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize());
211                 currentPosition += tp.getDataSize();
212         }
213
214         // Decoder Object
215         ByteBuffer bbDecode = ByteBuffer.wrap(combinedData);
216
217         // Decode how many key value pairs need to be decoded
218         int numberOfKVGuards = bbDecode.getInt();
219         int numberOfKVUpdates = bbDecode.getInt();
220
221         // Decode all the guard key values
222         for (int i = 0; i < numberOfKVGuards; i++) {
223                 KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
224                 keyValueGuardSet.add(kv);
225         }
226
227         // Decode all the updates key values
228         for (int i = 0; i < numberOfKVUpdates; i++) {
229                 KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
230                 keyValueUpdateSet.add(kv);
231         }
232 }
233
234 bool Transaction::evaluateGuard(Hashtable<IoTString *, KeyValue *> *committedKeyValueTable, Hashtable<IoTString *, KeyValue *> *speculatedKeyValueTable, Hashtable<IoTString *, KeyValue *> *pendingTransactionSpeculatedKeyValueTable) {
235         for (KeyValue *kvGuard : keyValueGuardSet) {
236
237                 // First check if the key is in the speculative table, this is the value of the latest assumption
238                 KeyValue kv = NULL;
239
240                 // If we have a speculation table then use it first
241                 if (pendingTransactionSpeculatedKeyValueTable != NULL) {
242                         kv = pendingTransactionSpeculatedKeyValueTable.get(kvGuard.getKey());
243                 }
244
245                 // If we have a speculation table then use it first
246                 if ((kv == NULL) && (speculatedKeyValueTable != NULL)) {
247                         kv = speculatedKeyValueTable.get(kvGuard.getKey());
248                 }
249
250                 if (kv == NULL) {
251                         // if it is not in the speculative table then check the committed table and use that
252                         // value as our latest assumption
253                         kv = committedKeyValueTable.get(kvGuard.getKey());
254                 }
255
256                 if (kvGuard.getValue() != NULL) {
257                         if ((kv == NULL) || (!kvGuard.getValue().equals(kv.getValue()))) {
258
259
260                                 if (kv != NULL) {
261                                         System.out.println(kvGuard.getValue() + "       " + kv.getValue());
262                                 } else {
263                                         System.out.println(kvGuard.getValue() + "       " + kv);
264                                 }
265
266                                 return false;
267                         }
268                 } else {
269                         if (kv != NULL) {
270                                 return false;
271                         }
272                 }
273         }
274         return true;
275 }
276