45602a0e9ae361a195bfbcb7e54c4ddc1375430f
[iotcloud.git] / version2 / src / C / PendingTransaction.cc
1 #include "PendingTransaction.h"
2 #include "KeyValue.h"
3 #include "IoTString.h"
4 #include "Transaction.h"
5 #include "TransactionPart.h"
6 #include "ByteBuffer.h"
7
8 PendingTransaction::PendingTransaction(int64_t _machineId) :
9         keyValueUpdateSet(new Hashset<KeyValue *>()),
10         keyValueGuardSet(new Hashset<KeyValue *>()),
11         arbitrator(-1),
12         clientLocalSequenceNumber(-1),
13         machineId(_machineId),
14         currentDataSize(0) {
15 }
16
17 PendingTransaction::~PendingTransaction() {
18         delete keyValueUpdateSet;
19         delete keyValueGuardSet;
20 }
21
22 /**
23  * Add a new key value to the updates
24  *
25  */
26 void PendingTransaction::addKV(KeyValue *newKV) {
27         KeyValue *rmKV = NULL;
28
29         // Make sure there are no duplicates
30         SetIterator<KeyValue *, KeyValue *> *kvit = keyValueUpdateSet->iterator();
31         while (kvit->hasNext()) {
32                 KeyValue *kv = kvit->next();
33                 if (kv->getKey()->equals(newKV->getKey())) {
34                         // Remove key if we are adding a newer version of the same key
35                         rmKV = kv;
36                         break;
37                 }
38         }
39         delete kvit;
40
41         // Remove key if we are adding a newer version of the same key
42         if (rmKV != NULL) {
43                 keyValueUpdateSet->remove(rmKV);
44                 currentDataSize -= rmKV->getSize();
45         }
46
47         // Add the key to the hash set
48         keyValueUpdateSet->add(newKV);
49         currentDataSize += newKV->getSize();
50 }
51
52 /**
53  * Add a new key value to the guard set
54  *
55  */
56 void PendingTransaction::addKVGuard(KeyValue *newKV) {
57         // Add the key to the hash set
58         keyValueGuardSet->add(newKV);
59         currentDataSize += newKV->getSize();
60 }
61
62 /**
63  * Checks if the arbitrator is the same
64  */
65 bool PendingTransaction::checkArbitrator(int64_t arb) {
66         if (arbitrator == -1) {
67                 arbitrator = arb;
68                 return true;
69         }
70         return arb == arbitrator;
71 }
72
73 bool PendingTransaction::evaluateGuard(Hashtable<IoTString *, KeyValue *> *keyValTableCommitted, Hashtable<IoTString *, KeyValue *> *keyValTableSpeculative, Hashtable<IoTString *, KeyValue *> *keyValTablePendingTransSpeculative) {
74         SetIterator<KeyValue *, KeyValue *> *kvit = keyValueGuardSet->iterator();
75         while (kvit->hasNext()) {
76                 KeyValue *kvGuard = kvit->next();
77                 // First check if the key is in the speculative table, this is the
78                 // value of the latest assumption
79                 KeyValue *kv = keyValTablePendingTransSpeculative->get(kvGuard->getKey());
80
81
82                 if (kv == NULL) {
83                         // if it is not in the pending trans table then check the
84                         // speculative table and use that value as our latest assumption
85                         kv = keyValTableSpeculative->get(kvGuard->getKey());
86                 }
87
88
89                 if (kv == NULL) {
90                         // if it is not in the speculative table then check the
91                         // committed table and use that value as our latest assumption
92                         kv = keyValTableCommitted->get(kvGuard->getKey());
93                 }
94
95                 if (kvGuard->getValue() != NULL) {
96                         if ((kv == NULL) || (!kvGuard->getValue()->equals(kv->getValue()))) {
97                                 delete kvit;
98                                 return false;
99                         }
100                 } else {
101                         if (kv != NULL) {
102                                 delete kvit;
103                                 return false;
104                         }
105                 }
106         }
107         delete kvit;
108         return true;
109 }
110
111 Transaction *PendingTransaction::createTransaction() {
112         Transaction *newTransaction = new Transaction();
113         int transactionPartCount = 0;
114
115         // Convert all the data into a char array so we can start partitioning
116         Array<char> *charData = convertDataToBytes();
117
118         int currentPosition = 0;
119         for (int remaining = charData->length(); remaining > 0;) {
120                 bool isLastPart = false;
121                 // determine how much to copy
122                 int copySize = TransactionPart_MAX_NON_HEADER_SIZE;
123                 if (remaining <= TransactionPart_MAX_NON_HEADER_SIZE) {
124                         copySize = remaining;
125                         isLastPart = true;//last bit of data so last part
126                 }
127
128                 // Copy to a smaller version
129                 Array<char> *partData = new Array<char>(copySize);
130                 System_arraycopy(charData, currentPosition, partData, 0, copySize);
131
132                 TransactionPart *part = new TransactionPart(NULL, machineId, arbitrator, clientLocalSequenceNumber, transactionPartCount, partData, isLastPart);
133                 newTransaction->addPartEncode(part);
134
135                 // Update position, count and remaining
136                 currentPosition += copySize;
137                 transactionPartCount++;
138                 remaining -= copySize;
139         }
140         delete charData;
141         
142         // Add the Guard Conditions
143         SetIterator<KeyValue *, KeyValue *> *kvit = keyValueGuardSet->iterator();
144         while (kvit->hasNext()) {
145                 KeyValue *kv = kvit->next();
146                 newTransaction->addGuardKV(kv);
147         }
148         delete kvit;
149
150         //  Add the updates
151         kvit = keyValueUpdateSet->iterator();
152         while (kvit->hasNext()) {
153                 KeyValue *kv = kvit->next();
154                 newTransaction->addUpdateKV(kv);
155         }
156         delete kvit;
157         return newTransaction;
158 }
159
160 Array<char> *PendingTransaction::convertDataToBytes() {
161         // Calculate the size of the data
162         int sizeOfData = 2 * sizeof(int32_t);   // Number of Update KV's and Guard KV's
163         sizeOfData += currentDataSize;
164
165         // Data handlers and storage
166         Array<char> *dataArray = new Array<char>(sizeOfData);
167         ByteBuffer *bbEncode = ByteBuffer_wrap(dataArray);
168
169         // Encode the size of the updates and guard sets
170         bbEncode->putInt(keyValueGuardSet->size());
171         bbEncode->putInt(keyValueUpdateSet->size());
172
173         // Encode all the guard conditions
174         SetIterator<KeyValue *, KeyValue *> *kvit = keyValueGuardSet->iterator();
175         while (kvit->hasNext()) {
176                 KeyValue *kv = kvit->next();
177                 kv->encode(bbEncode);
178         }
179         delete kvit;
180
181         // Encode all the updates
182         kvit = keyValueUpdateSet->iterator();
183         while (kvit->hasNext()) {
184                 KeyValue *kv = kvit->next();
185                 kv->encode(bbEncode);
186         }
187         delete kvit;
188
189         Array<char> *array = bbEncode->array();
190         bbEncode->releaseArray();
191         delete bbEncode;
192         return array;
193 }