edits
[iotcloud.git] / version2 / src / C / Commit.cc
1 #include "Commit.h"
2 #include "CommitPart.h"
3 #include "ByteBuffer.h"
4 #include "KeyValue.h"
5
6 Commit::Commit() :
7         parts(new Hashtable<int32_t, CommitPart *>()),
8         missingParts(NULL),
9         fldisComplete(false),
10         hasLastPart(false),
11         keyValueUpdateSet(new Hashset<KeyValue *>()),
12         isDead(false),
13         sequenceNumber(-1),
14         machineId(-1),
15         transactionSequenceNumber(-1),
16         liveKeys(new Hashset<IoTString *>) {
17 }
18
19
20 Commit::Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber) :
21         parts(new Hashtable<int32_t, CommitPart *>()),
22         missingParts(NULL),
23         fldisComplete(true),
24         hasLastPart(false),
25         keyValueUpdateSet(new Hashset<KeyValue *>()),
26         isDead(false),
27         sequenceNumber(_sequenceNumber),
28         machineId(_machineId),
29         transactionSequenceNumber(_transactionSequenceNumber),
30         liveKeys(new Hashset<IoTString *>) {
31 }
32
33 void Commit::addPartDecode(CommitPart *newPart) {
34         if (isDead) {
35                 // If dead then just kill this part and move on
36                 newPart->setDead();
37                 return;
38         }
39
40         CommitPart *previoslySeenPart = parts->put(newPart->getPartNumber(), newPart);
41
42         if (previoslySeenPart != NULL) {
43                 // Set dead the old one since the new one is a rescued version of this part
44                 previoslySeenPart->setDead();
45         } else if (newPart->isLastPart()) {
46                 missingParts = new Hashset<int32_t>();
47                 hasLastPart = true;
48
49                 for (int i = 0; i < newPart->getPartNumber(); i++) {
50                         if (parts->get(i) == NULL) {
51                                 missingParts->add(i);
52                         }
53                 }
54         }
55
56         if (!fldisComplete && hasLastPart) {
57
58                 // We have seen this part so remove it from the set of missing parts
59                 missingParts->remove(newPart->getPartNumber());
60
61                 // Check if all the parts have been seen
62                 if (missingParts->size() == 0) {
63
64                         // We have all the parts
65                         fldisComplete = true;
66
67                         // Decode all the parts and create the key value guard and update sets
68                         decodeCommitData();
69
70                         // Get the sequence number and arbitrator of this transaction
71                         sequenceNumber = parts->get(0)->getSequenceNumber();
72                         machineId = parts->get(0)->getMachineId();
73                         transactionSequenceNumber = parts->get(0)->getTransactionSequenceNumber();
74                 }
75         }
76 }
77
78 int64_t Commit::getSequenceNumber() {
79         return sequenceNumber;
80 }
81
82 int64_t Commit::getTransactionSequenceNumber() {
83         return transactionSequenceNumber;
84 }
85
86 Hashtable<int32_t, CommitPart *> *Commit::getParts() {
87         return parts;
88 }
89
90 void Commit::addKV(KeyValue *kv) {
91         keyValueUpdateSet->add(kv);
92         liveKeys->add(kv->getKey());
93 }
94
95 void Commit::invalidateKey(IoTString *key) {
96         liveKeys->remove(key);
97
98         if (liveKeys->size() == 0) {
99                 setDead();
100         }
101 }
102
103 Hashset<KeyValue *> *Commit::getKeyValueUpdateSet() {
104         return keyValueUpdateSet;
105 }
106
107 int32_t Commit::getNumberOfParts() {
108         return parts->size();
109 }
110
111 void Commit::setDead() {
112         if (isDead) {
113                 // Already dead
114                 return;
115         }
116
117         // Set dead
118         isDead = true;
119
120         // Make all the parts of this transaction dead
121         for (int32_t partNumber : parts->keySet()) {
122                 CommitPart *part = parts->get(partNumber);
123                 part->setDead();
124         }
125 }
126
127 CommitPart *Commit::getPart(int index) {
128         return parts->get(index);
129 }
130
131 void Commit::createCommitParts() {
132         parts->clear();
133
134         // Convert to chars
135         Array<char> *charData = convertDataToBytes();
136
137
138         int commitPartCount = 0;
139         int currentPosition = 0;
140         int remaining = charData->length();
141
142         while (remaining > 0) {
143
144                 bool isLastPart = false;
145                 // determine how much to copy
146                 int copySize = CommitPart_MAX_NON_HEADER_SIZE;
147                 if (remaining <= CommitPart_MAX_NON_HEADER_SIZE) {
148                         copySize = remaining;
149                         isLastPart = true;// last bit of data so last part
150                 }
151
152                 // Copy to a smaller version
153                 Array<char> *partData = new Array<char>(copySize);
154                 System_arraycopy(charData, currentPosition, partData, 0, copySize);
155
156                 CommitPart part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
157                 parts->put(part->getPartNumber(), part);
158
159                 // Update position, count and remaining
160                 currentPosition += copySize;
161                 commitPartCount++;
162                 remaining -= copySize;
163         }
164 }
165
166 void Commit::decodeCommitData() {
167
168         // Calculate the size of the data section
169         int dataSize = 0;
170         for (int i = 0; i < parts->keySet()->size(); i++) {
171                 CommitPart *tp = parts->get(i);
172                 dataSize += tp->getDataSize();
173         }
174
175         Array<char> *combinedData = new Array<char>(dataSize);
176         int currentPosition = 0;
177
178         // Stitch all the data sections together
179         for (int i = 0; i < parts->keySet()->size(); i++) {
180                 CommitPart *tp = parts->get(i);
181                 System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
182                 currentPosition += tp->getDataSize();
183         }
184
185         // Decoder Object
186         ByteBuffer *bbDecode = ByteBuffer_wrap(combinedData);
187
188         // Decode how many key value pairs need to be decoded
189         int numberOfKVUpdates = bbDecode->getInt();
190
191         // Decode all the updates key values
192         for (int i = 0; i < numberOfKVUpdates; i++) {
193                 KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
194                 keyValueUpdateSet->add(kv);
195                 liveKeys->add(kv->getKey());
196         }
197 }
198
199 Array<char> *convertDataToBytes() {
200
201         // Calculate the size of the data
202         int sizeOfData = sizeof(int32_t);       // Number of Update KV's
203         for (KeyValue *kv : keyValueUpdateSet) {
204                 sizeOfData += kv->getSize();
205         }
206
207         // Data handlers and storage
208         Array<char> *dataArray = new Array<char>(sizeOfData);
209         ByteBuffer *bbEncode = ByteBuffer_wrap(dataArray);
210
211         // Encode the size of the updates and guard sets
212         bbEncode->putInt(keyValueUpdateSet->size());
213
214         // Encode all the updates
215         for (KeyValue *kv : keyValueUpdateSet) {
216                 kv->encode(bbEncode);
217         }
218
219         return bbEncode->array();
220 }
221
222 void Commit::setKVsMap(Hashtable<IoTString *, KeyValue *> *newKVs) {
223         keyValueUpdateSet->clear();
224         liveKeys->clear();
225
226         keyValueUpdateSet->addAll(newKVs->values());
227         liveKeys->addAll(newKVs->keySet());
228 }
229
230 Commit *Commit_merge(Commit *newer, Commit *older, int64_t newSequenceNumber) {
231
232         if (older == NULL) {
233                 return newer;
234         } else if (newer == NULL) {
235                 return older;
236         }
237
238         Hashtable<IoTString *, KeyValue *> *kvSet = new Hashtable<IoTString *, KeyValue *>();
239         for (KeyValue *kv : older->getKeyValueUpdateSet()) {
240                 kvSet->put(kv->getKey(), kv);
241         }
242
243         for (KeyValue *kv : newer->getKeyValueUpdateSet()) {
244                 kvSet->put(kv->getKey(), kv);
245         }
246
247         int64_t transactionSequenceNumber = newer->getTransactionSequenceNumber();
248
249         if (transactionSequenceNumber == -1) {
250                 transactionSequenceNumber = older->getTransactionSequenceNumber();
251         }
252
253         Commit *newCommit = new Commit(newSequenceNumber, newer->getMachineId(), transactionSequenceNumber);
254
255         newCommit->setKVsMap(kvSet);
256
257         return newCommit;
258 }