2 #include "CommitPart.h"
3 #include "ByteBuffer.h"
// Member initializer list fragment. The constructor header is not visible in this
// chunk; presumably this belongs to the default Commit constructor -- TODO confirm.
// Allocates an empty part table plus empty update and live-key sets.
7 parts(new Hashtable<int32_t, CommitPart *>()),
11 keyValueUpdateSet(new Hashset<KeyValue *>()),
// -1 is the value Commit_merge checks for before falling back to the older
// commit's transaction sequence number.
15 transactionSequenceNumber(-1),
16 liveKeys(new Hashset<IoTString *>) {
// Builds a commit with a known sequence number, machine id and transaction
// sequence number. The part table and both sets start out empty.
// NOTE(review): some initializers and the constructor body are not visible in this chunk.
19 Commit::Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber) :
20 parts(new Hashtable<int32_t, CommitPart *>()),
24 keyValueUpdateSet(new Hashset<KeyValue *>()),
26 sequenceNumber(_sequenceNumber),
27 machineId(_machineId),
28 transactionSequenceNumber(_transactionSequenceNumber),
29 liveKeys(new Hashset<IoTString *>) {
// Stores a decoded part on this commit and tracks which part numbers are still missing.
// NOTE(review): several statements of this function are not visible in this chunk;
// the comments below describe only the statements that can be seen.
32 void Commit::addPartDecode(CommitPart *newPart) {
34 // If dead then just kill this part and move on
// put() is used here as returning the part previously stored under this part
// number (NULL when there was none).
39 CommitPart *previoslySeenPart = parts->put(newPart->getPartNumber(), newPart);
41 if (previoslySeenPart != NULL) {
42 // Set dead the old one since the new one is a rescued version of this part
43 previoslySeenPart->setDead();
44 } else if (newPart->isLastPart()) {
// The last part was not stored before: start a fresh set of missing part numbers.
45 missingParts = new Hashset<int32_t>();
// Check every part number below the last part's number for a part that is not stored yet.
48 for (int i = 0; i < newPart->getPartNumber(); i++) {
49 if (parts->get(i) == NULL) {
// Progress is only tracked while the commit is not complete and the last part has been seen.
55 if (!fldisComplete && hasLastPart) {
57 // We have seen this part so remove it from the set of missing parts
58 missingParts->remove(newPart->getPartNumber());
60 // Check if all the parts have been seen
61 if (missingParts->size() == 0) {
63 // We have all the parts
66 // Decode all the parts and create the key value guard and update sets
69 // Get the sequence number and arbitrator of this transaction
// Part 0 supplies the commit-level identifiers.
70 sequenceNumber = parts->get(0)->getSequenceNumber();
71 machineId = parts->get(0)->getMachineId();
72 transactionSequenceNumber = parts->get(0)->getTransactionSequenceNumber();
77 int64_t Commit::getSequenceNumber() {
78 return sequenceNumber;
81 int64_t Commit::getTransactionSequenceNumber() {
82 return transactionSequenceNumber;
// Accessor returning a pointer to a part-number -> CommitPart table.
// NOTE(review): the body is not visible in this chunk; presumably it returns the
// `parts` member -- confirm against the full file.
85 Hashtable<int32_t, CommitPart *> *Commit::getParts() {
89 void Commit::addKV(KeyValue *kv) {
90 keyValueUpdateSet->add(kv);
91 liveKeys->add(kv->getKey());
// Drops `key` from the set of live keys of this commit.
94 void Commit::invalidateKey(IoTString *key) {
95 liveKeys->remove(key);
// Once no live keys remain, the branch below runs.
// NOTE(review): its body is not visible in this chunk; presumably it marks the commit dead -- confirm.
97 if (liveKeys->size() == 0) {
102 Hashset<KeyValue *> *Commit::getKeyValueUpdateSet() {
103 return keyValueUpdateSet;
106 int32_t Commit::getNumberOfParts() {
107 return parts->size();
// Walks every stored part of this commit, looking each one up by its part number.
// NOTE(review): the statements before the loop and the rest of the loop body are
// not visible in this chunk.
110 void Commit::setDead() {
113 // Make all the parts of this transaction dead
// NOTE(review): keySet() is dereferenced as a pointer elsewhere in this file
// (keySet()->size()); confirm this range-for compiles against the container API.
114 for (int32_t partNumber : parts->keySet()) {
115 CommitPart *part = parts->get(partNumber);
121 CommitPart *Commit::getPart(int index) {
122 return parts->get(index);
125 void Commit::createCommitParts() {
128 Array<char> *charData = convertDataToBytes();
130 int commitPartCount = 0;
131 int currentPosition = 0;
132 int remaining = charData->length();
134 while (remaining > 0) {
135 bool isLastPart = false;
136 // determine how much to copy
137 int copySize = CommitPart_MAX_NON_HEADER_SIZE;
138 if (remaining <= CommitPart_MAX_NON_HEADER_SIZE) {
139 copySize = remaining;
140 isLastPart = true;// last bit of data so last part
143 // Copy to a smaller version
144 Array<char> *partData = new Array<char>(copySize);
145 System_arraycopy(charData, currentPosition, partData, 0, copySize);
147 CommitPart part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
148 parts->put(part->getPartNumber(), part);
150 // Update position, count and remaining
151 currentPosition += copySize;
153 remaining -= copySize;
// Rebuilds the key-value updates from the data carried by the stored parts:
// the part payloads are concatenated in part-number order and then decoded.
// NOTE(review): the declaration of dataSize is not visible in this chunk.
157 void Commit::decodeCommitData() {
158 // Calculate the size of the data section
// Parts are fetched by index 0..count-1 and dereferenced without a NULL check,
// so this relies on part numbers being contiguous starting at 0.
160 for (int i = 0; i < parts->keySet()->size(); i++) {
161 CommitPart *tp = parts->get(i);
162 dataSize += tp->getDataSize();
165 Array<char> *combinedData = new Array<char>(dataSize);
166 int currentPosition = 0;
168 // Stitch all the data sections together
169 for (int i = 0; i < parts->keySet()->size(); i++) {
170 CommitPart *tp = parts->get(i);
171 System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
172 currentPosition += tp->getDataSize();
// Read the combined buffer through a ByteBuffer.
176 ByteBuffer *bbDecode = ByteBuffer_wrap(combinedData);
178 // Decode how many key value pairs need to be decoded
179 int numberOfKVUpdates = bbDecode->getInt();
181 // Decode all the updates key values
// Each decoded pair goes into the update set and its key into the live-key set.
182 for (int i = 0; i < numberOfKVUpdates; i++) {
183 KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
184 keyValueUpdateSet->add(kv);
185 liveKeys->add(kv->getKey());
189 Array<char> *convertDataToBytes() {
191 // Calculate the size of the data
192 int sizeOfData = sizeof(int32_t); // Number of Update KV's
193 for (KeyValue *kv : keyValueUpdateSet) {
194 sizeOfData += kv->getSize();
197 // Data handlers and storage
198 Array<char> *dataArray = new Array<char>(sizeOfData);
199 ByteBuffer *bbEncode = ByteBuffer_wrap(dataArray);
201 // Encode the size of the updates and guard sets
202 bbEncode->putInt(keyValueUpdateSet->size());
204 // Encode all the updates
205 for (KeyValue *kv : keyValueUpdateSet) {
206 kv->encode(bbEncode);
209 return bbEncode->array();
212 void Commit::setKVsMap(Hashtable<IoTString *, KeyValue *> *newKVs) {
213 keyValueUpdateSet->clear();
214 keyValueUpdateSet->addAll(newKVs->values());
216 liveKeys->addAll(newKVs->keySet());
// Combines the updates of two commits into a new commit that carries
// newSequenceNumber and the newer commit's machine id.
// NOTE(review): the opening NULL check, the return statements and the closing
// braces are not visible in this chunk.
219 Commit *Commit_merge(Commit *newer, Commit *older, int64_t newSequenceNumber) {
222 } else if (newer == NULL) {
// Collect the updates keyed by their key. Older updates are inserted first, so an
// update from the newer commit for the same key is put afterwards (assuming put()
// overwrites the existing entry, it wins).
225 Hashtable<IoTString *, KeyValue *> *kvSet = new Hashtable<IoTString *, KeyValue *>();
226 for (KeyValue *kv : older->getKeyValueUpdateSet()) {
227 kvSet->put(kv->getKey(), kv);
229 for (KeyValue *kv : newer->getKeyValueUpdateSet()) {
230 kvSet->put(kv->getKey(), kv);
// Prefer the newer commit's transaction sequence number; -1 means fall back to the older one's.
233 int64_t transactionSequenceNumber = newer->getTransactionSequenceNumber();
234 if (transactionSequenceNumber == -1) {
235 transactionSequenceNumber = older->getTransactionSequenceNumber();
238 Commit *newCommit = new Commit(newSequenceNumber, newer->getMachineId(), transactionSequenceNumber);
239 newCommit->setKVsMap(kvSet);