2 #include "CommitPart.h"
3 #include "ByteBuffer.h"
// Start out with freshly allocated, empty containers for the parts, the key value
// updates and the live keys.
7 parts(new Vector<CommitPart *>()),
12 keyValueUpdateSet(new Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue>()),
// -1 is the value Commit_merge checks for before falling back to the older commit's
// transaction sequence number.
16 transactionSequenceNumber(-1),
17 liveKeys(new Hashset<IoTString *>) {
// Builds a commit with an explicit sequence number, machine id and transaction
// sequence number. The part vector, key value update set and live key set are
// freshly allocated and start out empty.
20 Commit::Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber) :
21 parts(new Vector<CommitPart *>()),
26 keyValueUpdateSet(new Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue>()),
28 sequenceNumber(_sequenceNumber),
29 machineId(_machineId),
30 transactionSequenceNumber(_transactionSequenceNumber),
31 liveKeys(new Hashset<IoTString *>) {
// Records a decoded commit part in the slot given by its part number and keeps
// track of which parts are still missing. Once no parts are missing, the commit's
// sequence number, machine id and transaction sequence number are taken from part 0.
34 void Commit::addPartDecode(CommitPart *newPart) {
36 // If dead then just kill this part and move on
// Store the part at the index equal to its part number; the value handed back is
// whatever occupied that slot before (NOTE(review): presumably NULL for an empty
// slot -- confirm against Vector::setExpand).
41 CommitPart *previouslySeenPart = parts->setExpand(newPart->getPartNumber(), newPart);
42 if(previouslySeenPart == NULL)
45 if (previouslySeenPart != NULL) {
46 // Set dead the old one since the new one is a rescued version of this part
47 previouslySeenPart->setDead();
48 } else if (newPart->isLastPart()) {
// First sighting of the last part: start a fresh set of missing part numbers
49 missingParts = new Hashset<int32_t>();
// Look at every slot below the last part's number for parts not yet received
52 for (int i = 0; i < newPart->getPartNumber(); i++) {
53 if (parts->get(i) == NULL) {
// Only update the missing set while the commit is incomplete and the last part is known
59 if (!fldisComplete && hasLastPart) {
61 // We have seen this part so remove it from the set of missing parts
62 missingParts->remove(newPart->getPartNumber());
64 // Check if all the parts have been seen
65 if (missingParts->size() == 0) {
67 // We have all the parts
70 // Decode all the parts and create the key value guard and update sets
73 // Get the sequence number and arbitrator of this transaction
// All three header fields are read from part 0
74 sequenceNumber = parts->get(0)->getSequenceNumber();
75 machineId = parts->get(0)->getMachineId();
76 transactionSequenceNumber = parts->get(0)->getTransactionSequenceNumber();
// Returns the sequence number stored in this commit.
81 int64_t Commit::getSequenceNumber() {
82 return sequenceNumber;
// Returns the transaction sequence number stored in this commit. The default
// constructor initializes it to -1.
85 int64_t Commit::getTransactionSequenceNumber() {
86 return transactionSequenceNumber;
// Accessor handing out a pointer to a vector of CommitPart pointers
// (NOTE(review): presumably the commit's own parts vector, not a copy -- confirm).
89 Vector<CommitPart *> *Commit::getParts() {
// Adds a key value pair to the commit's update set and registers its key in the
// set of live keys.
93 void Commit::addKV(KeyValue *kv) {
94 keyValueUpdateSet->add(kv);
95 liveKeys->add(kv->getKey());
// Removes the given key from the set of live keys, then checks whether any live
// keys remain.
98 void Commit::invalidateKey(IoTString *key) {
99 liveKeys->remove(key);
// No live keys are left in this commit
101 if (liveKeys->size() == 0) {
// Returns the commit's key value update set itself (the stored pointer, not a copy).
106 Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *Commit::getKeyValueUpdateSet() {
107 return keyValueUpdateSet;
// Reports the number of parts of this commit as a 32-bit integer
// (NOTE(review): presumably the size of the parts vector -- confirm).
110 int32_t Commit::getNumberOfParts() {
// Marks the commit as dead; this includes walking every slot of the parts vector.
114 void Commit::setDead() {
117 // Make all the parts of this transaction dead
118 for (int32_t partNumber = 0; partNumber < parts->size(); partNumber ++) {
119 CommitPart *part = parts->get(partNumber);
// Returns the part stored at the given index of the parts vector.
126 CommitPart *Commit::getPart(int index) {
127 return parts->get(index);
// Serializes the commit's key value updates with convertDataToBytes() and slices the
// resulting bytes into CommitPart objects. Each part carries at most
// CommitPart_MAX_NON_HEADER_SIZE bytes, and the part that consumes the remaining bytes
// is flagged as the last part.
130 void Commit::createCommitParts() {
134 Array<char> *charData = convertDataToBytes();
136 int commitPartCount = 0;
137 int currentPosition = 0;
138 int remaining = charData->length();
// Keep cutting parts until every serialized byte has been assigned to one
140 while (remaining > 0) {
141 bool isLastPart = false;
142 // determine how much to copy
143 int copySize = CommitPart_MAX_NON_HEADER_SIZE;
144 if (remaining <= CommitPart_MAX_NON_HEADER_SIZE) {
145 copySize = remaining;
146 isLastPart = true;// last bit of data so last part
149 // Copy to a smaller version
150 Array<char> *partData = new Array<char>(copySize);
151 System_arraycopy(charData, currentPosition, partData, 0, copySize);
// The part is built with this commit's machine id and sequence numbers, and
// commitPartCount as its part number
153 CommitPart* part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
// Store the part in the slot matching its own part number
154 parts->setExpand(part->getPartNumber(), part);
156 // Update position, count and remaining
157 currentPosition += copySize;
159 remaining -= copySize;
// Rebuilds the key value update set from the commit's parts: the data sections of
// all parts are concatenated in index order, then decoded as an int count followed
// by that many KeyValue entries. Each decoded entry is added to keyValueUpdateSet
// and its key is added to liveKeys.
163 void Commit::decodeCommitData() {
164 // Calculate the size of the data section
166 for (int i = 0; i < parts->size(); i++) {
167 CommitPart *tp = parts->get(i);
169 dataSize += tp->getDataSize();
// Single buffer large enough to hold the data of every part
172 Array<char> *combinedData = new Array<char>(dataSize);
173 int currentPosition = 0;
175 // Stitch all the data sections together
176 for (int i = 0; i < parts->size(); i++) {
177 CommitPart *tp = parts->get(i);
179 System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
180 currentPosition += tp->getDataSize();
// Read the combined bytes through a ByteBuffer
185 ByteBuffer *bbDecode = ByteBuffer_wrap(combinedData);
187 // Decode how many key value pairs need to be decoded
188 int numberOfKVUpdates = bbDecode->getInt();
190 // Decode all the updates key values
191 for (int i = 0; i < numberOfKVUpdates; i++) {
192 KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
193 keyValueUpdateSet->add(kv);
194 liveKeys->add(kv->getKey());
// Serializes the key value update set into a byte array: first the number of
// updates as an int, then each KeyValue as written by its own encode(). The array
// is sized up front from sizeof(int32_t) plus the getSize() of every entry.
// Returns the array reported by the encoding ByteBuffer (NOTE(review): presumably
// the same dataArray that was wrapped -- confirm against ByteBuffer::array).
198 Array<char> *Commit::convertDataToBytes() {
199 // Calculate the size of the data
200 int sizeOfData = sizeof(int32_t); // Number of Update KV's
201 SetIterator<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> * kvit = keyValueUpdateSet->iterator();
// First pass over the set: add up the encoded size of every key value
202 while(kvit->hasNext()) {
203 KeyValue * kv = kvit->next();
204 sizeOfData += kv->getSize();
208 // Data handlers and storage
209 Array<char> *dataArray = new Array<char>(sizeOfData);
210 ByteBuffer *bbEncode = ByteBuffer_wrap(dataArray);
212 // Encode the number of key value updates
213 bbEncode->putInt(keyValueUpdateSet->size());
215 // Encode all the updates
// Second pass over the set with a fresh iterator: write each key value
216 kvit = keyValueUpdateSet->iterator();
217 while(kvit->hasNext()) {
218 KeyValue * kv = kvit->next();
219 kv->encode(bbEncode);
223 return bbEncode->array();
// Replaces the contents of the key value update set with the entries of newKVs
// and adds the key of every entry in newKVs to the set of live keys.
226 void Commit::setKVsMap(Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *newKVs) {
// Empty the existing set before copying in the new entries
227 keyValueUpdateSet->clear();
228 keyValueUpdateSet->addAll(newKVs);
// Register the key of each new entry as live
230 SetIterator<KeyValue*, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = newKVs->iterator();
231 while(kvit->hasNext()) {
232 liveKeys->add(kvit->next()->getKey());
// Combines two commits into a newly allocated Commit that carries newSequenceNumber.
// The new commit uses newer's machine id. It uses newer's transaction sequence number
// unless that is -1, in which case older's is used. Its key values are installed from
// a set built while iterating older's updates and then newer's.
237 Commit *Commit_merge(Commit *newer, Commit *older, int64_t newSequenceNumber) {
240 } else if (newer == NULL) {
// Scratch set that will be installed in the merged commit via setKVsMap
243 Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvSet = new Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue>();
// The older commit's key values are visited first...
244 SetIterator<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = older->getKeyValueUpdateSet()->iterator();
245 while(kvit->hasNext()) {
246 KeyValue* kv=kvit->next();
// ...then the newer commit's (NOTE(review): presumably so newer values win for
// keys present in both -- confirm)
250 kvit = newer->getKeyValueUpdateSet()->iterator();
251 while(kvit->hasNext()) {
252 KeyValue* kv=kvit->next();
// Prefer the newer commit's transaction sequence number; -1 triggers the fallback to older's
257 int64_t transactionSequenceNumber = newer->getTransactionSequenceNumber();
258 if (transactionSequenceNumber == -1) {
259 transactionSequenceNumber = older->getTransactionSequenceNumber();
262 Commit *newCommit = new Commit(newSequenceNumber, newer->getMachineId(), transactionSequenceNumber);
263 newCommit->setKVsMap(kvSet);