edits
[iotcloud.git] / version2 / src / C / Commit.cc
index 34d51dcffba610182c03835b82dedd79ddaa6a6d..a24e0148df376b6180c7bd574ff499c6c6b37519 100644 (file)
@@ -1,14 +1,15 @@
 #include "Commit.h"
 #include "CommitPart.h"
 #include "ByteBuffer.h"
-#include "KeyValue.h"
+#include "IoTString.h"
 
 Commit::Commit() :
-       parts(new Hashtable<int32_t, CommitPart *>()),
+       parts(new Vector<CommitPart *>()),
+       partCount(0),
        missingParts(NULL),
        fldisComplete(false),
        hasLastPart(false),
-       keyValueUpdateSet(new Hashset<KeyValue *>()),
+       keyValueUpdateSet(new Hashset<KeyValue *, uintptr_t, 0>()),
        isDead(false),
        sequenceNumber(-1),
        machineId(-1),
@@ -16,13 +17,13 @@ Commit::Commit() :
        liveKeys(new Hashset<IoTString *>) {
 }
 
-
 Commit::Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber) :
-       parts(new Hashtable<int32_t, CommitPart *>()),
+       parts(new Vector<CommitPart *>()),
+       partCount(0),
        missingParts(NULL),
        fldisComplete(true),
        hasLastPart(false),
-       keyValueUpdateSet(new Hashset<KeyValue *>()),
+       keyValueUpdateSet(new Hashset<KeyValue *, uintptr_t, 0>()),
        isDead(false),
        sequenceNumber(_sequenceNumber),
        machineId(_machineId),
@@ -30,6 +31,14 @@ Commit::Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transaction
        liveKeys(new Hashset<IoTString *>) {
 }
 
