84bef5ecaec59ff6ebc451291e2f8172be2f35de
[iotcloud.git] / version2 / src / C / PendingTransaction.cc
1 #include "PendingTransaction.h"
2 #include "KeyValue.h"
3 #include "IoTString.h"
4 #include "Transaction.h"
5 #include "TransactionPart.h"
6 #include "ByteBuffer.h"
7
8 PendingTransaction::PendingTransaction(int64_t _machineId) :
9         keyValueUpdateSet(new Hashset<KeyValue *>()),
10         keyValueGuardSet(new Hashset<KeyValue *>()),
11         arbitrator(-1),
12         clientLocalSequenceNumber(-1),
13         machineId(_machineId),
14         currentDataSize(0) {
15 }
16
17 /**
18  * Add a new key value to the updates
19  *
20  */
21 void PendingTransaction::addKV(KeyValue *newKV) {
22
23         KeyValue * rmKV = NULL;
24
25         // Make sure there are no duplicates
26         SetIterator<KeyValue *> * kvit = keyValueUpdateSet->iterator();
27         while(kvit->hasNext()) {
28                 KeyValue *kv = kvit->next();
29                 if (kv->getKey()->equals(newKV->getKey())) {
30
31                         // Remove key if we are adding a newer version of the same key
32                         rmKV = kv;
33                         break;
34                 }
35         }
36         delete kvit;
37         
38         // Remove key if we are adding a newer version of the same key
39         if (rmKV != NULL) {
40                 keyValueUpdateSet->remove(rmKV);
41                 currentDataSize -= rmKV->getSize();
42         }
43
44         // Add the key to the hash set
45         keyValueUpdateSet->add(newKV);
46         currentDataSize += newKV->getSize();
47 }
48
49 /**
50  * Add a new key value to the guard set
51  *
52  */
53 void PendingTransaction::addKVGuard(KeyValue *newKV) {
54         // Add the key to the hash set
55         keyValueGuardSet->add(newKV);
56         currentDataSize += newKV->getSize();
57 }
58
59 /**
60  * Checks if the arbitrator is the same
61  */
62 bool PendingTransaction::checkArbitrator(int64_t arb) {
63         if (arbitrator == -1) {
64                 arbitrator = arb;
65                 return true;
66         }
67
68         return arb == arbitrator;
69 }
70
71 bool PendingTransaction::evaluateGuard(Hashtable<IoTString *, KeyValue *> * keyValTableCommitted, Hashtable<IoTString *, KeyValue *> * keyValTableSpeculative, Hashtable<IoTString *, KeyValue *> * keyValTablePendingTransSpeculative) {
72         SetIterator<KeyValue *> * kvit = keyValueGuardSet->iterator();
73         while(kvit->hasNext()) {
74                 KeyValue *kvGuard = kvit->next();
75
76                 // First check if the key is in the speculative table, this is the value of the latest assumption
77                 KeyValue * kv = keyValTablePendingTransSpeculative->get(kvGuard->getKey());
78
79
80                 if (kv == NULL) {
81                         // if it is not in the pending trans table then check the speculative table and use that
82                         // value as our latest assumption
83                         kv = keyValTableSpeculative->get(kvGuard->getKey());
84                 }
85
86
87                 if (kv == NULL) {
88                         // if it is not in the speculative table then check the committed table and use that
89                         // value as our latest assumption
90                         kv = keyValTableCommitted->get(kvGuard->getKey());
91                 }
92
93                 if (kvGuard->getValue() != NULL) {
94                         if ((kv == NULL) || (!kvGuard->getValue()->equals(kv->getValue()))) {
95                                 delete kvit;
96                                 return false;
97                         }
98                 } else {
99                         if (kv != NULL) {
100                                 delete kvit;
101                                 return false;
102                         }
103                 }
104         }
105         delete kvit;
106         return true;
107 }
108
109 Transaction *PendingTransaction::createTransaction() {
110         Transaction *newTransaction = new Transaction();
111         int transactionPartCount = 0;
112
113         // Convert all the data into a char array so we can start partitioning
114         Array<char> *charData = convertDataToBytes();
115
116         int currentPosition = 0;
117         int remaining = charData->length();
118
119         while (remaining > 0) {
120
121                 bool isLastPart = false;
122                 // determine how much to copy
123                 int copySize = TransactionPart_MAX_NON_HEADER_SIZE;
124                 if (remaining <= TransactionPart_MAX_NON_HEADER_SIZE) {
125                         copySize = remaining;
126                         isLastPart = true;// last bit of data so last part
127                 }
128
129                 // Copy to a smaller version
130                 Array<char> *partData = new Array<char>(copySize);
131                 System_arraycopy(charData, currentPosition, partData, 0, copySize);
132
133                 TransactionPart * part = new TransactionPart(NULL, machineId, arbitrator, clientLocalSequenceNumber, transactionPartCount, partData, isLastPart);
134                 newTransaction->addPartEncode(part);
135
136                 // Update position, count and remaining
137                 currentPosition += copySize;
138                 transactionPartCount++;
139                 remaining -= copySize;
140         }
141
142         // Add the Guard Conditions
143         SetIterator<KeyValue *> * kvit = keyValueGuardSet->iterator();
144         while(kvit->hasNext()) {
145                 KeyValue *kv = kvit->next();
146                 newTransaction->addGuardKV(kv);
147         }
148         delete kvit;
149         
150         //  Add the updates
151         kvit = keyValueUpdateSet->iterator();
152         while(kvit->hasNext()) {
153                 KeyValue *kv = kvit->next();
154                 newTransaction->addUpdateKV(kv);
155         }
156         delete kvit;
157         return newTransaction;
158 }
159
160 Array<char> *PendingTransaction::convertDataToBytes() {
161         // Calculate the size of the data
162         int sizeOfData = 2 * sizeof(int32_t);   // Number of Update KV's and Guard KV's
163         sizeOfData += currentDataSize;
164
165         // Data handlers and storage
166         Array<char> *dataArray = new Array<char>(sizeOfData);
167         ByteBuffer *bbEncode = ByteBuffer_wrap(dataArray);
168
169         // Encode the size of the updates and guard sets
170         bbEncode->putInt(keyValueGuardSet->size());
171         bbEncode->putInt(keyValueUpdateSet->size());
172
173         // Encode all the guard conditions
174         SetIterator<KeyValue *> * kvit = keyValueGuardSet->iterator();
175         while(kvit->hasNext()) {
176                 KeyValue *kv = kvit->next();
177                 kv->encode(bbEncode);
178         }
179         delete kvit;
180         
181         // Encode all the updates
182         kvit = keyValueUpdateSet->iterator();
183         while(kvit->hasNext()) {
184                 KeyValue *kv = kvit->next();
185                 kv->encode(bbEncode);
186         }
187         delete kvit;
188         
189         return bbEncode->array();
190 }
191