edits
[iotcloud.git] / version2 / src / C / Commit.cc
index 5c616920726d5a8fbedd163ce5e9b517935ec405..f159f71608c59bf74f427234cab8af797fee8ddc 100644 (file)
@@ -1,11 +1,15 @@
-#include "commit.h"
+#include "Commit.h"
+#include "CommitPart.h"
+#include "ByteBuffer.h"
+#include "IoTString.h"
 
 Commit::Commit() :
-       parts(new Hashtable<int32_t, CommitPart *>()),
+       parts(new Vector<CommitPart *>()),
+       partCount(0),
        missingParts(NULL),
        fldisComplete(false),
        hasLastPart(false),
-       keyValueUpdateSet(new HashSet<KeyValue *>()),
+       keyValueUpdateSet(new Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue>()),
        isDead(false),
        sequenceNumber(-1),
        machineId(-1),
@@ -13,13 +17,13 @@ Commit::Commit() :
        liveKeys(new Hashset<IoTString *>) {
 }
 
-
 Commit::Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber) :
-       parts(new Hashtable<int32_t, CommitPart *>()),
+       parts(new Vector<CommitPart *>()),
+       partCount(0),
        missingParts(NULL),
        fldisComplete(true),
        hasLastPart(false),
-       keyValueUpdateSet(new HashSet<KeyValue *>()),
+       keyValueUpdateSet(new Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue>()),
        isDead(false),
        sequenceNumber(_sequenceNumber),
        machineId(_machineId),
@@ -27,234 +31,237 @@ Commit::Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transaction
        liveKeys(new Hashset<IoTString *>) {
 }
 