+Commit::~Commit() {
+       delete parts;
+       delete keyValueUpdateSet;
+       delete liveKeys;
+       if (missingParts != NULL)
+               delete missingParts;
+}
+
 void Commit::addPartDecode(CommitPart *newPart) {
        if (isDead) {
                // If dead then just kill this part and move on
@@ -37,11 +46,13 @@ void Commit::addPartDecode(CommitPart *newPart) {
                return;
        }
 
-       CommitPart *previoslySeenPart = parts->put(newPart->getPartNumber(), newPart);
+       CommitPart *previouslySeenPart = parts->setExpand(newPart->getPartNumber(), newPart);
+       if (previouslySeenPart == NULL)
+               partCount++;
 
-       if (previoslySeenPart != NULL) {
+       if (previouslySeenPart != NULL) {
                // Set dead the old one since the new one is a rescued version of this part
-               previoslySeenPart->setDead();
+               previouslySeenPart->setDead();
        } else if (newPart->isLastPart()) {
                missingParts = new Hashset<int32_t>();
                hasLastPart = true;
@@ -83,7 +94,7 @@ int64_t Commit::getTransactionSequenceNumber() {
        return transactionSequenceNumber;
 }
 
-Hashtable<int32_t, CommitPart *> *Commit::getParts() {
+Vector<CommitPart *> *Commit::getParts() {
        return parts;
 }
 
@@ -100,27 +111,23 @@ void Commit::invalidateKey(IoTString *key) {
        }
 }
 
-Hashset<KeyValue *> *Commit::getKeyValueUpdateSet() {
+Hashset<KeyValue *, uintptr_t, 0> *Commit::getKeyValueUpdateSet() {
        return keyValueUpdateSet;
 }
 
 int32_t Commit::getNumberOfParts() {
-       return parts->size();
+       return partCount;
 }
 
 void Commit::setDead() {
-       if (isDead) {
-               // Already dead
-               return;
-       }
-
-       // Set dead
-       isDead = true;
-
-       // Make all the parts of this transaction dead
-       for (int32_t partNumber : parts->keySet()) {
-               CommitPart *part = parts->get(partNumber);
-               part->setDead();
+       if (!isDead) {
+               isDead = true;
+               // Make all the parts of this transaction dead
+               for (uint32_t partNumber = 0; partNumber < parts->size(); partNumber++) {
+                       CommitPart *part = parts->get(partNumber);
+                       if (parts != NULL)
+                               part->setDead();
+               }
        }
 }
 
@@ -130,17 +137,15 @@ CommitPart *Commit::getPart(int index) {
 
 void Commit::createCommitParts() {
        parts->clear();
-
+       partCount = 0;
        // Convert to chars
        Array<char> *charData = convertDataToBytes();
 
-
        int commitPartCount = 0;
        int currentPosition = 0;
        int remaining = charData->length();
 
        while (remaining > 0) {
-
                bool isLastPart = false;
                // determine how much to copy
                int copySize = CommitPart_MAX_NON_HEADER_SIZE;
@@ -153,8 +158,8 @@ void Commit::createCommitParts() {
                Array<char> *partData = new Array<char>(copySize);
                System_arraycopy(charData, currentPosition, partData, 0, copySize);
 
-               CommitPart part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
-               parts->put(part->getPartNumber(), part);
+               CommitPart *part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
+               parts->setExpand(part->getPartNumber(), part);
 
                // Update position, count and remaining
                currentPosition += copySize;
@@ -164,22 +169,24 @@ void Commit::createCommitParts() {
 }
 
 void Commit::decodeCommitData() {
-
        // Calculate the size of the data section
        int dataSize = 0;
-       for (int i = 0; i < parts->keySet()->size(); i++) {
+       for (uint i = 0; i < parts->size(); i++) {
                CommitPart *tp = parts->get(i);
-               dataSize += tp->getDataSize();
+               if (tp != NULL)
+                       dataSize += tp->getDataSize();
        }
 
        Array<char> *combinedData = new Array<char>(dataSize);
        int currentPosition = 0;
 
        // Stitch all the data sections together
-       for (int i = 0; i < parts->keySet()->size(); i++) {
+       for (uint i = 0; i < parts->size(); i++) {
                CommitPart *tp = parts->get(i);
-               System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
-               currentPosition += tp->getDataSize();
+               if (tp != NULL) {
+                       System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
+                       currentPosition += tp->getDataSize();
+               }
        }
 
        // Decoder Object
@@ -196,13 +203,15 @@ void Commit::decodeCommitData() {
        }
 }
 
-Array<char> *convertDataToBytes() {
-
+Array<char> *Commit::convertDataToBytes() {
        // Calculate the size of the data
        int sizeOfData = sizeof(int32_t);       // Number of Update KV's
-       for (KeyValue *kv : keyValueUpdateSet) {
+       SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = keyValueUpdateSet->iterator();
+       while (kvit->hasNext()) {
+               KeyValue *kv = kvit->next();
                sizeOfData += kv->getSize();
        }
+       delete kvit;
 
        // Data handlers and storage
        Array<char> *dataArray = new Array<char>(sizeOfData);
@@ -212,47 +221,56 @@ Array<char> *convertDataToBytes() {
        bbEncode->putInt(keyValueUpdateSet->size());
 
        // Encode all the updates
-       for (KeyValue *kv : keyValueUpdateSet) {
+       kvit = keyValueUpdateSet->iterator();
+       while (kvit->hasNext()) {
+               KeyValue *kv = kvit->next();
                kv->encode(bbEncode);
        }
+       delete kvit;
 
        return bbEncode->array();
 }
 
-void Commit::setKVsMap(Hashtable<IoTString *, KeyValue *> *newKVs) {
+void Commit::setKVsMap(Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, KeyValueEquals> *newKVs) {
        keyValueUpdateSet->clear();
        liveKeys->clear();
-
-       keyValueUpdateSet->addAll(newKVs->values());
-       liveKeys->addAll(newKVs->keySet());
+       SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, KeyValueEquals> *kvit = newKVs->iterator();
+       while (kvit->hasNext()) {
+               KeyValue *kv = kvit->next();
+               liveKeys->add(kv->getKey());
+               keyValueUpdateSet->add(kv);
+       }
+       delete kvit;
 }
 
 Commit *Commit_merge(Commit *newer, Commit *older, int64_t newSequenceNumber) {
-
        if (older == NULL) {
                return newer;
        } else if (newer == NULL) {
                return older;
        }
-
-       Hashtable<IoTString *, KeyValue *> *kvSet = new Hashtable<IoTString *, KeyValue *>();
-       for (KeyValue *kv : older->getKeyValueUpdateSet()) {
-               kvSet->put(kv->getKey(), kv);
+       Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, KeyValueEquals> *kvSet = new Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, KeyValueEquals>();
+       SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = older->getKeyValueUpdateSet()->iterator();
+       while (kvit->hasNext()) {
+               KeyValue *kv = kvit->next();
+               kvSet->add(kv);
        }
-
-       for (KeyValue *kv : newer->getKeyValueUpdateSet()) {
-               kvSet->put(kv->getKey(), kv);
+       delete kvit;
+       kvit = newer->getKeyValueUpdateSet()->iterator();
+       while (kvit->hasNext()) {
+               KeyValue *kv = kvit->next();
+               kvSet->add(kv);
        }
+       delete kvit;
 
        int64_t transactionSequenceNumber = newer->getTransactionSequenceNumber();
-
        if (transactionSequenceNumber == -1) {
                transactionSequenceNumber = older->getTransactionSequenceNumber();
        }
 
        Commit *newCommit = new Commit(newSequenceNumber, newer->getMachineId(), transactionSequenceNumber);
-
        newCommit->setKVsMap(kvSet);
 
+       delete kvSet;
        return newCommit;
 }