edits
[iotcloud.git] / version2 / src / C / Commit.cc
1 #include "Commit.h"
2 #include "CommitPart.h"
3 #include "ByteBuffer.h"
4 #include "KeyValue.h"
5
6 Commit::Commit() :
7         parts(new Hashtable<int32_t, CommitPart *>()),
8         missingParts(NULL),
9         fldisComplete(false),
10         hasLastPart(false),
11         keyValueUpdateSet(new Hashset<KeyValue *>()),
12         isDead(false),
13         sequenceNumber(-1),
14         machineId(-1),
15         transactionSequenceNumber(-1),
16         liveKeys(new Hashset<IoTString *>) {
17 }
18
19 Commit::Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber) :
20         parts(new Hashtable<int32_t, CommitPart *>()),
21         missingParts(NULL),
22         fldisComplete(true),
23         hasLastPart(false),
24         keyValueUpdateSet(new Hashset<KeyValue *>()),
25         isDead(false),
26         sequenceNumber(_sequenceNumber),
27         machineId(_machineId),
28         transactionSequenceNumber(_transactionSequenceNumber),
29         liveKeys(new Hashset<IoTString *>) {
30 }
31
32 void Commit::addPartDecode(CommitPart *newPart) {
33         if (isDead) {
34                 // If dead then just kill this part and move on
35                 newPart->setDead();
36                 return;
37         }
38
39         CommitPart *previoslySeenPart = parts->put(newPart->getPartNumber(), newPart);
40
41         if (previoslySeenPart != NULL) {
42                 // Set dead the old one since the new one is a rescued version of this part
43                 previoslySeenPart->setDead();
44         } else if (newPart->isLastPart()) {
45                 missingParts = new Hashset<int32_t>();
46                 hasLastPart = true;
47
48                 for (int i = 0; i < newPart->getPartNumber(); i++) {
49                         if (parts->get(i) == NULL) {
50                                 missingParts->add(i);
51                         }
52                 }
53         }
54
55         if (!fldisComplete && hasLastPart) {
56
57                 // We have seen this part so remove it from the set of missing parts
58                 missingParts->remove(newPart->getPartNumber());
59
60                 // Check if all the parts have been seen
61                 if (missingParts->size() == 0) {
62
63                         // We have all the parts
64                         fldisComplete = true;
65
66                         // Decode all the parts and create the key value guard and update sets
67                         decodeCommitData();
68
69                         // Get the sequence number and arbitrator of this transaction
70                         sequenceNumber = parts->get(0)->getSequenceNumber();
71                         machineId = parts->get(0)->getMachineId();
72                         transactionSequenceNumber = parts->get(0)->getTransactionSequenceNumber();
73                 }
74         }
75 }
76
77 int64_t Commit::getSequenceNumber() {
78         return sequenceNumber;
79 }
80
81 int64_t Commit::getTransactionSequenceNumber() {
82         return transactionSequenceNumber;
83 }
84
85 Hashtable<int32_t, CommitPart *> *Commit::getParts() {
86         return parts;
87 }
88
89 void Commit::addKV(KeyValue *kv) {
90         keyValueUpdateSet->add(kv);
91         liveKeys->add(kv->getKey());
92 }
93
94 void Commit::invalidateKey(IoTString *key) {
95         liveKeys->remove(key);
96
97         if (liveKeys->size() == 0) {
98                 setDead();
99         }
100 }
101
102 Hashset<KeyValue *> *Commit::getKeyValueUpdateSet() {
103         return keyValueUpdateSet;
104 }
105
106 int32_t Commit::getNumberOfParts() {
107         return parts->size();
108 }
109
110 void Commit::setDead() {
111         if (!isDead) {
112                 isDead = true;
113                 // Make all the parts of this transaction dead
114                 for (int32_t partNumber : parts->keySet()) {
115                         CommitPart *part = parts->get(partNumber);
116                         part->setDead();
117                 }
118         }
119 }
120
121 CommitPart *Commit::getPart(int index) {
122         return parts->get(index);
123 }
124
125 void Commit::createCommitParts() {
126         parts->clear();
127         // Convert to chars
128         Array<char> *charData = convertDataToBytes();
129
130         int commitPartCount = 0;
131         int currentPosition = 0;
132         int remaining = charData->length();
133
134         while (remaining > 0) {
135                 bool isLastPart = false;
136                 // determine how much to copy
137                 int copySize = CommitPart_MAX_NON_HEADER_SIZE;
138                 if (remaining <= CommitPart_MAX_NON_HEADER_SIZE) {
139                         copySize = remaining;
140                         isLastPart = true;// last bit of data so last part
141                 }
142
143                 // Copy to a smaller version
144                 Array<char> *partData = new Array<char>(copySize);
145                 System_arraycopy(charData, currentPosition, partData, 0, copySize);
146
147                 CommitPart part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
148                 parts->put(part->getPartNumber(), part);
149
150                 // Update position, count and remaining
151                 currentPosition += copySize;
152                 commitPartCount++;
153                 remaining -= copySize;
154         }
155 }
156
157 void Commit::decodeCommitData() {
158         // Calculate the size of the data section
159         int dataSize = 0;
160         for (int i = 0; i < parts->keySet()->size(); i++) {
161                 CommitPart *tp = parts->get(i);
162                 dataSize += tp->getDataSize();
163         }
164
165         Array<char> *combinedData = new Array<char>(dataSize);
166         int currentPosition = 0;
167
168         // Stitch all the data sections together
169         for (int i = 0; i < parts->keySet()->size(); i++) {
170                 CommitPart *tp = parts->get(i);
171                 System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
172                 currentPosition += tp->getDataSize();
173         }
174
175         // Decoder Object
176         ByteBuffer *bbDecode = ByteBuffer_wrap(combinedData);
177
178         // Decode how many key value pairs need to be decoded
179         int numberOfKVUpdates = bbDecode->getInt();
180
181         // Decode all the updates key values
182         for (int i = 0; i < numberOfKVUpdates; i++) {
183                 KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
184                 keyValueUpdateSet->add(kv);
185                 liveKeys->add(kv->getKey());
186         }
187 }
188
189 Array<char> *convertDataToBytes() {
190
191         // Calculate the size of the data
192         int sizeOfData = sizeof(int32_t);       // Number of Update KV's
193         for (KeyValue *kv : keyValueUpdateSet) {
194                 sizeOfData += kv->getSize();
195         }
196
197         // Data handlers and storage
198         Array<char> *dataArray = new Array<char>(sizeOfData);
199         ByteBuffer *bbEncode = ByteBuffer_wrap(dataArray);
200
201         // Encode the size of the updates and guard sets
202         bbEncode->putInt(keyValueUpdateSet->size());
203
204         // Encode all the updates
205         for (KeyValue *kv : keyValueUpdateSet) {
206                 kv->encode(bbEncode);
207         }
208
209         return bbEncode->array();
210 }
211
212 void Commit::setKVsMap(Hashtable<IoTString *, KeyValue *> *newKVs) {
213         keyValueUpdateSet->clear();
214         keyValueUpdateSet->addAll(newKVs->values());
215         liveKeys->clear();
216         liveKeys->addAll(newKVs->keySet());
217 }
218
219 Commit *Commit_merge(Commit *newer, Commit *older, int64_t newSequenceNumber) {
220         if (older == NULL) {
221                 return newer;
222         } else if (newer == NULL) {
223                 return older;
224         }
225         Hashtable<IoTString *, KeyValue *> *kvSet = new Hashtable<IoTString *, KeyValue *>();
226         for (KeyValue *kv : older->getKeyValueUpdateSet()) {
227                 kvSet->put(kv->getKey(), kv);
228         }
229         for (KeyValue *kv : newer->getKeyValueUpdateSet()) {
230                 kvSet->put(kv->getKey(), kv);
231         }
232
233         int64_t transactionSequenceNumber = newer->getTransactionSequenceNumber();
234         if (transactionSequenceNumber == -1) {
235                 transactionSequenceNumber = older->getTransactionSequenceNumber();
236         }
237
238         Commit *newCommit = new Commit(newSequenceNumber, newer->getMachineId(), transactionSequenceNumber);
239         newCommit->setKVsMap(kvSet);
240
241         return newCommit;
242 }