--- /dev/null
+#include "Commit.h"
+#include "CommitPart.h"
+#include "ByteBuffer.h"
+#include "IoTString.h"
+
+Commit::Commit() :
+ parts(new Vector<CommitPart *>()),
+ partCount(0),
+ missingParts(NULL),
+ fldisComplete(false),
+ hasLastPart(false),
+ keyValueUpdateSet(new Hashset<KeyValue *, uintptr_t, 0>()),
+ isDead(false),
+ sequenceNumber(-1),
+ machineId(-1),
+ transactionSequenceNumber(-1),
+ dataBytes(NULL),
+ liveKeys(new Hashset<IoTString *>()) {
+}
+
+Commit::Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber) :
+ parts(new Vector<CommitPart *>()),
+ partCount(0),
+ missingParts(NULL),
+ fldisComplete(true),
+ hasLastPart(false),
+ keyValueUpdateSet(new Hashset<KeyValue *, uintptr_t, 0>()),
+ isDead(false),
+ sequenceNumber(_sequenceNumber),
+ machineId(_machineId),
+ transactionSequenceNumber(_transactionSequenceNumber),
+ dataBytes(NULL),
+ liveKeys(new Hashset<IoTString *>()) {
+}
+
+Commit::~Commit() {
+ {
+ uint Size = parts->size();
+ for(uint i=0;i<Size; i++)
+ parts->get(i)->releaseRef();
+ delete parts;
+ }
+ {
+ SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> * keyit = keyValueUpdateSet->iterator();
+ while(keyit->hasNext()) {
+ delete keyit->next();
+ }
+ delete keyit;
+ delete keyValueUpdateSet;
+ }
+ delete liveKeys;
+ if (missingParts != NULL)
+ delete missingParts;
+ if (dataBytes != NULL)
+ delete dataBytes;
+}
+
+void Commit::addPartDecode(CommitPart *newPart) {
+ if (isDead) {
+ // If dead then just kill this part and move on
+ newPart->setDead();
+ return;
+ }
+
+ newPart->acquireRef();
+ CommitPart *previouslySeenPart = parts->setExpand(newPart->getPartNumber(), newPart);
+ if (previouslySeenPart == NULL)
+ partCount++;
+
+ if (previouslySeenPart != NULL) {
+ // Set dead the old one since the new one is a rescued version of this part
+ previouslySeenPart->setDead();
+ previouslySeenPart->releaseRef();
+ } else if (newPart->isLastPart()) {
+ missingParts = new Hashset<int32_t>();
+ hasLastPart = true;
+
+ for (int i = 0; i < newPart->getPartNumber(); i++) {
+ if (parts->get(i) == NULL) {
+ missingParts->add(i);
+ }
+ }
+ }
+
+ if (!fldisComplete && hasLastPart) {
+
+ // We have seen this part so remove it from the set of missing parts
+ missingParts->remove(newPart->getPartNumber());
+
+ // Check if all the parts have been seen
+ if (missingParts->size() == 0) {
+
+ // We have all the parts
+ fldisComplete = true;
+
+ // Decode all the parts and create the key value guard and update sets
+ decodeCommitData();
+
+ // Get the sequence number and arbitrator of this transaction
+ sequenceNumber = parts->get(0)->getSequenceNumber();
+ machineId = parts->get(0)->getMachineId();
+ transactionSequenceNumber = parts->get(0)->getTransactionSequenceNumber();
+ }
+ }
+}
+
+int64_t Commit::getSequenceNumber() {
+ return sequenceNumber;
+}
+
+int64_t Commit::getTransactionSequenceNumber() {
+ return transactionSequenceNumber;
+}
+
+Vector<CommitPart *> *Commit::getParts() {
+ return parts;
+}
+
+void Commit::addKV(KeyValue *kv) {
+ KeyValue * kvcopy = kv->getCopy();
+ keyValueUpdateSet->add(kvcopy);
+ liveKeys->add(kvcopy->getKey());
+}
+
+void Commit::invalidateKey(IoTString *key) {
+ liveKeys->remove(key);
+
+ if (liveKeys->size() == 0) {
+ setDead();
+ }
+}
+
+Hashset<KeyValue *, uintptr_t, 0> *Commit::getKeyValueUpdateSet() {
+ return keyValueUpdateSet;
+}
+
+int32_t Commit::getNumberOfParts() {
+ return partCount;
+}
+
+void Commit::setDead() {
+ if (!isDead) {
+ isDead = true;
+ // Make all the parts of this transaction dead
+ for (uint32_t partNumber = 0; partNumber < parts->size(); partNumber++) {
+ CommitPart *part = parts->get(partNumber);
+ part->setDead();
+ }
+ }
+}
+
+void Commit::createCommitParts() {
+ uint Size = parts->size();
+ for(uint i=0;i < Size; i++) {
+ Entry * e=parts->get(i);
+ e->releaseRef();
+ }
+ parts->clear();
+ partCount = 0;
+ // Convert to chars
+ Array<char> *charData = convertDataToBytes();
+
+ int commitPartCount = 0;
+ int currentPosition = 0;
+ int remaining = charData->length();
+
+ while (remaining > 0) {
+ bool isLastPart = false;
+ // determine how much to copy
+ int copySize = CommitPart_MAX_NON_HEADER_SIZE;
+ if (remaining <= CommitPart_MAX_NON_HEADER_SIZE) {
+ copySize = remaining;
+ isLastPart = true;// last bit of data so last part
+ }
+
+ // Copy to a smaller version
+ Array<char> *partData = new Array<char>(copySize);
+ System_arraycopy(charData, currentPosition, partData, 0, copySize);
+
+ CommitPart *part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
+ parts->setExpand(part->getPartNumber(), part);
+
+ // Update position, count and remaining
+ currentPosition += copySize;
+ commitPartCount++;
+ remaining -= copySize;
+ }
+ delete charData;
+}
+
+void Commit::decodeCommitData() {
+ // Calculate the size of the data section
+ int dataSize = 0;
+ for (uint i = 0; i < parts->size(); i++) {
+ CommitPart *tp = parts->get(i);
+ if (tp != NULL)
+ dataSize += tp->getDataSize();
+ }
+
+ Array<char> *combinedData = new Array<char>(dataSize);
+ int currentPosition = 0;
+
+ // Stitch all the data sections together
+ for (uint i = 0; i < parts->size(); i++) {
+ CommitPart *tp = parts->get(i);
+ if (tp != NULL) {
+ System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
+ currentPosition += tp->getDataSize();
+ }
+ }
+
+ // Decoder Object
+ ByteBuffer *bbDecode = ByteBuffer_wrap(combinedData);
+
+ // Decode how many key value pairs need to be decoded
+ int numberOfKVUpdates = bbDecode->getInt();
+
+ // Decode all the updates key values
+ for (int i = 0; i < numberOfKVUpdates; i++) {
+ KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
+ keyValueUpdateSet->add(kv);
+ liveKeys->add(kv->getKey());
+ }
+ delete bbDecode;
+}
+
+Array<char> *Commit::convertDataToBytes() {
+ // Calculate the size of the data
+ int sizeOfData = sizeof(int32_t); // Number of Update KV's
+ SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = keyValueUpdateSet->iterator();
+ while (kvit->hasNext()) {
+ KeyValue *kv = kvit->next();
+ sizeOfData += kv->getSize();
+ }
+ delete kvit;
+
+ // Data handlers and storage
+ Array<char> *dataArray = new Array<char>(sizeOfData);
+ ByteBuffer *bbEncode = ByteBuffer_wrap(dataArray);
+
+ // Encode the size of the updates and guard sets
+ bbEncode->putInt(keyValueUpdateSet->size());
+
+ // Encode all the updates
+ kvit = keyValueUpdateSet->iterator();
+ while (kvit->hasNext()) {
+ KeyValue *kv = kvit->next();
+ kv->encode(bbEncode);
+ }
+ delete kvit;
+ Array<char> * array = bbEncode->array();
+ bbEncode->releaseArray();
+ delete bbEncode;
+ return array;
+}
+
+void Commit::setKVsMap(Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, KeyValueEquals> *newKVs) {
+ keyValueUpdateSet->clear();
+ liveKeys->clear();
+ SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, KeyValueEquals> *kvit = newKVs->iterator();
+ while (kvit->hasNext()) {
+ KeyValue *kv = kvit->next();
+ KeyValue *kvcopy = kv->getCopy();
+ liveKeys->add(kvcopy->getKey());
+ keyValueUpdateSet->add(kvcopy);
+ }
+ delete kvit;
+}
+
+Commit *Commit_merge(Commit *newer, Commit *older, int64_t newSequenceNumber) {
+ if (older == NULL) {
+ return newer;
+ } else if (newer == NULL) {
+ return older;
+ }
+ Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, KeyValueEquals> *kvSet = new Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, KeyValueEquals>();
+ SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = older->getKeyValueUpdateSet()->iterator();
+ while (kvit->hasNext()) {
+ KeyValue *kv = kvit->next();
+ kvSet->add(kv);
+ }
+ delete kvit;
+ kvit = newer->getKeyValueUpdateSet()->iterator();
+ while (kvit->hasNext()) {
+ KeyValue *kv = kvit->next();
+ kvSet->add(kv);
+ }
+ delete kvit;
+
+ int64_t transactionSequenceNumber = newer->getTransactionSequenceNumber();
+ if (transactionSequenceNumber == -1) {
+ transactionSequenceNumber = older->getTransactionSequenceNumber();
+ }
+
+ Commit *newCommit = new Commit(newSequenceNumber, newer->getMachineId(), transactionSequenceNumber);
+ newCommit->setKVsMap(kvSet);
+
+ delete kvSet;
+ return newCommit;
+}