Fixing initialization to just create a table on the cloud side.
[iotcloud.git] / version2 / src / C / Commit.cpp
1 #include "Commit.h"
2 #include "CommitPart.h"
3 #include "ByteBuffer.h"
4 #include "IoTString.h"
5
6 Commit::Commit() :
7         parts(new Vector<CommitPart *>()),
8         partCount(0),
9         fldisComplete(false),
10         hasLastPart(false),
11         keyValueUpdateSet(new Hashset<KeyValue *, uintptr_t, 0>()),
12         isDead(false),
13         sequenceNumber(-1),
14         machineId(-1),
15         transactionSequenceNumber(-1),
16         dataBytes(NULL),
17         liveKeys(new Hashset<IoTString *>()) {
18 }
19
20 Commit::Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber) :
21         parts(new Vector<CommitPart *>()),
22         partCount(0),
23         fldisComplete(true),
24         hasLastPart(false),
25         keyValueUpdateSet(new Hashset<KeyValue *, uintptr_t, 0>()),
26         isDead(false),
27         sequenceNumber(_sequenceNumber),
28         machineId(_machineId),
29         transactionSequenceNumber(_transactionSequenceNumber),
30         dataBytes(NULL),
31         liveKeys(new Hashset<IoTString *>()) {
32 }
33
34 Commit::~Commit() {
35         {
36                 uint Size = parts->size();
37                 for(uint i=0;i<Size; i++)
38                         parts->get(i)->releaseRef();
39                 delete parts;
40         }
41         {
42                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> * keyit = keyValueUpdateSet->iterator();
43                 while(keyit->hasNext()) {
44                         delete keyit->next();
45                 }
46                 delete keyit;
47                 delete keyValueUpdateSet;
48         }
49         delete liveKeys;
50         if (dataBytes != NULL)
51                 delete dataBytes;
52 }
53
54 void Commit::addPartDecode(CommitPart *newPart) {
55         if (isDead) {
56                 // If dead then just kill this part and move on
57                 newPart->setDead();
58                 return;
59         }
60
61         newPart->acquireRef();
62         CommitPart *previouslySeenPart = parts->setExpand(newPart->getPartNumber(), newPart);
63         if (previouslySeenPart == NULL)
64                 partCount++;
65
66         if (previouslySeenPart != NULL) {
67                 // Set dead the old one since the new one is a rescued version of this part
68                 previouslySeenPart->setDead();
69                 previouslySeenPart->releaseRef();
70         } else if (newPart->isLastPart()) {
71                 hasLastPart = true;
72         }
73
74         if (!fldisComplete && hasLastPart) {
75                 // We have seen this part so remove it from the set of missing parts
76                 uint size = parts->size();
77                 fldisComplete = true;
78                 for(uint i=0; i < size; i++) {
79                         if (parts->get(i) == NULL) {
80                                 fldisComplete = false;
81                                 break;
82                         }
83                 }
84
85                 if (fldisComplete) {
86                         // Decode all the parts and create the key value guard and update sets
87                         decodeCommitData();
88
89                         // Get the sequence number and arbitrator of this transaction
90                         sequenceNumber = parts->get(0)->getSequenceNumber();
91                         machineId = parts->get(0)->getMachineId();
92                         transactionSequenceNumber = parts->get(0)->getTransactionSequenceNumber();
93                 }
94         }
95 }
96
97 int64_t Commit::getSequenceNumber() {
98         return sequenceNumber;
99 }
100
101 int64_t Commit::getTransactionSequenceNumber() {
102         return transactionSequenceNumber;
103 }
104
105 Vector<CommitPart *> *Commit::getParts() {
106         return parts;
107 }
108
109 void Commit::addKV(KeyValue *kv) {
110         KeyValue * kvcopy = kv->getCopy();
111         keyValueUpdateSet->add(kvcopy);
112         liveKeys->add(kvcopy->getKey());
113 }
114
115 void Commit::invalidateKey(IoTString *key) {
116         liveKeys->remove(key);
117
118         if (liveKeys->size() == 0) {
119                 setDead();
120         }
121 }
122
123 Hashset<KeyValue *, uintptr_t, 0> *Commit::getKeyValueUpdateSet() {
124         return keyValueUpdateSet;
125 }
126
127 int32_t Commit::getNumberOfParts() {
128         return partCount;
129 }
130
131 void Commit::setDead() {
132         if (!isDead) {
133                 isDead = true;
134                 // Make all the parts of this transaction dead
135                 for (uint32_t partNumber = 0; partNumber < parts->size(); partNumber++) {
136                         CommitPart *part = parts->get(partNumber);
137                         part->setDead();
138                 }
139         }
140 }
141
142 void Commit::createCommitParts() {
143         uint Size = parts->size();
144         for(uint i=0;i < Size; i++) {
145                 Entry * e=parts->get(i);
146                 e->releaseRef();
147         }
148         parts->clear();
149         partCount = 0;
150         // Convert to chars
151         Array<char> *charData = convertDataToBytes();
152
153         int commitPartCount = 0;
154         int currentPosition = 0;
155         int remaining = charData->length();
156
157         while (remaining > 0) {
158                 bool isLastPart = false;
159                 // determine how much to copy
160                 int copySize = CommitPart_MAX_NON_HEADER_SIZE;
161                 if (remaining <= CommitPart_MAX_NON_HEADER_SIZE) {
162                         copySize = remaining;
163                         isLastPart = true;// last bit of data so last part
164                 }
165
166                 // Copy to a smaller version
167                 Array<char> *partData = new Array<char>(copySize);
168                 System_arraycopy(charData, currentPosition, partData, 0, copySize);
169
170                 CommitPart *part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
171                 parts->setExpand(part->getPartNumber(), part);
172
173                 // Update position, count and remaining
174                 currentPosition += copySize;
175                 commitPartCount++;
176                 remaining -= copySize;
177         }
178         delete charData;
179 }
180
181 void Commit::decodeCommitData() {
182         // Calculate the size of the data section
183         int dataSize = 0;
184         for (uint i = 0; i < parts->size(); i++) {
185                 CommitPart *tp = parts->get(i);
186                 if (tp != NULL)
187                         dataSize += tp->getDataSize();
188         }
189
190         Array<char> *combinedData = new Array<char>(dataSize);
191         int currentPosition = 0;
192
193         // Stitch all the data sections together
194         for (uint i = 0; i < parts->size(); i++) {
195                 CommitPart *tp = parts->get(i);
196                 if (tp != NULL) {
197                         System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
198                         currentPosition += tp->getDataSize();
199                 }
200         }
201
202         // Decoder Object
203         ByteBuffer *bbDecode = ByteBuffer_wrap(combinedData);
204
205         // Decode how many key value pairs need to be decoded
206         int numberOfKVUpdates = bbDecode->getInt();
207
208         // Decode all the updates key values
209         for (int i = 0; i < numberOfKVUpdates; i++) {
210                 KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
211                 keyValueUpdateSet->add(kv);
212                 liveKeys->add(kv->getKey());
213         }
214         delete bbDecode;
215 }
216
217 Array<char> *Commit::convertDataToBytes() {
218         // Calculate the size of the data
219         int sizeOfData = sizeof(int32_t);       // Number of Update KV's
220         SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = keyValueUpdateSet->iterator();
221         while (kvit->hasNext()) {
222                 KeyValue *kv = kvit->next();
223                 sizeOfData += kv->getSize();
224         }
225         delete kvit;
226
227         // Data handlers and storage
228         Array<char> *dataArray = new Array<char>(sizeOfData);
229         ByteBuffer *bbEncode = ByteBuffer_wrap(dataArray);
230
231         // Encode the size of the updates and guard sets
232         bbEncode->putInt(keyValueUpdateSet->size());
233
234         // Encode all the updates
235         kvit = keyValueUpdateSet->iterator();
236         while (kvit->hasNext()) {
237                 KeyValue *kv = kvit->next();
238                 kv->encode(bbEncode);
239         }
240         delete kvit;
241         Array<char> * array = bbEncode->array();
242         bbEncode->releaseArray();
243         delete bbEncode;
244         return array;
245 }
246
247 void Commit::setKVsMap(Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, KeyValueEquals> *newKVs) {
248         keyValueUpdateSet->clear();
249         liveKeys->clear();
250         SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, KeyValueEquals> *kvit = newKVs->iterator();
251         while (kvit->hasNext()) {
252                 KeyValue *kv = kvit->next();
253                 KeyValue *kvcopy = kv->getCopy();
254                 liveKeys->add(kvcopy->getKey());
255                 keyValueUpdateSet->add(kvcopy);
256         }
257         delete kvit;
258 }
259
260 Commit *Commit_merge(Commit *newer, Commit *older, int64_t newSequenceNumber) {
261         if (older == NULL) {
262                 return newer;
263         } else if (newer == NULL) {
264                 return older;
265         }
266         Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, KeyValueEquals> *kvSet = new Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, KeyValueEquals>();
267         SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = older->getKeyValueUpdateSet()->iterator();
268         while (kvit->hasNext()) {
269                 KeyValue *kv = kvit->next();
270                 kvSet->add(kv);
271         }
272         delete kvit;
273         kvit = newer->getKeyValueUpdateSet()->iterator();
274         while (kvit->hasNext()) {
275                 KeyValue *kv = kvit->next();
276                 kvSet->add(kv);
277         }
278         delete kvit;
279
280         int64_t transactionSequenceNumber = newer->getTransactionSequenceNumber();
281         if (transactionSequenceNumber == -1) {
282                 transactionSequenceNumber = older->getTransactionSequenceNumber();
283         }
284
285         Commit *newCommit = new Commit(newSequenceNumber, newer->getMachineId(), transactionSequenceNumber);
286         newCommit->setKVsMap(kvSet);
287
288         delete kvSet;
289         return newCommit;
290 }