edits
[iotcloud.git] / version2 / src / C / Commit.cc
1 #include "commit.h"
2
3 Commit::Commit() :
4         parts(new Hashtable<int32_t, CommitPart *>()),
5         missingParts(NULL),
6         fldisComplete(false),
7         hasLastPart(false),
8         keyValueUpdateSet(new HashSet<KeyValue *>()),
9         isDead(false),
10         sequenceNumber(-1),
11         machineId(-1),
12         transactionSequenceNumber(-1),
13         liveKeys(new Hashset<IoTString *>) {
14 }
15
16
17 Commit::Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber) :
18         parts(new Hashtable<int32_t, CommitPart *>()),
19         missingParts(NULL),
20         fldisComplete(true),
21         hasLastPart(false),
22         keyValueUpdateSet(new HashSet<KeyValue *>()),
23         isDead(false),
24         sequenceNumber(_sequenceNumber),
25         machineId(_machineId),
26         transactionSequenceNumber(_transactionSequenceNumber),
27         liveKeys(new Hashset<IoTString *>) {
28 }
29
30 void Commit::addPartDecode(CommitPart newPart) {
31
32         if (isDead) {
33                 // If dead then just kill this part and move on
34                 newPart->setDead();
35                 return;
36         }
37
38         CommitPart *previoslySeenPart = parts->put(newPart->getPartNumber(), newPart);
39
40         if (previoslySeenPart != NULL) {
41                 // Set dead the old one since the new one is a rescued version of this part
42                 previoslySeenPart->setDead();
43         } else if (newPart->isLastPart()) {
44                 missingParts = new HashSet<int32_t>();
45                 hasLastPart = true;
46
47                 for (int i = 0; i < newPart->getPartNumber(); i++) {
48                         if (parts->get(i) == NULL) {
49                                 missingParts->add(i);
50                         }
51                 }
52         }
53
54         if (!fldisComplete && hasLastPart) {
55
56                 // We have seen this part so remove it from the set of missing parts
57                 missingParts->remove(newPart->getPartNumber());
58
59                 // Check if all the parts have been seen
60                 if (missingParts->size() == 0) {
61
62                         // We have all the parts
63                         fldisComplete = true;
64
65                         // Decode all the parts and create the key value guard and update sets
66                         decodeCommitData();
67
68                         // Get the sequence number and arbitrator of this transaction
69                         sequenceNumber = parts->get(0)->getSequenceNumber();
70                         machineId = parts->get(0)->getMachineId();
71                         transactionSequenceNumber = parts->get(0)->getTransactionSequenceNumber();
72                 }
73         }
74 }
75
76 int64_t Commit::getSequenceNumber() {
77         return sequenceNumber;
78 }
79
80 int64_t Commit::getTransactionSequenceNumber() {
81         return transactionSequenceNumber;
82 }
83
84 Hashtable<int32_t, CommitPart *> *Commit::getParts() {
85         return parts;
86 }
87
88 void Commit::addKV(KeyValue *kv) {
89         keyValueUpdateSet->add(kv);
90         liveKeys->add(kv->getKey());
91 }
92
93 void Commit::invalidateKey(IoTString *key) {
94         liveKeys->remove(key);
95
96         if (liveKeys->size() == 0) {
97                 setDead();
98         }
99 }
100
101 Hashset<KeyValue *> *Commit::getKeyValueUpdateSet() {
102         return keyValueUpdateSet;
103 }
104
105 int32_t Commit::getNumberOfParts() {
106         return parts->size();
107 }
108
109 void Commit::setDead() {
110         if (isDead) {
111                 // Already dead
112                 return;
113         }
114
115         // Set dead
116         isDead = true;
117
118         // Make all the parts of this transaction dead
119         for (int32_t partNumber : parts->keySet()) {
120                 CommitPart part = parts->get(partNumber);
121                 part->setDead();
122         }
123 }
124
125 CommitPart *Commit::getPart(int index) {
126         return parts->get(index);
127 }
128
129 void Commit::createCommitParts() {
130         parts->clear();
131
132         // Convert to chars
133         Array<char> *charData = convertDataToBytes();
134
135
136         int commitPartCount = 0;
137         int currentPosition = 0;
138         int remaining = charData->length();
139
140         while (remaining > 0) {
141
142                 bool isLastPart = false;
143                 // determine how much to copy
144                 int copySize = CommitPart_MAX_NON_HEADER_SIZE;
145                 if (remaining <= CommitPart_MAX_NON_HEADER_SIZE) {
146                         copySize = remaining;
147                         isLastPart = true;// last bit of data so last part
148                 }
149
150                 // Copy to a smaller version
151                 Array<char> *partData = new Array<char>(copySize);
152                 System->arraycopy(charData, currentPosition, partData, 0, copySize);
153
154                 CommitPart part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
155                 parts->put(part->getPartNumber(), part);
156
157                 // Update position, count and remaining
158                 currentPosition += copySize;
159                 commitPartCount++;
160                 remaining -= copySize;
161         }
162 }
163
164 void Commit::decodeCommitData() {
165
166         // Calculate the size of the data section
167         int dataSize = 0;
168         for (int i = 0; i < parts->keySet()->size(); i++) {
169                 CommitPart *tp = parts->get(i);
170                 dataSize += tp->getDataSize();
171         }
172
173         Array<char> *combinedData = new Array<char>(dataSize);
174         int currentPosition = 0;
175
176         // Stitch all the data sections together
177         for (int i = 0; i < parts->keySet()->size(); i++) {
178                 CommitPart *tp = parts->get(i);
179                 System->arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
180                 currentPosition += tp->getDataSize();
181         }
182
183         // Decoder Object
184         ByteBuffer *bbDecode = ByteBuffer_wrap(combinedData);
185
186         // Decode how many key value pairs need to be decoded
187         int numberOfKVUpdates = bbDecode->getInt();
188
189         // Decode all the updates key values
190         for (int i = 0; i < numberOfKVUpdates; i++) {
191                 KeyValue *kv = (KeyValue *)KeyValue->decode(bbDecode);
192                 keyValueUpdateSet->add(kv);
193                 liveKeys->add(kv->getKey());
194         }
195 }
196
197 Array<char> *convertDataToBytes() {
198
199         // Calculate the size of the data
200         int sizeOfData = sizeof(int32_t);       // Number of Update KV's
201         for (KeyValue *kv : keyValueUpdateSet) {
202                 sizeOfData += kv->getSize();
203         }
204
205         // Data handlers and storage
206         Array<char> *dataArray = new Array<char>(sizeOfData);
207         ByteBuffer *bbEncode = ByteBuffer_wrap(dataArray);
208
209         // Encode the size of the updates and guard sets
210         bbEncode->putInt(keyValueUpdateSet->size());
211
212         // Encode all the updates
213         for (KeyValue *kv : keyValueUpdateSet) {
214                 kv->encode(bbEncode);
215         }
216
217         return bbEncode->array();
218 }
219
220 void Commit::setKVsMap(Hashtable<IoTString *, KeyValue *> *newKVs) {
221         keyValueUpdateSet->clear();
222         liveKeys->clear();
223
224         keyValueUpdateSet->addAll(newKVs->values());
225         liveKeys->addAll(newKVs->keySet());
226
227 }
228
229
230 Commit *Commit_merge(Commit *newer, Commit *older, int64_t newSequenceNumber) {
231
232         if (older == NULL) {
233                 return newer;
234         } else if (newer == NULL) {
235                 return older;
236         }
237
238         Hashtable<IoTString *, KeyValue *> *kvSet = new Hashtable<IoTString *, KeyValue *>();
239         for (KeyValue *kv : older->getKeyValueUpdateSet()) {
240                 kvSet->put(kv->getKey(), kv);
241         }
242
243         for (KeyValue *kv : newer->getKeyValueUpdateSet()) {
244                 kvSet->put(kv->getKey(), kv);
245         }
246
247         int64_t transactionSequenceNumber = newer->getTransactionSequenceNumber();
248
249         if (transactionSequenceNumber == -1) {
250                 transactionSequenceNumber = older->getTransactionSequenceNumber();
251         }
252
253         Commit *newCommit = new Commit(newSequenceNumber, newer->getMachineId(), transactionSequenceNumber);
254
255         newCommit->setKVsMap(kvSet);
256
257         return newCommit;
258 }