6c46fed4a036301c8fe2fe89d40b721dc2926d35
[iotcloud.git] / version2 / src / C / Commit.cc
1 #include "Commit.h"
2 #include "CommitPart.h"
3 #include "ByteBuffer.h"
4 #include "IoTString.h"
5
6 Commit::Commit() :
7         parts(new Vector<CommitPart *>()),
8         partCount(0),
9         missingParts(NULL),
10         fldisComplete(false),
11         hasLastPart(false),
12         keyValueUpdateSet(new Hashset<KeyValue *, uintptr_t, 0>()),
13         isDead(false),
14         sequenceNumber(-1),
15         machineId(-1),
16         transactionSequenceNumber(-1),
17         liveKeys(new Hashset<IoTString *>) {
18 }
19
20 Commit::Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber) :
21         parts(new Vector<CommitPart *>()),
22         partCount(0),
23         missingParts(NULL),
24         fldisComplete(true),
25         hasLastPart(false),
26         keyValueUpdateSet(new Hashset<KeyValue *, uintptr_t, 0>()),
27         isDead(false),
28         sequenceNumber(_sequenceNumber),
29         machineId(_machineId),
30         transactionSequenceNumber(_transactionSequenceNumber),
31         liveKeys(new Hashset<IoTString *>) {
32 }
33
34 Commit::~Commit() {
35         delete parts;
36         delete keyValueUpdateSet;
37         delete liveKeys;
38         if (missingParts != NULL)
39                 delete missingParts;
40 }
41
42 void Commit::addPartDecode(CommitPart *newPart) {
43         if (isDead) {
44                 // If dead then just kill this part and move on
45                 newPart->setDead();
46                 return;
47         }
48
49         CommitPart *previouslySeenPart = parts->setExpand(newPart->getPartNumber(), newPart);
50         if (previouslySeenPart == NULL)
51                 partCount++;
52
53         if (previouslySeenPart != NULL) {
54                 // Set dead the old one since the new one is a rescued version of this part
55                 previouslySeenPart->setDead();
56         } else if (newPart->isLastPart()) {
57                 missingParts = new Hashset<int32_t>();
58                 hasLastPart = true;
59
60                 for (int i = 0; i < newPart->getPartNumber(); i++) {
61                         if (parts->get(i) == NULL) {
62                                 missingParts->add(i);
63                         }
64                 }
65         }
66
67         if (!fldisComplete && hasLastPart) {
68
69                 // We have seen this part so remove it from the set of missing parts
70                 missingParts->remove(newPart->getPartNumber());
71
72                 // Check if all the parts have been seen
73                 if (missingParts->size() == 0) {
74
75                         // We have all the parts
76                         fldisComplete = true;
77
78                         // Decode all the parts and create the key value guard and update sets
79                         decodeCommitData();
80
81                         // Get the sequence number and arbitrator of this transaction
82                         sequenceNumber = parts->get(0)->getSequenceNumber();
83                         machineId = parts->get(0)->getMachineId();
84                         transactionSequenceNumber = parts->get(0)->getTransactionSequenceNumber();
85                 }
86         }
87 }
88
89 int64_t Commit::getSequenceNumber() {
90         return sequenceNumber;
91 }
92
93 int64_t Commit::getTransactionSequenceNumber() {
94         return transactionSequenceNumber;
95 }
96
97 Vector<CommitPart *> *Commit::getParts() {
98         return parts;
99 }
100
101 void Commit::addKV(KeyValue *kv) {
102         keyValueUpdateSet->add(kv);
103         liveKeys->add(kv->getKey());
104 }
105
106 void Commit::invalidateKey(IoTString *key) {
107         liveKeys->remove(key);
108
109         if (liveKeys->size() == 0) {
110                 setDead();
111         }
112 }
113
114 Hashset<KeyValue *, uintptr_t, 0> *Commit::getKeyValueUpdateSet() {
115         return keyValueUpdateSet;
116 }
117
118 int32_t Commit::getNumberOfParts() {
119         return partCount;
120 }
121
122 void Commit::setDead() {
123         if (!isDead) {
124                 isDead = true;
125                 // Make all the parts of this transaction dead
126                 for (uint32_t partNumber = 0; partNumber < parts->size(); partNumber++) {
127                         CommitPart *part = parts->get(partNumber);
128                         if (parts != NULL)
129                                 part->setDead();
130                 }
131         }
132 }
133
134 CommitPart *Commit::getPart(int index) {
135         return parts->get(index);
136 }
137
138 void Commit::createCommitParts() {
139         parts->clear();
140         partCount = 0;
141         // Convert to chars
142         Array<char> *charData = convertDataToBytes();
143
144         int commitPartCount = 0;
145         int currentPosition = 0;
146         int remaining = charData->length();
147
148         while (remaining > 0) {
149                 bool isLastPart = false;
150                 // determine how much to copy
151                 int copySize = CommitPart_MAX_NON_HEADER_SIZE;
152                 if (remaining <= CommitPart_MAX_NON_HEADER_SIZE) {
153                         copySize = remaining;
154                         isLastPart = true;// last bit of data so last part
155                 }
156
157                 // Copy to a smaller version
158                 Array<char> *partData = new Array<char>(copySize);
159                 System_arraycopy(charData, currentPosition, partData, 0, copySize);
160
161                 CommitPart *part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
162                 parts->setExpand(part->getPartNumber(), part);
163
164                 // Update position, count and remaining
165                 currentPosition += copySize;
166                 commitPartCount++;
167                 remaining -= copySize;
168         }
169 }
170
171 void Commit::decodeCommitData() {
172         // Calculate the size of the data section
173         int dataSize = 0;
174         for (uint i = 0; i < parts->size(); i++) {
175                 CommitPart *tp = parts->get(i);
176                 if (tp != NULL)
177                         dataSize += tp->getDataSize();
178         }
179
180         Array<char> *combinedData = new Array<char>(dataSize);
181         int currentPosition = 0;
182
183         // Stitch all the data sections together
184         for (uint i = 0; i < parts->size(); i++) {
185                 CommitPart *tp = parts->get(i);
186                 if (tp != NULL) {
187                         System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
188                         currentPosition += tp->getDataSize();
189                 }
190         }
191
192         // Decoder Object
193         ByteBuffer *bbDecode = ByteBuffer_wrap(combinedData);
194
195         // Decode how many key value pairs need to be decoded
196         int numberOfKVUpdates = bbDecode->getInt();
197
198         // Decode all the updates key values
199         for (int i = 0; i < numberOfKVUpdates; i++) {
200                 KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
201                 keyValueUpdateSet->add(kv);
202                 liveKeys->add(kv->getKey());
203         }
204 }
205
206 Array<char> *Commit::convertDataToBytes() {
207         // Calculate the size of the data
208         int sizeOfData = sizeof(int32_t);       // Number of Update KV's
209         SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = keyValueUpdateSet->iterator();
210         while (kvit->hasNext()) {
211                 KeyValue *kv = kvit->next();
212                 sizeOfData += kv->getSize();
213         }
214         delete kvit;
215
216         // Data handlers and storage
217         Array<char> *dataArray = new Array<char>(sizeOfData);
218         ByteBuffer *bbEncode = ByteBuffer_wrap(dataArray);
219
220         // Encode the size of the updates and guard sets
221         bbEncode->putInt(keyValueUpdateSet->size());
222
223         // Encode all the updates
224         kvit = keyValueUpdateSet->iterator();
225         while (kvit->hasNext()) {
226                 KeyValue *kv = kvit->next();
227                 kv->encode(bbEncode);
228         }
229         delete kvit;
230
231         return bbEncode->array();
232 }
233
234 void Commit::setKVsMap(Hashset<KeyValue *, uintptr_t, 0> *newKVs) {
235         keyValueUpdateSet->clear();
236         keyValueUpdateSet->addAll(newKVs);
237         liveKeys->clear();
238         SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = newKVs->iterator();
239         while (kvit->hasNext()) {
240                 liveKeys->add(kvit->next()->getKey());
241         }
242         delete kvit;
243 }
244
245 Commit *Commit_merge(Commit *newer, Commit *older, int64_t newSequenceNumber) {
246         if (older == NULL) {
247                 return newer;
248         } else if (newer == NULL) {
249                 return older;
250         }
251         Hashset<KeyValue *, uintptr_t, 0> *kvSet = new Hashset<KeyValue *, uintptr_t, 0>();
252         SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = older->getKeyValueUpdateSet()->iterator();
253         while (kvit->hasNext()) {
254                 KeyValue *kv = kvit->next();
255                 kvSet->add(kv);
256         }
257         delete kvit;
258         kvit = newer->getKeyValueUpdateSet()->iterator();
259         while (kvit->hasNext()) {
260                 KeyValue *kv = kvit->next();
261                 kvSet->add(kv);
262         }
263         delete kvit;
264
265         int64_t transactionSequenceNumber = newer->getTransactionSequenceNumber();
266         if (transactionSequenceNumber == -1) {
267                 transactionSequenceNumber = older->getTransactionSequenceNumber();
268         }
269
270         Commit *newCommit = new Commit(newSequenceNumber, newer->getMachineId(), transactionSequenceNumber);
271         newCommit->setKVsMap(kvSet);
272
273         delete kvSet;
274         return newCommit;
275 }