2 #include "CommitPart.h"
3 #include "ByteBuffer.h"
7 parts(new Vector<CommitPart *>()),
11 keyValueUpdateSet(new Hashset<KeyValue *, uintptr_t, 0>()),
15 transactionSequenceNumber(-1),
17 liveKeys(new Hashset<IoTString *>()) {
20 Commit::Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber) :
21 parts(new Vector<CommitPart *>()),
25 keyValueUpdateSet(new Hashset<KeyValue *, uintptr_t, 0>()),
27 sequenceNumber(_sequenceNumber),
28 machineId(_machineId),
29 transactionSequenceNumber(_transactionSequenceNumber),
31 liveKeys(new Hashset<IoTString *>()) {
36 uint Size = parts->size();
37 for(uint i=0;i<Size; i++)
38 parts->get(i)->releaseRef();
42 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> * keyit = keyValueUpdateSet->iterator();
43 while(keyit->hasNext()) {
47 delete keyValueUpdateSet;
50 if (dataBytes != NULL)
54 void Commit::addPartDecode(CommitPart *newPart) {
56 // If dead then just kill this part and move on
61 newPart->acquireRef();
62 CommitPart *previouslySeenPart = parts->setExpand(newPart->getPartNumber(), newPart);
63 if (previouslySeenPart == NULL)
66 if (previouslySeenPart != NULL) {
67 // Set dead the old one since the new one is a rescued version of this part
68 previouslySeenPart->setDead();
69 previouslySeenPart->releaseRef();
70 } else if (newPart->isLastPart()) {
74 if (!fldisComplete && hasLastPart) {
75 // We have seen this part so remove it from the set of missing parts
76 uint size = parts->size();
78 for(uint i=0; i < size; i++) {
79 if (parts->get(i) == NULL) {
80 fldisComplete = false;
86 // Decode all the parts and create the key value guard and update sets
89 // Get the sequence number and arbitrator of this transaction
90 sequenceNumber = parts->get(0)->getSequenceNumber();
91 machineId = parts->get(0)->getMachineId();
92 transactionSequenceNumber = parts->get(0)->getTransactionSequenceNumber();
97 int64_t Commit::getSequenceNumber() {
98 return sequenceNumber;
101 int64_t Commit::getTransactionSequenceNumber() {
102 return transactionSequenceNumber;
105 Vector<CommitPart *> *Commit::getParts() {
109 void Commit::addKV(KeyValue *kv) {
110 KeyValue * kvcopy = kv->getCopy();
111 keyValueUpdateSet->add(kvcopy);
112 liveKeys->add(kvcopy->getKey());
115 void Commit::invalidateKey(IoTString *key) {
116 liveKeys->remove(key);
118 if (liveKeys->size() == 0) {
123 Hashset<KeyValue *, uintptr_t, 0> *Commit::getKeyValueUpdateSet() {
124 return keyValueUpdateSet;
127 int32_t Commit::getNumberOfParts() {
131 void Commit::setDead() {
134 // Make all the parts of this transaction dead
135 for (uint32_t partNumber = 0; partNumber < parts->size(); partNumber++) {
136 CommitPart *part = parts->get(partNumber);
142 void Commit::createCommitParts() {
143 uint Size = parts->size();
144 for(uint i=0;i < Size; i++) {
145 Entry * e=parts->get(i);
151 Array<char> *charData = convertDataToBytes();
153 int commitPartCount = 0;
154 int currentPosition = 0;
155 int remaining = charData->length();
157 while (remaining > 0) {
158 bool isLastPart = false;
159 // determine how much to copy
160 int copySize = CommitPart_MAX_NON_HEADER_SIZE;
161 if (remaining <= CommitPart_MAX_NON_HEADER_SIZE) {
162 copySize = remaining;
163 isLastPart = true;// last bit of data so last part
166 // Copy to a smaller version
167 Array<char> *partData = new Array<char>(copySize);
168 System_arraycopy(charData, currentPosition, partData, 0, copySize);
170 CommitPart *part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
171 parts->setExpand(part->getPartNumber(), part);
173 // Update position, count and remaining
174 currentPosition += copySize;
176 remaining -= copySize;
181 void Commit::decodeCommitData() {
182 // Calculate the size of the data section
184 for (uint i = 0; i < parts->size(); i++) {
185 CommitPart *tp = parts->get(i);
187 dataSize += tp->getDataSize();
190 Array<char> *combinedData = new Array<char>(dataSize);
191 int currentPosition = 0;
193 // Stitch all the data sections together
194 for (uint i = 0; i < parts->size(); i++) {
195 CommitPart *tp = parts->get(i);
197 System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
198 currentPosition += tp->getDataSize();
203 ByteBuffer *bbDecode = ByteBuffer_wrap(combinedData);
205 // Decode how many key value pairs need to be decoded
206 int numberOfKVUpdates = bbDecode->getInt();
208 // Decode all the updates key values
209 for (int i = 0; i < numberOfKVUpdates; i++) {
210 KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
211 keyValueUpdateSet->add(kv);
212 liveKeys->add(kv->getKey());
217 Array<char> *Commit::convertDataToBytes() {
218 // Calculate the size of the data
219 int sizeOfData = sizeof(int32_t); // Number of Update KV's
220 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = keyValueUpdateSet->iterator();
221 while (kvit->hasNext()) {
222 KeyValue *kv = kvit->next();
223 sizeOfData += kv->getSize();
227 // Data handlers and storage
228 Array<char> *dataArray = new Array<char>(sizeOfData);
229 ByteBuffer *bbEncode = ByteBuffer_wrap(dataArray);
231 // Encode the size of the updates and guard sets
232 bbEncode->putInt(keyValueUpdateSet->size());
234 // Encode all the updates
235 kvit = keyValueUpdateSet->iterator();
236 while (kvit->hasNext()) {
237 KeyValue *kv = kvit->next();
238 kv->encode(bbEncode);
241 Array<char> * array = bbEncode->array();
242 bbEncode->releaseArray();
247 void Commit::setKVsMap(Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, KeyValueEquals> *newKVs) {
248 keyValueUpdateSet->clear();
250 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, KeyValueEquals> *kvit = newKVs->iterator();
251 while (kvit->hasNext()) {
252 KeyValue *kv = kvit->next();
253 KeyValue *kvcopy = kv->getCopy();
254 liveKeys->add(kvcopy->getKey());
255 keyValueUpdateSet->add(kvcopy);
260 Commit *Commit_merge(Commit *newer, Commit *older, int64_t newSequenceNumber) {
263 } else if (newer == NULL) {
266 Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, KeyValueEquals> *kvSet = new Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, KeyValueEquals>();
267 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = older->getKeyValueUpdateSet()->iterator();
268 while (kvit->hasNext()) {
269 KeyValue *kv = kvit->next();
273 kvit = newer->getKeyValueUpdateSet()->iterator();
274 while (kvit->hasNext()) {
275 KeyValue *kv = kvit->next();
280 int64_t transactionSequenceNumber = newer->getTransactionSequenceNumber();
281 if (transactionSequenceNumber == -1) {
282 transactionSequenceNumber = older->getTransactionSequenceNumber();
285 Commit *newCommit = new Commit(newSequenceNumber, newer->getMachineId(), transactionSequenceNumber);
286 newCommit->setKVsMap(kvSet);