bytebuffer
[iotcloud.git] / version2 / src / C / Commit.cc
1 #include "Commit.h"
2 #include "CommitPart.h"
3 #include "ByteBuffer.h"
4 #include "IoTString.h"
5
6 Commit::Commit() :
7         parts(new Vector<CommitPart *>()),
8         partCount(0),
9         missingParts(NULL),
10         fldisComplete(false),
11         hasLastPart(false),
12         keyValueUpdateSet(new Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue>()),
13         isDead(false),
14         sequenceNumber(-1),
15         machineId(-1),
16         transactionSequenceNumber(-1),
17         liveKeys(new Hashset<IoTString *>) {
18 }
19
20 Commit::Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber) :
21         parts(new Vector<CommitPart *>()),
22         partCount(0),
23         missingParts(NULL),
24         fldisComplete(true),
25         hasLastPart(false),
26         keyValueUpdateSet(new Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue>()),
27         isDead(false),
28         sequenceNumber(_sequenceNumber),
29         machineId(_machineId),
30         transactionSequenceNumber(_transactionSequenceNumber),
31         liveKeys(new Hashset<IoTString *>) {
32 }
33
34 void Commit::addPartDecode(CommitPart *newPart) {
35         if (isDead) {
36                 // If dead then just kill this part and move on
37                 newPart->setDead();
38                 return;
39         }
40
41         CommitPart *previouslySeenPart = parts->setExpand(newPart->getPartNumber(), newPart);
42         if (previouslySeenPart == NULL)
43                 partCount++;
44
45         if (previouslySeenPart != NULL) {
46                 // Set dead the old one since the new one is a rescued version of this part
47                 previouslySeenPart->setDead();
48         } else if (newPart->isLastPart()) {
49                 missingParts = new Hashset<int32_t>();
50                 hasLastPart = true;
51
52                 for (int i = 0; i < newPart->getPartNumber(); i++) {
53                         if (parts->get(i) == NULL) {
54                                 missingParts->add(i);
55                         }
56                 }
57         }
58
59         if (!fldisComplete && hasLastPart) {
60
61                 // We have seen this part so remove it from the set of missing parts
62                 missingParts->remove(newPart->getPartNumber());
63
64                 // Check if all the parts have been seen
65                 if (missingParts->size() == 0) {
66
67                         // We have all the parts
68                         fldisComplete = true;
69
70                         // Decode all the parts and create the key value guard and update sets
71                         decodeCommitData();
72
73                         // Get the sequence number and arbitrator of this transaction
74                         sequenceNumber = parts->get(0)->getSequenceNumber();
75                         machineId = parts->get(0)->getMachineId();
76                         transactionSequenceNumber = parts->get(0)->getTransactionSequenceNumber();
77                 }
78         }
79 }
80
81 int64_t Commit::getSequenceNumber() {
82         return sequenceNumber;
83 }
84
85 int64_t Commit::getTransactionSequenceNumber() {
86         return transactionSequenceNumber;
87 }
88
89 Vector<CommitPart *> *Commit::getParts() {
90         return parts;
91 }
92
93 void Commit::addKV(KeyValue *kv) {
94         keyValueUpdateSet->add(kv);
95         liveKeys->add(kv->getKey());
96 }
97
98 void Commit::invalidateKey(IoTString *key) {
99         liveKeys->remove(key);
100
101         if (liveKeys->size() == 0) {
102                 setDead();
103         }
104 }
105
106 Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *Commit::getKeyValueUpdateSet() {
107         return keyValueUpdateSet;
108 }
109
110 int32_t Commit::getNumberOfParts() {
111         return partCount;
112 }
113
114 void Commit::setDead() {
115         if (!isDead) {
116                 isDead = true;
117                 // Make all the parts of this transaction dead
118                 for (int32_t partNumber = 0; partNumber < parts->size(); partNumber++) {
119                         CommitPart *part = parts->get(partNumber);
120                         if (parts != NULL)
121                                 part->setDead();
122                 }
123         }
124 }
125
126 CommitPart *Commit::getPart(int index) {
127         return parts->get(index);
128 }
129
130 void Commit::createCommitParts() {
131         parts->clear();
132         partCount = 0;
133         // Convert to chars
134         Array<char> *charData = convertDataToBytes();
135
136         int commitPartCount = 0;
137         int currentPosition = 0;
138         int remaining = charData->length();
139
140         while (remaining > 0) {
141                 bool isLastPart = false;
142                 // determine how much to copy
143                 int copySize = CommitPart_MAX_NON_HEADER_SIZE;
144                 if (remaining <= CommitPart_MAX_NON_HEADER_SIZE) {
145                         copySize = remaining;
146                         isLastPart = true;// last bit of data so last part
147                 }
148
149                 // Copy to a smaller version
150                 Array<char> *partData = new Array<char>(copySize);
151                 System_arraycopy(charData, currentPosition, partData, 0, copySize);
152
153                 CommitPart *part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
154                 parts->setExpand(part->getPartNumber(), part);
155
156                 // Update position, count and remaining
157                 currentPosition += copySize;
158                 commitPartCount++;
159                 remaining -= copySize;
160         }
161 }
162
163 void Commit::decodeCommitData() {
164         // Calculate the size of the data section
165         int dataSize = 0;
166         for (int i = 0; i < parts->size(); i++) {
167                 CommitPart *tp = parts->get(i);
168                 if (tp != NULL)
169                         dataSize += tp->getDataSize();
170         }
171
172         Array<char> *combinedData = new Array<char>(dataSize);
173         int currentPosition = 0;
174
175         // Stitch all the data sections together
176         for (int i = 0; i < parts->size(); i++) {
177                 CommitPart *tp = parts->get(i);
178                 if (tp != NULL) {
179                         System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
180                         currentPosition += tp->getDataSize();
181                 }
182         }
183
184         // Decoder Object
185         ByteBuffer *bbDecode = ByteBuffer_wrap(combinedData);
186
187         // Decode how many key value pairs need to be decoded
188         int numberOfKVUpdates = bbDecode->getInt();
189
190         // Decode all the updates key values
191         for (int i = 0; i < numberOfKVUpdates; i++) {
192                 KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
193                 keyValueUpdateSet->add(kv);
194                 liveKeys->add(kv->getKey());
195         }
196 }
197
198 Array<char> *Commit::convertDataToBytes() {
199         // Calculate the size of the data
200         int sizeOfData = sizeof(int32_t);       // Number of Update KV's
201         SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = keyValueUpdateSet->iterator();
202         while (kvit->hasNext()) {
203                 KeyValue *kv = kvit->next();
204                 sizeOfData += kv->getSize();
205         }
206         delete kvit;
207
208         // Data handlers and storage
209         Array<char> *dataArray = new Array<char>(sizeOfData);
210         ByteBuffer *bbEncode = ByteBuffer_wrap(dataArray);
211
212         // Encode the size of the updates and guard sets
213         bbEncode->putInt(keyValueUpdateSet->size());
214
215         // Encode all the updates
216         kvit = keyValueUpdateSet->iterator();
217         while (kvit->hasNext()) {
218                 KeyValue *kv = kvit->next();
219                 kv->encode(bbEncode);
220         }
221         delete kvit;
222
223         return bbEncode->array();
224 }
225
226 void Commit::setKVsMap(Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *newKVs) {
227         keyValueUpdateSet->clear();
228         keyValueUpdateSet->addAll(newKVs);
229         liveKeys->clear();
230         SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = newKVs->iterator();
231         while (kvit->hasNext()) {
232                 liveKeys->add(kvit->next()->getKey());
233         }
234         delete kvit;
235 }
236
237 Commit *Commit_merge(Commit *newer, Commit *older, int64_t newSequenceNumber) {
238         if (older == NULL) {
239                 return newer;
240         } else if (newer == NULL) {
241                 return older;
242         }
243         Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvSet = new Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue>();
244         SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = older->getKeyValueUpdateSet()->iterator();
245         while (kvit->hasNext()) {
246                 KeyValue *kv = kvit->next();
247                 kvSet->add(kv);
248         }
249         delete kvit;
250         kvit = newer->getKeyValueUpdateSet()->iterator();
251         while (kvit->hasNext()) {
252                 KeyValue *kv = kvit->next();
253                 kvSet->add(kv);
254         }
255         delete kvit;
256
257         int64_t transactionSequenceNumber = newer->getTransactionSequenceNumber();
258         if (transactionSequenceNumber == -1) {
259                 transactionSequenceNumber = older->getTransactionSequenceNumber();
260         }
261
262         Commit *newCommit = new Commit(newSequenceNumber, newer->getMachineId(), transactionSequenceNumber);
263         newCommit->setKVsMap(kvSet);
264
265         delete kvSet;
266         return newCommit;
267 }