2 #include "CommitPart.h"
3 #include "ByteBuffer.h"
// Member-initializer list of a Commit constructor. The constructor's
// signature line is not visible in this excerpt (presumably the no-argument
// constructor -- TODO confirm). The visible initializers allocate new
// containers for the parts, the key-value updates and the live keys.
7 parts(new Vector<CommitPart *>()),
12 keyValueUpdateSet(new Hashset<KeyValue *, uintptr_t, 0>()),
// -1 is the value Commit_merge tests for before falling back to the older
// commit's transaction sequence number.
16 transactionSequenceNumber(-1),
17 liveKeys(new Hashset<IoTString *>) {
// Constructs a commit from explicit identifiers.
//
// _sequenceNumber, _machineId and _transactionSequenceNumber are stored
// unchanged in the corresponding members. The parts vector, the key-value
// update set and the live-key set are newly allocated here.
20 Commit::Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber) :
21 parts(new Vector<CommitPart *>()),
26 keyValueUpdateSet(new Hashset<KeyValue *, uintptr_t, 0>()),
28 sequenceNumber(_sequenceNumber),
29 machineId(_machineId),
30 transactionSequenceNumber(_transactionSequenceNumber),
31 liveKeys(new Hashset<IoTString *>) {
// Cleanup code; the enclosing function header is not visible in this excerpt
// (presumably the destructor -- TODO confirm).
// keyValueUpdateSet is allocated with new in the constructors, so it is
// released here.
36 delete keyValueUpdateSet;
// missingParts is allocated in addPartDecode when the last part arrives, so
// it is guarded against NULL. NOTE(review): presumably it is initialized to
// NULL by the constructors; those lines are not visible here.
38 if (missingParts != NULL)
// Adds a decoded part to this commit and tracks whether every part has
// arrived.
//
// newPart is stored in `parts` at the index given by its part number. If a
// part was already stored at that index, the older one is marked dead. When
// the part flagged as last arrives in a previously empty slot, `missingParts`
// is created and the lower-numbered slots are scanned for parts not yet
// received. While the commit is incomplete and the last part has been seen,
// the new part's number is removed from `missingParts`; once that set is
// empty, the sequence number, machine id and transaction sequence number are
// read from part 0 (several statements of that branch are not visible in this
// excerpt).
42 void Commit::addPartDecode(CommitPart *newPart) {
44 // If dead then just kill this part and move on
// The return value of setExpand is treated as the part that previously
// occupied this index, with NULL meaning the slot was empty.
49 CommitPart *previouslySeenPart = parts->setExpand(newPart->getPartNumber(), newPart);
50 if (previouslySeenPart == NULL)
53 if (previouslySeenPart != NULL) {
54 // Mark the old one dead since the new one is a rescued version of this part
55 previouslySeenPart->setDead();
56 } else if (newPart->isLastPart()) {
57 missingParts = new Hashset<int32_t>();
// Only indices below the last part's number are examined; a NULL slot means
// that part has not been stored yet (the statement handling it is not
// visible here, presumably it records the index in missingParts).
60 for (int i = 0; i < newPart->getPartNumber(); i++) {
61 if (parts->get(i) == NULL) {
67 if (!fldisComplete && hasLastPart) {
69 // We have seen this part so remove it from the set of missing parts
70 missingParts->remove(newPart->getPartNumber());
72 // Check if all the parts have been seen
73 if (missingParts->size() == 0) {
75 // We have all the parts
78 // Decode all the parts and create the key value guard and update sets
81 // Get the sequence number and arbitrator of this transaction
// Part 0 is used as the source of the commit-wide identifiers.
82 sequenceNumber = parts->get(0)->getSequenceNumber();
83 machineId = parts->get(0)->getMachineId();
84 transactionSequenceNumber = parts->get(0)->getTransactionSequenceNumber();
// Returns the sequence number currently stored in this commit.
89 int64_t Commit::getSequenceNumber() {
90 return sequenceNumber;
// Returns the transaction sequence number currently stored in this commit.
// Commit_merge compares this value against -1 to decide whether to fall back
// to another commit's number.
93 int64_t Commit::getTransactionSequenceNumber() {
94 return transactionSequenceNumber;
// Returns a pointer to a vector of CommitPart pointers. The body is not
// visible in this excerpt; presumably it returns the `parts` member -- TODO
// confirm.
97 Vector<CommitPart *> *Commit::getParts() {
// Registers one key-value update with this commit: the pair itself goes into
// keyValueUpdateSet and its key goes into liveKeys.
101 void Commit::addKV(KeyValue *kv) {
102 keyValueUpdateSet->add(kv);
103 liveKeys->add(kv->getKey());
// Removes `key` from the set of live keys, then checks whether any live keys
// remain.
106 void Commit::invalidateKey(IoTString *key) {
107 liveKeys->remove(key);
// The action taken when no live keys remain is not visible in this excerpt.
// NOTE(review): presumably the commit is marked dead at that point; confirm.
109 if (liveKeys->size() == 0) {
// Returns the commit's key-value update set. This is the member pointer
// itself, not a copy, so changes made through it affect this commit.
114 Hashset<KeyValue *, uintptr_t, 0> *Commit::getKeyValueUpdateSet() {
115 return keyValueUpdateSet;
// Returns a part count as int32_t. The body is not visible in this excerpt,
// so which counter it reports cannot be confirmed from here.
118 int32_t Commit::getNumberOfParts() {
// Marks this commit as dead. The visible portion walks every index of
// `parts` and fetches the part stored there; the statements applied to each
// part are not visible in this excerpt.
122 void Commit::setDead() {
125 // Make all the parts of this transaction dead
126 for (uint32_t partNumber = 0; partNumber < parts->size(); partNumber++) {
127 CommitPart *part = parts->get(partNumber);
// Returns the part stored at `index` in `parts`. No bounds or NULL check is
// performed in this function.
134 CommitPart *Commit::getPart(int index) {
135 return parts->get(index);
// Serializes this commit's data and splits it into CommitPart objects.
//
// The bytes produced by convertDataToBytes() are cut into consecutive chunks
// of at most CommitPart_MAX_NON_HEADER_SIZE bytes. Each chunk is copied into
// its own array, wrapped in a CommitPart carrying this commit's machine id,
// sequence number and transaction sequence number, and stored in `parts` at
// the part's own part number. The chunk that consumes the remaining bytes is
// flagged as the last part.
138 void Commit::createCommitParts() {
142 Array<char> *charData = convertDataToBytes();
144 int commitPartCount = 0;
145 int currentPosition = 0;
// The size computed in convertDataToBytes starts at sizeof(int32_t), so
// `remaining` is expected to be positive when the loop is first reached.
146 int remaining = charData->length();
148 while (remaining > 0) {
149 bool isLastPart = false;
150 // determine how much to copy
151 int copySize = CommitPart_MAX_NON_HEADER_SIZE;
152 if (remaining <= CommitPart_MAX_NON_HEADER_SIZE) {
153 copySize = remaining;
154 isLastPart = true;// last bit of data so last part
157 // Copy to a smaller version
158 Array<char> *partData = new Array<char>(copySize);
159 System_arraycopy(charData, currentPosition, partData, 0, copySize);
// commitPartCount is passed as the new part's number.
161 CommitPart *part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
162 parts->setExpand(part->getPartNumber(), part);
164 // Update position, count and remaining
// NOTE(review): the statement that advances commitPartCount is not visible in
// this excerpt.
165 currentPosition += copySize;
167 remaining -= copySize;
// Reassembles the data carried by all parts and decodes the key-value
// updates from it.
//
// A first pass over `parts` sums the data sizes. A second pass copies each
// part's data back-to-back, in index order, into one combined array. The
// combined bytes are then read as an int count followed by that many encoded
// KeyValue entries. Each decoded entry is added to keyValueUpdateSet and its
// key to liveKeys.
171 void Commit::decodeCommitData() {
172 // Calculate the size of the data section
// The declaration of dataSize is not visible in this excerpt.
174 for (uint i = 0; i < parts->size(); i++) {
175 CommitPart *tp = parts->get(i);
177 dataSize += tp->getDataSize();
180 Array<char> *combinedData = new Array<char>(dataSize);
181 int currentPosition = 0;
183 // Stitch all the data sections together
184 for (uint i = 0; i < parts->size(); i++) {
185 CommitPart *tp = parts->get(i);
187 System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
188 currentPosition += tp->getDataSize();
193 ByteBuffer *bbDecode = ByteBuffer_wrap(combinedData);
195 // Decode how many key value pairs need to be decoded
196 int numberOfKVUpdates = bbDecode->getInt();
198 // Decode all the key-value updates
199 for (int i = 0; i < numberOfKVUpdates; i++) {
200 KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
201 keyValueUpdateSet->add(kv);
202 liveKeys->add(kv->getKey());
// Serializes the key-value update set into a byte array.
//
// Layout written here: an int holding the number of updates, followed by the
// encoding of each KeyValue in the set's iteration order. A first pass over
// the set computes the total size so that the array can be allocated once.
// Returns the array exposed by the encoding ByteBuffer.
206 Array<char> *Commit::convertDataToBytes() {
207 // Calculate the size of the data
208 int sizeOfData = sizeof(int32_t); // Number of Update KV's
209 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = keyValueUpdateSet->iterator();
210 while (kvit->hasNext()) {
211 KeyValue *kv = kvit->next();
212 sizeOfData += kv->getSize();
216 // Data handlers and storage
217 Array<char> *dataArray = new Array<char>(sizeOfData);
218 ByteBuffer *bbEncode = ByteBuffer_wrap(dataArray);
220 // Encode the number of key-value updates
221 bbEncode->putInt(keyValueUpdateSet->size());
223 // Encode all the updates
// A second iterator is obtained because the first one was consumed by the
// sizing pass above.
224 kvit = keyValueUpdateSet->iterator();
225 while (kvit->hasNext()) {
226 KeyValue *kv = kvit->next();
227 kv->encode(bbEncode);
231 return bbEncode->array();
// Replaces the contents of keyValueUpdateSet with the entries of newKVs and
// adds the key of every entry in newKVs to liveKeys.
// newKVs itself is only read; its entries are copied by addAll.
234 void Commit::setKVsMap(Hashset<KeyValue *, uintptr_t, 0> *newKVs) {
235 keyValueUpdateSet->clear();
236 keyValueUpdateSet->addAll(newKVs);
238 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = newKVs->iterator();
239 while (kvit->hasNext()) {
240 liveKeys->add(kvit->next()->getKey());
// Builds a new commit that combines the key-value updates of `older` and
// `newer`.
//
// The visible code special-cases a NULL `newer` (and, judging by the
// `else if`, another case whose condition is not visible in this excerpt).
// Otherwise it iterates over the updates of `older` first and `newer`
// second, using a temporary set (the loop bodies are not visible). The new
// commit is created with newSequenceNumber, the machine id of `newer`, and
// the transaction sequence number of `newer`, or that of `older` when
// `newer` reports -1. The collected set is installed through setKVsMap.
// The return statement is not visible in this excerpt.
245 Commit *Commit_merge(Commit *newer, Commit *older, int64_t newSequenceNumber) {
248 } else if (newer == NULL) {
251 Hashset<KeyValue *, uintptr_t, 0> *kvSet = new Hashset<KeyValue *, uintptr_t, 0>();
// Older updates are visited before newer ones.
252 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = older->getKeyValueUpdateSet()->iterator();
253 while (kvit->hasNext()) {
254 KeyValue *kv = kvit->next();
258 kvit = newer->getKeyValueUpdateSet()->iterator();
259 while (kvit->hasNext()) {
260 KeyValue *kv = kvit->next();
// -1 is the value the constructor initializer list near the top of the file
// assigns to transactionSequenceNumber; in that case fall back to the older
// commit's number.
265 int64_t transactionSequenceNumber = newer->getTransactionSequenceNumber();
266 if (transactionSequenceNumber == -1) {
267 transactionSequenceNumber = older->getTransactionSequenceNumber();
270 Commit *newCommit = new Commit(newSequenceNumber, newer->getMachineId(), transactionSequenceNumber);
271 newCommit->setKVsMap(kvSet);