rename files
[iotcloud.git] / version2 / src / C / Commit.cpp
diff --git a/version2/src/C/Commit.cpp b/version2/src/C/Commit.cpp
new file mode 100644 (file)
index 0000000..7665841
--- /dev/null
@@ -0,0 +1,300 @@
+#include "Commit.h"
+#include "CommitPart.h"
+#include "ByteBuffer.h"
+#include "IoTString.h"
+
+Commit::Commit() :
+       parts(new Vector<CommitPart *>()),
+       partCount(0),
+       missingParts(NULL),
+       fldisComplete(false),
+       hasLastPart(false),
+       keyValueUpdateSet(new Hashset<KeyValue *, uintptr_t, 0>()),
+       isDead(false),
+       sequenceNumber(-1),
+       machineId(-1),
+       transactionSequenceNumber(-1),
+       dataBytes(NULL),
+       liveKeys(new Hashset<IoTString *>()) {
+}
+
+Commit::Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber) :
+       parts(new Vector<CommitPart *>()),
+       partCount(0),
+       missingParts(NULL),
+       fldisComplete(true),
+       hasLastPart(false),
+       keyValueUpdateSet(new Hashset<KeyValue *, uintptr_t, 0>()),
+       isDead(false),
+       sequenceNumber(_sequenceNumber),
+       machineId(_machineId),
+       transactionSequenceNumber(_transactionSequenceNumber),
+       dataBytes(NULL),
+       liveKeys(new Hashset<IoTString *>()) {
+}
+
+Commit::~Commit() {
+       {
+               uint Size = parts->size();
+               for(uint i=0;i<Size; i++)
+                       parts->get(i)->releaseRef();
+               delete parts;
+       }
+       {
+               SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> * keyit = keyValueUpdateSet->iterator();
+               while(keyit->hasNext()) {
+                       delete keyit->next();
+               }
+               delete keyit;
+               delete keyValueUpdateSet;
+       }
+       delete liveKeys;
+       if (missingParts != NULL)
+               delete missingParts;
+       if (dataBytes != NULL)
+               delete dataBytes;
+}
+
+void Commit::addPartDecode(CommitPart *newPart) {
+       if (isDead) {
+               // If dead then just kill this part and move on
+               newPart->setDead();
+               return;
+       }
+
+       newPart->acquireRef();
+       CommitPart *previouslySeenPart = parts->setExpand(newPart->getPartNumber(), newPart);
+       if (previouslySeenPart == NULL)
+               partCount++;
+
+       if (previouslySeenPart != NULL) {
+               // Set dead the old one since the new one is a rescued version of this part
+               previouslySeenPart->setDead();
+               previouslySeenPart->releaseRef();
+       } else if (newPart->isLastPart()) {
+               missingParts = new Hashset<int32_t>();
+               hasLastPart = true;
+
+               for (int i = 0; i < newPart->getPartNumber(); i++) {
+                       if (parts->get(i) == NULL) {
+                               missingParts->add(i);
+                       }
+               }
+       }
+
+       if (!fldisComplete && hasLastPart) {
+
+               // We have seen this part so remove it from the set of missing parts
+               missingParts->remove(newPart->getPartNumber());
+
+               // Check if all the parts have been seen
+               if (missingParts->size() == 0) {
+
+                       // We have all the parts
+                       fldisComplete = true;
+
+                       // Decode all the parts and create the key value guard and update sets
+                       decodeCommitData();
+
+                       // Get the sequence number and arbitrator of this transaction
+                       sequenceNumber = parts->get(0)->getSequenceNumber();
+                       machineId = parts->get(0)->getMachineId();
+                       transactionSequenceNumber = parts->get(0)->getTransactionSequenceNumber();
+               }
+       }
+}
+
+int64_t Commit::getSequenceNumber() {
+       return sequenceNumber;
+}
+
+int64_t Commit::getTransactionSequenceNumber() {
+       return transactionSequenceNumber;
+}
+
+Vector<CommitPart *> *Commit::getParts() {
+       return parts;
+}
+
+void Commit::addKV(KeyValue *kv) {
+       KeyValue * kvcopy = kv->getCopy();
+       keyValueUpdateSet->add(kvcopy);
+       liveKeys->add(kvcopy->getKey());
+}
+
+void Commit::invalidateKey(IoTString *key) {
+       liveKeys->remove(key);
+
+       if (liveKeys->size() == 0) {
+               setDead();
+       }
+}
+
+Hashset<KeyValue *, uintptr_t, 0> *Commit::getKeyValueUpdateSet() {
+       return keyValueUpdateSet;
+}
+
+int32_t Commit::getNumberOfParts() {
+       return partCount;
+}
+
+void Commit::setDead() {
+       if (!isDead) {
+               isDead = true;
+               // Make all the parts of this transaction dead
+               for (uint32_t partNumber = 0; partNumber < parts->size(); partNumber++) {
+                       CommitPart *part = parts->get(partNumber);
+                       part->setDead();
+               }
+       }
+}
+
+void Commit::createCommitParts() {
+       uint Size = parts->size();
+       for(uint i=0;i < Size; i++) {
+               Entry * e=parts->get(i);
+               e->releaseRef();
+       }
+       parts->clear();
+       partCount = 0;
+       // Convert to chars
+       Array<char> *charData = convertDataToBytes();
+
+       int commitPartCount = 0;
+       int currentPosition = 0;
+       int remaining = charData->length();
+
+       while (remaining > 0) {
+               bool isLastPart = false;
+               // determine how much to copy
+               int copySize = CommitPart_MAX_NON_HEADER_SIZE;
+               if (remaining <= CommitPart_MAX_NON_HEADER_SIZE) {
+                       copySize = remaining;
+                       isLastPart = true;// last bit of data so last part
+               }
+
+               // Copy to a smaller version
+               Array<char> *partData = new Array<char>(copySize);
+               System_arraycopy(charData, currentPosition, partData, 0, copySize);
+
+               CommitPart *part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
+               parts->setExpand(part->getPartNumber(), part);
+
+               // Update position, count and remaining
+               currentPosition += copySize;
+               commitPartCount++;
+               remaining -= copySize;
+       }
+       delete charData;
+}
+
+void Commit::decodeCommitData() {
+       // Calculate the size of the data section
+       int dataSize = 0;
+       for (uint i = 0; i < parts->size(); i++) {
+               CommitPart *tp = parts->get(i);
+               if (tp != NULL)
+                       dataSize += tp->getDataSize();
+       }
+
+       Array<char> *combinedData = new Array<char>(dataSize);
+       int currentPosition = 0;
+
+       // Stitch all the data sections together
+       for (uint i = 0; i < parts->size(); i++) {
+               CommitPart *tp = parts->get(i);
+               if (tp != NULL) {
+                       System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
+                       currentPosition += tp->getDataSize();
+               }
+       }
+
+       // Decoder Object
+       ByteBuffer *bbDecode = ByteBuffer_wrap(combinedData);
+
+       // Decode how many key value pairs need to be decoded
+       int numberOfKVUpdates = bbDecode->getInt();
+
+       // Decode all the updates key values
+       for (int i = 0; i < numberOfKVUpdates; i++) {
+               KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
+               keyValueUpdateSet->add(kv);
+               liveKeys->add(kv->getKey());
+       }
+       delete bbDecode;
+}
+
+Array<char> *Commit::convertDataToBytes() {
+       // Calculate the size of the data
+       int sizeOfData = sizeof(int32_t);       // Number of Update KV's
+       SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = keyValueUpdateSet->iterator();
+       while (kvit->hasNext()) {
+               KeyValue *kv = kvit->next();
+               sizeOfData += kv->getSize();
+       }
+       delete kvit;
+
+       // Data handlers and storage
+       Array<char> *dataArray = new Array<char>(sizeOfData);
+       ByteBuffer *bbEncode = ByteBuffer_wrap(dataArray);
+
+       // Encode the size of the updates and guard sets
+       bbEncode->putInt(keyValueUpdateSet->size());
+
+       // Encode all the updates
+       kvit = keyValueUpdateSet->iterator();
+       while (kvit->hasNext()) {
+               KeyValue *kv = kvit->next();
+               kv->encode(bbEncode);
+       }
+       delete kvit;
+       Array<char> * array = bbEncode->array();
+       bbEncode->releaseArray();
+       delete bbEncode;
+       return array;
+}
+
+void Commit::setKVsMap(Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, KeyValueEquals> *newKVs) {
+       keyValueUpdateSet->clear();
+       liveKeys->clear();
+       SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, KeyValueEquals> *kvit = newKVs->iterator();
+       while (kvit->hasNext()) {
+               KeyValue *kv = kvit->next();
+               KeyValue *kvcopy = kv->getCopy();
+               liveKeys->add(kvcopy->getKey());
+               keyValueUpdateSet->add(kvcopy);
+       }
+       delete kvit;
+}
+
+Commit *Commit_merge(Commit *newer, Commit *older, int64_t newSequenceNumber) {
+       if (older == NULL) {
+               return newer;
+       } else if (newer == NULL) {
+               return older;
+       }
+       Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, KeyValueEquals> *kvSet = new Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, KeyValueEquals>();
+       SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = older->getKeyValueUpdateSet()->iterator();
+       while (kvit->hasNext()) {
+               KeyValue *kv = kvit->next();
+               kvSet->add(kv);
+       }
+       delete kvit;
+       kvit = newer->getKeyValueUpdateSet()->iterator();
+       while (kvit->hasNext()) {
+               KeyValue *kv = kvit->next();
+               kvSet->add(kv);
+       }
+       delete kvit;
+
+       int64_t transactionSequenceNumber = newer->getTransactionSequenceNumber();
+       if (transactionSequenceNumber == -1) {
+               transactionSequenceNumber = older->getTransactionSequenceNumber();
+       }
+
+       Commit *newCommit = new Commit(newSequenceNumber, newer->getMachineId(), transactionSequenceNumber);
+       newCommit->setKVsMap(kvSet);
+
+       delete kvSet;
+       return newCommit;
+}