2 #include "CommitPart.h"
3 #include "ByteBuffer.h"
7 parts(new Hashtable<int32_t, CommitPart *>()),
11 keyValueUpdateSet(new Hashset<KeyValue *>()),
15 transactionSequenceNumber(-1),
16 liveKeys(new Hashset<IoTString *>) {
20 Commit::Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber) :
21 parts(new Hashtable<int32_t, CommitPart *>()),
25 keyValueUpdateSet(new Hashset<KeyValue *>()),
27 sequenceNumber(_sequenceNumber),
28 machineId(_machineId),
29 transactionSequenceNumber(_transactionSequenceNumber),
30 liveKeys(new Hashset<IoTString *>) {
33 void Commit::addPartDecode(CommitPart *newPart) {
35 // If dead then just kill this part and move on
40 CommitPart *previoslySeenPart = parts->put(newPart->getPartNumber(), newPart);
42 if (previoslySeenPart != NULL) {
43 // Set dead the old one since the new one is a rescued version of this part
44 previoslySeenPart->setDead();
45 } else if (newPart->isLastPart()) {
46 missingParts = new Hashset<int32_t>();
49 for (int i = 0; i < newPart->getPartNumber(); i++) {
50 if (parts->get(i) == NULL) {
56 if (!fldisComplete && hasLastPart) {
58 // We have seen this part so remove it from the set of missing parts
59 missingParts->remove(newPart->getPartNumber());
61 // Check if all the parts have been seen
62 if (missingParts->size() == 0) {
64 // We have all the parts
67 // Decode all the parts and create the key value guard and update sets
70 // Get the sequence number and arbitrator of this transaction
71 sequenceNumber = parts->get(0)->getSequenceNumber();
72 machineId = parts->get(0)->getMachineId();
73 transactionSequenceNumber = parts->get(0)->getTransactionSequenceNumber();
78 int64_t Commit::getSequenceNumber() {
79 return sequenceNumber;
82 int64_t Commit::getTransactionSequenceNumber() {
83 return transactionSequenceNumber;
86 Hashtable<int32_t, CommitPart *> *Commit::getParts() {
90 void Commit::addKV(KeyValue *kv) {
91 keyValueUpdateSet->add(kv);
92 liveKeys->add(kv->getKey());
95 void Commit::invalidateKey(IoTString *key) {
96 liveKeys->remove(key);
98 if (liveKeys->size() == 0) {
103 Hashset<KeyValue *> *Commit::getKeyValueUpdateSet() {
104 return keyValueUpdateSet;
107 int32_t Commit::getNumberOfParts() {
108 return parts->size();
111 void Commit::setDead() {
120 // Make all the parts of this transaction dead
121 for (int32_t partNumber : parts->keySet()) {
122 CommitPart *part = parts->get(partNumber);
127 CommitPart *Commit::getPart(int index) {
128 return parts->get(index);
131 void Commit::createCommitParts() {
135 Array<char> *charData = convertDataToBytes();
138 int commitPartCount = 0;
139 int currentPosition = 0;
140 int remaining = charData->length();
142 while (remaining > 0) {
144 bool isLastPart = false;
145 // determine how much to copy
146 int copySize = CommitPart_MAX_NON_HEADER_SIZE;
147 if (remaining <= CommitPart_MAX_NON_HEADER_SIZE) {
148 copySize = remaining;
149 isLastPart = true;// last bit of data so last part
152 // Copy to a smaller version
153 Array<char> *partData = new Array<char>(copySize);
154 System_arraycopy(charData, currentPosition, partData, 0, copySize);
156 CommitPart part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
157 parts->put(part->getPartNumber(), part);
159 // Update position, count and remaining
160 currentPosition += copySize;
162 remaining -= copySize;
166 void Commit::decodeCommitData() {
168 // Calculate the size of the data section
170 for (int i = 0; i < parts->keySet()->size(); i++) {
171 CommitPart *tp = parts->get(i);
172 dataSize += tp->getDataSize();
175 Array<char> *combinedData = new Array<char>(dataSize);
176 int currentPosition = 0;
178 // Stitch all the data sections together
179 for (int i = 0; i < parts->keySet()->size(); i++) {
180 CommitPart *tp = parts->get(i);
181 System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
182 currentPosition += tp->getDataSize();
186 ByteBuffer *bbDecode = ByteBuffer_wrap(combinedData);
188 // Decode how many key value pairs need to be decoded
189 int numberOfKVUpdates = bbDecode->getInt();
191 // Decode all the updates key values
192 for (int i = 0; i < numberOfKVUpdates; i++) {
193 KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
194 keyValueUpdateSet->add(kv);
195 liveKeys->add(kv->getKey());
199 Array<char> *convertDataToBytes() {
201 // Calculate the size of the data
202 int sizeOfData = sizeof(int32_t); // Number of Update KV's
203 for (KeyValue *kv : keyValueUpdateSet) {
204 sizeOfData += kv->getSize();
207 // Data handlers and storage
208 Array<char> *dataArray = new Array<char>(sizeOfData);
209 ByteBuffer *bbEncode = ByteBuffer_wrap(dataArray);
211 // Encode the size of the updates and guard sets
212 bbEncode->putInt(keyValueUpdateSet->size());
214 // Encode all the updates
215 for (KeyValue *kv : keyValueUpdateSet) {
216 kv->encode(bbEncode);
219 return bbEncode->array();
222 void Commit::setKVsMap(Hashtable<IoTString *, KeyValue *> *newKVs) {
223 keyValueUpdateSet->clear();
226 keyValueUpdateSet->addAll(newKVs->values());
227 liveKeys->addAll(newKVs->keySet());
230 Commit *Commit_merge(Commit *newer, Commit *older, int64_t newSequenceNumber) {
234 } else if (newer == NULL) {
238 Hashtable<IoTString *, KeyValue *> *kvSet = new Hashtable<IoTString *, KeyValue *>();
239 for (KeyValue *kv : older->getKeyValueUpdateSet()) {
240 kvSet->put(kv->getKey(), kv);
243 for (KeyValue *kv : newer->getKeyValueUpdateSet()) {
244 kvSet->put(kv->getKey(), kv);
247 int64_t transactionSequenceNumber = newer->getTransactionSequenceNumber();
249 if (transactionSequenceNumber == -1) {
250 transactionSequenceNumber = older->getTransactionSequenceNumber();
253 Commit *newCommit = new Commit(newSequenceNumber, newer->getMachineId(), transactionSequenceNumber);
255 newCommit->setKVsMap(kvSet);