e01a778c419643cfa93495c87cd042694b941891
[iotcloud.git] / version2 / src / C / Commit.cc
1 #include "Commit.h"
2 #include "CommitPart.h"
3 #include "ByteBuffer.h"
4 #include "IoTString.h"
5
6 Commit::Commit() :
7         parts(new Vector<CommitPart *>()),
8         partCount(0),
9         missingParts(NULL),
10         fldisComplete(false),
11         hasLastPart(false),
12         keyValueUpdateSet(new Hashset<KeyValue *, uintptr_t, 0>()),
13         isDead(false),
14         sequenceNumber(-1),
15         machineId(-1),
16         transactionSequenceNumber(-1),
17         liveKeys(new Hashset<IoTString *>()) {
18 }
19
20 Commit::Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber) :
21         parts(new Vector<CommitPart *>()),
22         partCount(0),
23         missingParts(NULL),
24         fldisComplete(true),
25         hasLastPart(false),
26         keyValueUpdateSet(new Hashset<KeyValue *, uintptr_t, 0>()),
27         isDead(false),
28         sequenceNumber(_sequenceNumber),
29         machineId(_machineId),
30         transactionSequenceNumber(_transactionSequenceNumber),
31         liveKeys(new Hashset<IoTString *>()) {
32 }
33
34 Commit::~Commit() {
35         delete parts;
36         {
37                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> * keyit = keyValueUpdateSet->iterator();
38                 while(keyit->hasNext()) {
39                         delete keyit->next();
40                 }
41                 delete keyit;
42                 delete keyValueUpdateSet;
43         }
44         delete liveKeys;
45         if (missingParts != NULL)
46                 delete missingParts;
47 }
48
49 void Commit::addPartDecode(CommitPart *newPart) {
50         if (isDead) {
51                 // If dead then just kill this part and move on
52                 newPart->setDead();
53                 return;
54         }
55
56         CommitPart *previouslySeenPart = parts->setExpand(newPart->getPartNumber(), newPart);
57         if (previouslySeenPart == NULL)
58                 partCount++;
59
60         if (previouslySeenPart != NULL) {
61                 // Set dead the old one since the new one is a rescued version of this part
62                 previouslySeenPart->setDead();
63         } else if (newPart->isLastPart()) {
64                 missingParts = new Hashset<int32_t>();
65                 hasLastPart = true;
66
67                 for (int i = 0; i < newPart->getPartNumber(); i++) {
68                         if (parts->get(i) == NULL) {
69                                 missingParts->add(i);
70                         }
71                 }
72         }
73
74         if (!fldisComplete && hasLastPart) {
75
76                 // We have seen this part so remove it from the set of missing parts
77                 missingParts->remove(newPart->getPartNumber());
78
79                 // Check if all the parts have been seen
80                 if (missingParts->size() == 0) {
81
82                         // We have all the parts
83                         fldisComplete = true;
84
85                         // Decode all the parts and create the key value guard and update sets
86                         decodeCommitData();
87
88                         // Get the sequence number and arbitrator of this transaction
89                         sequenceNumber = parts->get(0)->getSequenceNumber();
90                         machineId = parts->get(0)->getMachineId();
91                         transactionSequenceNumber = parts->get(0)->getTransactionSequenceNumber();
92                 }
93         }
94 }
95
96 int64_t Commit::getSequenceNumber() {
97         return sequenceNumber;
98 }
99
100 int64_t Commit::getTransactionSequenceNumber() {
101         return transactionSequenceNumber;
102 }
103
104 Vector<CommitPart *> *Commit::getParts() {
105         return parts;
106 }
107
108 void Commit::addKV(KeyValue *kv) {
109         KeyValue * kvcopy = kv->getCopy();
110         keyValueUpdateSet->add(kvcopy);
111         liveKeys->add(kvcopy->getKey());
112 }
113
114 void Commit::invalidateKey(IoTString *key) {
115         liveKeys->remove(key);
116
117         if (liveKeys->size() == 0) {
118                 setDead();
119         }
120 }
121
122 Hashset<KeyValue *, uintptr_t, 0> *Commit::getKeyValueUpdateSet() {
123         return keyValueUpdateSet;
124 }
125
126 int32_t Commit::getNumberOfParts() {
127         return partCount;
128 }
129
130 void Commit::setDead() {
131         if (!isDead) {
132                 isDead = true;
133                 // Make all the parts of this transaction dead
134                 for (uint32_t partNumber = 0; partNumber < parts->size(); partNumber++) {
135                         CommitPart *part = parts->get(partNumber);
136                         if (parts != NULL)
137                                 part->setDead();
138                 }
139         }
140 }
141
142 CommitPart *Commit::getPart(int index) {
143         return parts->get(index);
144 }
145
146 void Commit::createCommitParts() {
147         parts->clear();
148         partCount = 0;
149         // Convert to chars
150         Array<char> *charData = convertDataToBytes();
151
152         int commitPartCount = 0;
153         int currentPosition = 0;
154         int remaining = charData->length();
155
156         while (remaining > 0) {
157                 bool isLastPart = false;
158                 // determine how much to copy
159                 int copySize = CommitPart_MAX_NON_HEADER_SIZE;
160                 if (remaining <= CommitPart_MAX_NON_HEADER_SIZE) {
161                         copySize = remaining;
162                         isLastPart = true;// last bit of data so last part
163                 }
164
165                 // Copy to a smaller version
166                 Array<char> *partData = new Array<char>(copySize);
167                 System_arraycopy(charData, currentPosition, partData, 0, copySize);
168
169                 CommitPart *part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
170                 parts->setExpand(part->getPartNumber(), part);
171
172                 // Update position, count and remaining
173                 currentPosition += copySize;
174                 commitPartCount++;
175                 remaining -= copySize;
176         }
177         delete charData;
178 }
179
180 void Commit::decodeCommitData() {
181         // Calculate the size of the data section
182         int dataSize = 0;
183         for (uint i = 0; i < parts->size(); i++) {
184                 CommitPart *tp = parts->get(i);
185                 if (tp != NULL)
186                         dataSize += tp->getDataSize();
187         }
188
189         Array<char> *combinedData = new Array<char>(dataSize);
190         int currentPosition = 0;
191
192         // Stitch all the data sections together
193         for (uint i = 0; i < parts->size(); i++) {
194                 CommitPart *tp = parts->get(i);
195                 if (tp != NULL) {
196                         System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
197                         currentPosition += tp->getDataSize();
198                 }
199         }
200
201         // Decoder Object
202         ByteBuffer *bbDecode = ByteBuffer_wrap(combinedData);
203
204         // Decode how many key value pairs need to be decoded
205         int numberOfKVUpdates = bbDecode->getInt();
206
207         // Decode all the updates key values
208         for (int i = 0; i < numberOfKVUpdates; i++) {
209                 KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
210                 keyValueUpdateSet->add(kv);
211                 liveKeys->add(kv->getKey());
212         }
213         delete bbDecode;
214 }
215
216 Array<char> *Commit::convertDataToBytes() {
217         // Calculate the size of the data
218         int sizeOfData = sizeof(int32_t);       // Number of Update KV's
219         SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = keyValueUpdateSet->iterator();
220         while (kvit->hasNext()) {
221                 KeyValue *kv = kvit->next();
222                 sizeOfData += kv->getSize();
223         }
224         delete kvit;
225
226         // Data handlers and storage
227         Array<char> *dataArray = new Array<char>(sizeOfData);
228         ByteBuffer *bbEncode = ByteBuffer_wrap(dataArray);
229
230         // Encode the size of the updates and guard sets
231         bbEncode->putInt(keyValueUpdateSet->size());
232
233         // Encode all the updates
234         kvit = keyValueUpdateSet->iterator();
235         while (kvit->hasNext()) {
236                 KeyValue *kv = kvit->next();
237                 kv->encode(bbEncode);
238         }
239         delete kvit;
240         Array<char> * array = bbEncode->array();
241         bbEncode->releaseArray();
242         delete bbEncode;
243         return array;
244 }
245
246 void Commit::setKVsMap(Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, KeyValueEquals> *newKVs) {
247         keyValueUpdateSet->clear();
248         liveKeys->clear();
249         SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, KeyValueEquals> *kvit = newKVs->iterator();
250         while (kvit->hasNext()) {
251                 KeyValue *kv = kvit->next();
252                 KeyValue *kvcopy = kv->getCopy();
253                 liveKeys->add(kvcopy->getKey());
254                 keyValueUpdateSet->add(kvcopy);
255         }
256         delete kvit;
257 }
258
259 Commit *Commit_merge(Commit *newer, Commit *older, int64_t newSequenceNumber) {
260         if (older == NULL) {
261                 return newer;
262         } else if (newer == NULL) {
263                 return older;
264         }
265         Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, KeyValueEquals> *kvSet = new Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, KeyValueEquals>();
266         SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = older->getKeyValueUpdateSet()->iterator();
267         while (kvit->hasNext()) {
268                 KeyValue *kv = kvit->next();
269                 kvSet->add(kv);
270         }
271         delete kvit;
272         kvit = newer->getKeyValueUpdateSet()->iterator();
273         while (kvit->hasNext()) {
274                 KeyValue *kv = kvit->next();
275                 kvSet->add(kv);
276         }
277         delete kvit;
278
279         int64_t transactionSequenceNumber = newer->getTransactionSequenceNumber();
280         if (transactionSequenceNumber == -1) {
281                 transactionSequenceNumber = older->getTransactionSequenceNumber();
282         }
283
284         Commit *newCommit = new Commit(newSequenceNumber, newer->getMachineId(), transactionSequenceNumber);
285         newCommit->setKVsMap(kvSet);
286
287         delete kvSet;
288         return newCommit;
289 }