-void Commit::addPartDecode(CommitPart newPart) {
-
+void Commit::addPartDecode(CommitPart *newPart) {
        if (isDead) {
                // If dead then just kill this part and move on
-               newPart.setDead();
+               newPart->setDead();
                return;
        }
 
-       CommitPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart);
-       
-       if (previoslySeenPart != NULL) {
+       CommitPart *previouslySeenPart = parts->setExpand(newPart->getPartNumber(), newPart);
+       if (previouslySeenPart == NULL)
+               partCount++;
+
+       if (previouslySeenPart != NULL) {
                // Set dead the old one since the new one is a rescued version of this part
-               previoslySeenPart.setDead();
-       } else if (newPart.isLastPart()) {
-               missingParts = new HashSet<Integer>();
+               previouslySeenPart->setDead();
+       } else if (newPart->isLastPart()) {
+               missingParts = new Hashset<int32_t>();
                hasLastPart = true;
-               
-               for (int i = 0; i < newPart.getPartNumber(); i++) {
-                       if (parts.get(i) == NULL) {
-                               missingParts.add(i);
+
+               for (int i = 0; i < newPart->getPartNumber(); i++) {
+                       if (parts->get(i) == NULL) {
+                               missingParts->add(i);
                        }
                }
        }
-       
+
        if (!fldisComplete && hasLastPart) {
-               
+
                // We have seen this part so remove it from the set of missing parts
-               missingParts.remove(newPart.getPartNumber());
-               
+               missingParts->remove(newPart->getPartNumber());
+
                // Check if all the parts have been seen
-               if (missingParts.size() == 0) {
-                       
+               if (missingParts->size() == 0) {
+
                        // We have all the parts
                        fldisComplete = true;
-                       
+
                        // Decode all the parts and create the key value guard and update sets
                        decodeCommitData();
-                       
+
                        // Get the sequence number and arbitrator of this transaction
-                       sequenceNumber = parts.get(0).getSequenceNumber();
-                       machineId = parts.get(0).getMachineId();
-                       transactionSequenceNumber = parts.get(0).getTransactionSequenceNumber();
+                       sequenceNumber = parts->get(0)->getSequenceNumber();
+                       machineId = parts->get(0)->getMachineId();
+                       transactionSequenceNumber = parts->get(0)->getTransactionSequenceNumber();
                }
        }
 }
 
-    int64_t getSequenceNumber() {
-        return sequenceNumber;
-    }
-
-    int64_t getTransactionSequenceNumber() {
-        return transactionSequenceNumber;
-    }
-
-    Hashtable<Integer, CommitPart> getParts() {
-        return parts;
-    }
-
-    void addKV(KeyValue kv) {
-        keyValueUpdateSet.add(kv);
-        liveKeys.add(kv.getKey());
-    }
-
-    void invalidateKey(IoTString key) {
-        liveKeys.remove(key);
-
-        if (liveKeys.size() == 0) {
-            setDead();
-        }
-    }
-
-    Set<KeyValue> getKeyValueUpdateSet() {
-        return keyValueUpdateSet;
-    }
-
-int32_t getNumberOfParts() {
-       return parts.size();
+int64_t Commit::getSequenceNumber() {
+       return sequenceNumber;
 }
 
-    void setDead() {
-        if (isDead) {
-            // Already dead
-            return;
-        }
-
-        // Set dead
-        isDead = true;
-
-        // Make all the parts of this transaction dead
-        for (Integer partNumber : parts.keySet()) {
-            CommitPart part = parts.get(partNumber);
-            part.setDead();
-        }
-    }
-
-    CommitPart getPart(int index) {
-        return parts.get(index);
-    }
-
-    void createCommitParts() {
-
-        parts.clear();
-
-        // Convert to chars
-        char[] charData = convertDataToBytes();
-
-
-        int commitPartCount = 0;
-        int currentPosition = 0;
-        int remaining = charData.length;
-
-        while (remaining > 0) {
-
-            Boolean isLastPart = false;
-            // determine how much to copy
-            int copySize = CommitPart.MAX_NON_HEADER_SIZE;
-            if (remaining <= CommitPart.MAX_NON_HEADER_SIZE) {
-                copySize = remaining;
-                isLastPart = true; // last bit of data so last part
-            }
-
-            // Copy to a smaller version
-            char[] partData = new char[copySize];
-            System.arraycopy(charData, currentPosition, partData, 0, copySize);
+int64_t Commit::getTransactionSequenceNumber() {
+       return transactionSequenceNumber;
+}
 
-            CommitPart part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
-            parts.put(part.getPartNumber(), part);
+Vector<CommitPart *> *Commit::getParts() {
+       return parts;
+}
 
-            // Update position, count and remaining
-            currentPosition += copySize;
-            commitPartCount++;
-            remaining -= copySize;
-        }
-    }
+void Commit::addKV(KeyValue *kv) {
+       keyValueUpdateSet->add(kv);
+       liveKeys->add(kv->getKey());
+}
 
-    void decodeCommitData() {
+void Commit::invalidateKey(IoTString *key) {
+       liveKeys->remove(key);
 
-        // Calculate the size of the data section
-        int dataSize = 0;
-        for (int i = 0; i < parts.keySet().size(); i++) {
-            CommitPart tp = parts.get(i);
-            dataSize += tp.getDataSize();
-        }
+       if (liveKeys->size() == 0) {
+               setDead();
+       }
+}
 
-        char[] combinedData = new char[dataSize];
-        int currentPosition = 0;
+Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *Commit::getKeyValueUpdateSet() {
+       return keyValueUpdateSet;
+}
 
-        // Stitch all the data sections together
-        for (int i = 0; i < parts.keySet().size(); i++) {
-            CommitPart tp = parts.get(i);
-            System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize());
-            currentPosition += tp.getDataSize();
-        }
+int32_t Commit::getNumberOfParts() {
+       return partCount;
+}
 
-        // Decoder Object
-        ByteBuffer bbDecode = ByteBuffer.wrap(combinedData);
+void Commit::setDead() {
+       if (!isDead) {
+               isDead = true;
+               // Make all the parts of this transaction dead
+               for (int32_t partNumber = 0; partNumber < parts->size(); partNumber++) {
+                       CommitPart *part = parts->get(partNumber);
+                       if (parts != NULL)
+                               part->setDead();
+               }
+       }
+}
 
-        // Decode how many key value pairs need to be decoded
-        int numberOfKVUpdates = bbDecode.getInt();
+CommitPart *Commit::getPart(int index) {
+       return parts->get(index);
+}
 
-        // Decode all the updates key values
-        for (int i = 0; i < numberOfKVUpdates; i++) {
-            KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
-            keyValueUpdateSet.add(kv);
-            liveKeys.add(kv.getKey());
-        }
-    }
+void Commit::createCommitParts() {
+       parts->clear();
+       partCount = 0;
+       // Convert to chars
+       Array<char> *charData = convertDataToBytes();
+
+       int commitPartCount = 0;
+       int currentPosition = 0;
+       int remaining = charData->length();
+
+       while (remaining > 0) {
+               bool isLastPart = false;
+               // determine how much to copy
+               int copySize = CommitPart_MAX_NON_HEADER_SIZE;
+               if (remaining <= CommitPart_MAX_NON_HEADER_SIZE) {
+                       copySize = remaining;
+                       isLastPart = true;// last bit of data so last part
+               }
 
-    char[] convertDataToBytes() {
+               // Copy to a smaller version
+               Array<char> *partData = new Array<char>(copySize);
+               System_arraycopy(charData, currentPosition, partData, 0, copySize);
 
-        // Calculate the size of the data
-        int sizeOfData = sizeof(int32_t); // Number of Update KV's
-        for (KeyValue kv : keyValueUpdateSet) {
-            sizeOfData += kv.getSize();
-        }
+               CommitPart *part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
+               parts->setExpand(part->getPartNumber(), part);
 
-        // Data handlers and storage
-        char[] dataArray = new char[sizeOfData];
-        ByteBuffer bbEncode = ByteBuffer.wrap(dataArray);
+               // Update position, count and remaining
+               currentPosition += copySize;
+               commitPartCount++;
+               remaining -= copySize;
+       }
+}
 
-        // Encode the size of the updates and guard sets
-        bbEncode.putInt(keyValueUpdateSet.size());
+void Commit::decodeCommitData() {
+       // Calculate the size of the data section
+       int dataSize = 0;
+       for (int i = 0; i < parts->size(); i++) {
+               CommitPart *tp = parts->get(i);
+               if (tp != NULL)
+                       dataSize += tp->getDataSize();
+       }
 
-        // Encode all the updates
-        for (KeyValue kv : keyValueUpdateSet) {
-            kv.encode(bbEncode);
-        }
+       Array<char> *combinedData = new Array<char>(dataSize);
+       int currentPosition = 0;
 
-        return bbEncode.array();
-    }
+       // Stitch all the data sections together
+       for (int i = 0; i < parts->size(); i++) {
+               CommitPart *tp = parts->get(i);
+               if (tp != NULL) {
+                       System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
+                       currentPosition += tp->getDataSize();
+               }
+       }
 
-    void setKVsMap(Hashtable<IoTString, KeyValue> newKVs) {
-        keyValueUpdateSet.clear();
-        liveKeys.clear();
+       // Decoder Object
+       ByteBuffer *bbDecode = ByteBuffer_wrap(combinedData);
 
-        keyValueUpdateSet.addAll(newKVs.values());
-        liveKeys.addAll(newKVs.keySet());
+       // Decode how many key value pairs need to be decoded
+       int numberOfKVUpdates = bbDecode->getInt();
 
-    }
+       // Decode all the updates key values
+       for (int i = 0; i < numberOfKVUpdates; i++) {
+               KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
+               keyValueUpdateSet->add(kv);
+               liveKeys->add(kv->getKey());
+       }
+}
 
+Array<char> *Commit::convertDataToBytes() {
+       // Calculate the size of the data
+       int sizeOfData = sizeof(int32_t);       // Number of Update KV's
+       SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = keyValueUpdateSet->iterator();
+       while (kvit->hasNext()) {
+               KeyValue *kv = kvit->next();
+               sizeOfData += kv->getSize();
+       }
+       delete kvit;
 
-    static Commit merge(Commit newer, Commit older, int64_t newSequenceNumber) {
+       // Data handlers and storage
+       Array<char> *dataArray = new Array<char>(sizeOfData);
+       ByteBuffer *bbEncode = ByteBuffer_wrap(dataArray);
 
-        if (older == NULL) {
-            return newer;
-        } else if (newer == NULL) {
-            return older;
-        }
+       // Encode the size of the updates and guard sets
+       bbEncode->putInt(keyValueUpdateSet->size());
 
-        Hashtable<IoTString, KeyValue> kvSet = new Hashtable<IoTString, KeyValue>();
-        for (KeyValue kv : older.getKeyValueUpdateSet()) {
-            kvSet.put(kv.getKey(), kv);
-        }
+       // Encode all the updates
+       kvit = keyValueUpdateSet->iterator();
+       while (kvit->hasNext()) {
+               KeyValue *kv = kvit->next();
+               kv->encode(bbEncode);
+       }
+       delete kvit;
 
-        for (KeyValue kv : newer.getKeyValueUpdateSet()) {
-            kvSet.put(kv.getKey(), kv);
-        }
+       return bbEncode->array();
+}
 
-        int64_t transactionSequenceNumber = newer.getTransactionSequenceNumber();
+void Commit::setKVsMap(Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *newKVs) {
+       keyValueUpdateSet->clear();
+       keyValueUpdateSet->addAll(newKVs);
+       liveKeys->clear();
+       SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = newKVs->iterator();
+       while (kvit->hasNext()) {
+               liveKeys->add(kvit->next()->getKey());
+       }
+       delete kvit;
+}
 
-        if (transactionSequenceNumber == -1) {
-            transactionSequenceNumber = older.getTransactionSequenceNumber();
-        }
+Commit *Commit_merge(Commit *newer, Commit *older, int64_t newSequenceNumber) {
+       if (older == NULL) {
+               return newer;
+       } else if (newer == NULL) {
+               return older;
+       }
+       Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvSet = new Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue>();
+       SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = older->getKeyValueUpdateSet()->iterator();
+       while (kvit->hasNext()) {
+               KeyValue *kv = kvit->next();
+               kvSet->add(kv);
+       }
+       delete kvit;
+       kvit = newer->getKeyValueUpdateSet()->iterator();
+       while (kvit->hasNext()) {
+               KeyValue *kv = kvit->next();
+               kvSet->add(kv);
+       }
+       delete kvit;
 
-        Commit newCommit = new Commit(newSequenceNumber, newer.getMachineId(), transactionSequenceNumber);
+       int64_t transactionSequenceNumber = newer->getTransactionSequenceNumber();
+       if (transactionSequenceNumber == -1) {
+               transactionSequenceNumber = older->getTransactionSequenceNumber();
+       }
 
-        newCommit.setKVsMap(kvSet);
+       Commit *newCommit = new Commit(newSequenceNumber, newer->getMachineId(), transactionSequenceNumber);
+       newCommit->setKVsMap(kvSet);
 
-        return newCommit;
-    }
+       delete kvSet;
+       return newCommit;
 }