2 #include "CommitPart.h"
3 #include "ByteBuffer.h"
7 parts(new Vector<CommitPart *>()),
12 keyValueUpdateSet(new Hashset<KeyValue *, uintptr_t, 0>()),
16 transactionSequenceNumber(-1),
18 liveKeys(new Hashset<IoTString *>()) {
// Constructs a Commit from an explicit sequence number, machine id and
// transaction sequence number. Freshly allocates the parts vector, the
// key-value update set and the live-key set.
// NOTE(review): the remaining member initializers and the constructor body
// are not visible in this excerpt.
21 Commit::Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber) :
22 parts(new Vector<CommitPart *>()),
27 keyValueUpdateSet(new Hashset<KeyValue *, uintptr_t, 0>()),
29 sequenceNumber(_sequenceNumber),
30 machineId(_machineId),
31 transactionSequenceNumber(_transactionSequenceNumber),
33 liveKeys(new Hashset<IoTString *>()) {
38 uint Size = parts->size();
39 for(uint i=0;i<Size; i++)
40 parts->get(i)->releaseRef();
44 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> * keyit = keyValueUpdateSet->iterator();
45 while(keyit->hasNext()) {
49 delete keyValueUpdateSet;
52 if (missingParts != NULL)
54 if (dataBytes != NULL)
// Records a newly received commit part. Takes a reference on newPart and
// stores it in `parts` at its part number; a part previously stored at that
// number is marked dead and has its reference released. Once the last part
// has been seen, the set of still-missing part numbers is tracked; when that
// set is empty, the commit's sequence number, machine id and transaction
// sequence number are read from part 0.
// NOTE(review): several statements of this function are not visible in this
// excerpt, so the comments below describe only the lines shown.
58 void Commit::addPartDecode(CommitPart *newPart) {
60 // If dead then just kill this part and move on
// Take a reference before the part is stored in `parts`.
65 newPart->acquireRef();
// The value returned by setExpand is treated as the part previously stored at
// this part number (NULL when the slot was empty).
66 CommitPart *previouslySeenPart = parts->setExpand(newPart->getPartNumber(), newPart);
67 if (previouslySeenPart == NULL)
70 if (previouslySeenPart != NULL) {
71 // Set dead the old one since the new one is a rescued version of this part
72 previouslySeenPart->setDead();
73 previouslySeenPart->releaseRef();
74 } else if (newPart->isLastPart()) {
// The slot was empty and this is the last part: start a fresh set of missing
// part numbers.
75 missingParts = new Hashset<int32_t>();
// Scan every lower-numbered slot for parts that have not been stored yet.
78 for (int i = 0; i < newPart->getPartNumber(); i++) {
79 if (parts->get(i) == NULL) {
// Completion tracking only happens while the commit is not yet complete and
// the last part is known.
85 if (!fldisComplete && hasLastPart) {
87 // We have seen this part so remove it from the set of missing parts
88 missingParts->remove(newPart->getPartNumber());
90 // Check if all the parts have been seen
91 if (missingParts->size() == 0) {
93 // We have all the parts
96 // Decode all the parts and create the key value guard and update sets
99 // Get the sequence number and arbitrator of this transaction
// Part 0 is used as the source of the commit-wide identifiers.
100 sequenceNumber = parts->get(0)->getSequenceNumber();
101 machineId = parts->get(0)->getMachineId();
102 transactionSequenceNumber = parts->get(0)->getTransactionSequenceNumber();
// Returns the value of the sequenceNumber member.
107 int64_t Commit::getSequenceNumber() {
108 return sequenceNumber;
// Returns the value of the transactionSequenceNumber member.
111 int64_t Commit::getTransactionSequenceNumber() {
112 return transactionSequenceNumber;
// Returns a pointer to a vector of CommitPart pointers.
// NOTE(review): the body is not visible in this excerpt; presumably it
// returns the `parts` member - confirm.
115 Vector<CommitPart *> *Commit::getParts() {
// Adds a key-value update to this commit. The commit stores a copy obtained
// from kv->getCopy(), not the caller's object, and registers the copy's key
// in the live-key set.
119 void Commit::addKV(KeyValue *kv) {
120 KeyValue * kvcopy = kv->getCopy();
121 keyValueUpdateSet->add(kvcopy);
// The live-key entry is the key object returned by the copy.
122 liveKeys->add(kvcopy->getKey());
// Removes `key` from the live-key set, then checks whether any live keys
// remain.
// NOTE(review): the action taken when no live keys remain is not visible in
// this excerpt.
125 void Commit::invalidateKey(IoTString *key) {
126 liveKeys->remove(key);
128 if (liveKeys->size() == 0) {
// Returns the keyValueUpdateSet member pointer itself (not a copy of the set).
133 Hashset<KeyValue *, uintptr_t, 0> *Commit::getKeyValueUpdateSet() {
134 return keyValueUpdateSet;
// Returns a 32-bit part count.
// NOTE(review): the body is not visible in this excerpt, so which member
// supplies the count cannot be confirmed from here.
137 int32_t Commit::getNumberOfParts() {
// Walks every entry in `parts`; per the existing comment below, the intent is
// to mark each part of this transaction dead.
// NOTE(review): statements before the loop and the per-part call inside it
// are not visible in this excerpt.
141 void Commit::setDead() {
144 // Make all the parts of this transaction dead
// parts->size() is re-read on every iteration.
145 for (uint32_t partNumber = 0; partNumber < parts->size(); partNumber++) {
146 CommitPart *part = parts->get(partNumber);
// Builds commit parts from the commit's serialized data. The bytes returned
// by convertDataToBytes() are cut into chunks of at most
// CommitPart_MAX_NON_HEADER_SIZE bytes; each chunk is copied into its own
// array, wrapped in a new CommitPart and stored in `parts` at that part's
// number. The chunk that consumes the final bytes is flagged as the last part.
// NOTE(review): some statements (including what is done with each existing
// entry in the first loop, and the increment of commitPartCount) are not
// visible in this excerpt.
152 void Commit::createCommitParts() {
// First pass: visit every entry currently held in `parts`.
153 uint Size = parts->size();
154 for(uint i=0;i < Size; i++) {
155 Entry * e=parts->get(i);
// Serialize the key-value updates into a single byte array.
161 Array<char> *charData = convertDataToBytes();
163 int commitPartCount = 0;
164 int currentPosition = 0;
165 int remaining = charData->length();
// Emit one part per iteration until every byte has been consumed.
167 while (remaining > 0) {
168 bool isLastPart = false;
169 // determine how much to copy
170 int copySize = CommitPart_MAX_NON_HEADER_SIZE;
// Data that fits in a single part (including an exact fit) ends the sequence.
171 if (remaining <= CommitPart_MAX_NON_HEADER_SIZE) {
172 copySize = remaining;
173 isLastPart = true;// last bit of data so last part
176 // Copy to a smaller version
177 Array<char> *partData = new Array<char>(copySize);
178 System_arraycopy(charData, currentPosition, partData, 0, copySize);
// commitPartCount is passed to the CommitPart constructor; the part is then
// stored at the index it reports via getPartNumber().
180 CommitPart *part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
181 parts->setExpand(part->getPartNumber(), part);
183 // Update position, count and remaining
184 currentPosition += copySize;
186 remaining -= copySize;
// Reassembles the data sections of all parts into one contiguous buffer and
// decodes it: an int count is read first, then that many KeyValue records.
// Each decoded KeyValue is added to keyValueUpdateSet and its key to liveKeys.
// NOTE(review): some statements (e.g. the declaration of dataSize and any
// cleanup of the temporary buffers) are not visible in this excerpt.
191 void Commit::decodeCommitData() {
192 // Calculate the size of the data section
194 for (uint i = 0; i < parts->size(); i++) {
195 CommitPart *tp = parts->get(i);
197 dataSize += tp->getDataSize();
200 Array<char> *combinedData = new Array<char>(dataSize);
201 int currentPosition = 0;
203 // Stitch all the data sections together
// Parts are copied in index order, each placed directly after the previous one.
204 for (uint i = 0; i < parts->size(); i++) {
205 CommitPart *tp = parts->get(i);
207 System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
208 currentPosition += tp->getDataSize();
// Wrap the combined bytes so they can be read sequentially.
213 ByteBuffer *bbDecode = ByteBuffer_wrap(combinedData);
215 // Decode how many key value pairs need to be decoded
216 int numberOfKVUpdates = bbDecode->getInt();
218 // Decode all the updates key values
219 for (int i = 0; i < numberOfKVUpdates; i++) {
220 KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
221 keyValueUpdateSet->add(kv);
222 liveKeys->add(kv->getKey());
// Serializes the key-value update set into a byte array. The buffer is sized
// as sizeof(int32_t) for the update count plus the getSize() of every
// KeyValue; the count is written first, followed by each KeyValue's encoding.
// NOTE(review): the return statement and any iterator/buffer cleanup are not
// visible in this excerpt.
227 Array<char> *Commit::convertDataToBytes() {
228 // Calculate the size of the data
229 int sizeOfData = sizeof(int32_t); // Number of Update KV's
230 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = keyValueUpdateSet->iterator();
231 while (kvit->hasNext()) {
232 KeyValue *kv = kvit->next();
233 sizeOfData += kv->getSize();
237 // Data handlers and storage
238 Array<char> *dataArray = new Array<char>(sizeOfData);
239 ByteBuffer *bbEncode = ByteBuffer_wrap(dataArray);
241 // Encode the number of key-value updates
242 bbEncode->putInt(keyValueUpdateSet->size());
244 // Encode all the updates
// A second iterator is obtained because the first pass already walked the set.
245 kvit = keyValueUpdateSet->iterator();
246 while (kvit->hasNext()) {
247 KeyValue *kv = kvit->next();
248 kv->encode(bbEncode);
// Fetch the backing array, then call releaseArray() on the buffer.
// NOTE(review): presumably releaseArray() detaches the array so it outlives
// the ByteBuffer - confirm against ByteBuffer.
251 Array<char> * array = bbEncode->array();
252 bbEncode->releaseArray();
// Replaces the contents of keyValueUpdateSet with copies of the KeyValues in
// newKVs. For each entry a copy is made with getCopy(); the copy's key is
// added to liveKeys and the copy itself to keyValueUpdateSet. The objects in
// newKVs are not stored.
// NOTE(review): some statements are not visible in this excerpt; whether
// liveKeys is reset and whether the previously held KeyValues are freed
// cannot be confirmed from here.
257 void Commit::setKVsMap(Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, KeyValueEquals> *newKVs) {
258 keyValueUpdateSet->clear();
260 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, KeyValueEquals> *kvit = newKVs->iterator();
261 while (kvit->hasNext()) {
262 KeyValue *kv = kvit->next();
263 KeyValue *kvcopy = kv->getCopy();
264 liveKeys->add(kvcopy->getKey());
265 keyValueUpdateSet->add(kvcopy);
// Merges two commits into a newly allocated Commit carrying
// newSequenceNumber. The new commit takes its machine id from `newer` and its
// transaction sequence number from `newer`, falling back to `older`'s when
// newer's is -1. Its key-value set is installed via setKVsMap() from a
// temporary set built while walking older's updates and then newer's.
// NOTE(review): the NULL-argument branches, the loop bodies and the return
// statement are not visible in this excerpt.
270 Commit *Commit_merge(Commit *newer, Commit *older, int64_t newSequenceNumber) {
273 } else if (newer == NULL) {
// Temporary set using the hashKeyValue / KeyValueEquals policies.
276 Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, KeyValueEquals> *kvSet = new Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, KeyValueEquals>();
// older's updates are visited first, newer's second.
// NOTE(review): presumably so newer values take precedence - confirm against
// the loop bodies.
277 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = older->getKeyValueUpdateSet()->iterator();
278 while (kvit->hasNext()) {
279 KeyValue *kv = kvit->next();
283 kvit = newer->getKeyValueUpdateSet()->iterator();
284 while (kvit->hasNext()) {
285 KeyValue *kv = kvit->next();
// -1 is treated as "no transaction sequence number"; fall back to older's.
290 int64_t transactionSequenceNumber = newer->getTransactionSequenceNumber();
291 if (transactionSequenceNumber == -1) {
292 transactionSequenceNumber = older->getTransactionSequenceNumber();
295 Commit *newCommit = new Commit(newSequenceNumber, newer->getMachineId(), transactionSequenceNumber);
296 newCommit->setKVsMap(kvSet);