Start port
[iotcloud.git] / version2 / src / C / Commit.cc
diff --git a/version2/src/C/Commit.cc b/version2/src/C/Commit.cc
new file mode 100644 (file)
index 0000000..4925d6f
--- /dev/null
@@ -0,0 +1,279 @@
+
+
+class Commit {
+
+    Map<Integer, CommitPart> parts = NULL;
+    Set<Integer> missingParts = NULL;
+    bool isComplete = false;
+    bool hasLastPart = false;
+    Set<KeyValue> keyValueUpdateSet = NULL;
+    bool isDead = false;
+    int64_t sequenceNumber = -1;
+    int64_t machineId = -1;
+    int64_t transactionSequenceNumber = -1;
+
+    Set<IoTString> liveKeys = NULL;
+
+    Commit() {
+        parts = new HashMap<Integer, CommitPart>();
+        keyValueUpdateSet = new HashSet<KeyValue>();
+
+        liveKeys = new HashSet<IoTString>();
+    }
+
+    Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber) {
+        parts = new HashMap<Integer, CommitPart>();
+        keyValueUpdateSet = new HashSet<KeyValue>();
+
+        liveKeys = new HashSet<IoTString>();
+
+        sequenceNumber = _sequenceNumber;
+        machineId = _machineId;
+        transactionSequenceNumber = _transactionSequenceNumber;
+        isComplete = true;
+    }
+
+
+    void addPartDecode(CommitPart newPart) {
+
+        if (isDead) {
+            // If dead then just kill this part and move on
+            newPart.setDead();
+            return;
+        }
+
+        CommitPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart);
+
+        if (previoslySeenPart != NULL) {
+            // Set dead the old one since the new one is a rescued version of this part
+            previoslySeenPart.setDead();
+        } else if (newPart.isLastPart()) {
+            missingParts = new HashSet<Integer>();
+            hasLastPart = true;
+
+            for (int i = 0; i < newPart.getPartNumber(); i++) {
+                if (parts.get(i) == NULL) {
+                    missingParts.add(i);
+                }
+            }
+        }
+
+        if (!isComplete && hasLastPart) {
+
+            // We have seen this part so remove it from the set of missing parts
+            missingParts.remove(newPart.getPartNumber());
+
+            // Check if all the parts have been seen
+            if (missingParts.size() == 0) {
+
+                // We have all the parts
+                isComplete = true;
+
+                // Decode all the parts and create the key value guard and update sets
+                decodeCommitData();
+
+                // Get the sequence number and arbitrator of this transaction
+                sequenceNumber = parts.get(0).getSequenceNumber();
+                machineId = parts.get(0).getMachineId();
+                transactionSequenceNumber = parts.get(0).getTransactionSequenceNumber();
+            }
+        }
+    }
+
+    int64_t getSequenceNumber() {
+        return sequenceNumber;
+    }
+
+    int64_t getTransactionSequenceNumber() {
+        return transactionSequenceNumber;
+    }
+
+    Map<Integer, CommitPart> getParts() {
+        return parts;
+    }
+
+    void addKV(KeyValue kv) {
+        keyValueUpdateSet.add(kv);
+        liveKeys.add(kv.getKey());
+    }
+
+    void invalidateKey(IoTString key) {
+        liveKeys.remove(key);
+
+        if (liveKeys.size() == 0) {
+            setDead();
+        }
+    }
+
+    Set<KeyValue> getKeyValueUpdateSet() {
+        return keyValueUpdateSet;
+    }
+
+    int getNumberOfParts() {
+        return parts.size();
+    }
+
+    int64_t getMachineId() {
+        return machineId;
+    }
+
+    bool isComplete() {
+        return isComplete;
+    }
+
+    bool isLive() {
+        return !isDead;
+    }
+
+    void setDead() {
+        if (isDead) {
+            // Already dead
+            return;
+        }
+
+        // Set dead
+        isDead = true;
+
+        // Make all the parts of this transaction dead
+        for (Integer partNumber : parts.keySet()) {
+            CommitPart part = parts.get(partNumber);
+            part.setDead();
+        }
+    }
+
+    CommitPart getPart(int index) {
+        return parts.get(index);
+    }
+
+    void createCommitParts() {
+
+        parts.clear();
+
+        // Convert to chars
+        char[] charData = convertDataToBytes();
+
+
+        int commitPartCount = 0;
+        int currentPosition = 0;
+        int remaining = charData.length;
+
+        while (remaining > 0) {
+
+            Boolean isLastPart = false;
+            // determine how much to copy
+            int copySize = CommitPart.MAX_NON_HEADER_SIZE;
+            if (remaining <= CommitPart.MAX_NON_HEADER_SIZE) {
+                copySize = remaining;
+                isLastPart = true; // last bit of data so last part
+            }
+
+            // Copy to a smaller version
+            char[] partData = new char[copySize];
+            System.arraycopy(charData, currentPosition, partData, 0, copySize);
+
+            CommitPart part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
+            parts.put(part.getPartNumber(), part);
+
+            // Update position, count and remaining
+            currentPosition += copySize;
+            commitPartCount++;
+            remaining -= copySize;
+        }
+    }
+
+    void decodeCommitData() {
+
+        // Calculate the size of the data section
+        int dataSize = 0;
+        for (int i = 0; i < parts.keySet().size(); i++) {
+            CommitPart tp = parts.get(i);
+            dataSize += tp.getDataSize();
+        }
+
+        char[] combinedData = new char[dataSize];
+        int currentPosition = 0;
+
+        // Stitch all the data sections together
+        for (int i = 0; i < parts.keySet().size(); i++) {
+            CommitPart tp = parts.get(i);
+            System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize());
+            currentPosition += tp.getDataSize();
+        }
+
+        // Decoder Object
+        ByteBuffer bbDecode = ByteBuffer.wrap(combinedData);
+
+        // Decode how many key value pairs need to be decoded
+        int numberOfKVUpdates = bbDecode.getInt();
+
+        // Decode all the updates key values
+        for (int i = 0; i < numberOfKVUpdates; i++) {
+            KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
+            keyValueUpdateSet.add(kv);
+            liveKeys.add(kv.getKey());
+        }
+    }
+
+    char[] convertDataToBytes() {
+
+        // Calculate the size of the data
+        int sizeOfData = sizeof(int32_t); // Number of Update KV's
+        for (KeyValue kv : keyValueUpdateSet) {
+            sizeOfData += kv.getSize();
+        }
+
+        // Data handlers and storage
+        char[] dataArray = new char[sizeOfData];
+        ByteBuffer bbEncode = ByteBuffer.wrap(dataArray);
+
+        // Encode the size of the updates and guard sets
+        bbEncode.putInt(keyValueUpdateSet.size());
+
+        // Encode all the updates
+        for (KeyValue kv : keyValueUpdateSet) {
+            kv.encode(bbEncode);
+        }
+
+        return bbEncode.array();
+    }
+
+    void setKVsMap(Map<IoTString, KeyValue> newKVs) {
+        keyValueUpdateSet.clear();
+        liveKeys.clear();
+
+        keyValueUpdateSet.addAll(newKVs.values());
+        liveKeys.addAll(newKVs.keySet());
+
+    }
+
+
+    static Commit merge(Commit newer, Commit older, int64_t newSequenceNumber) {
+
+        if (older == NULL) {
+            return newer;
+        } else if (newer == NULL) {
+            return older;
+        }
+
+        Map<IoTString, KeyValue> kvSet = new HashMap<IoTString, KeyValue>();
+        for (KeyValue kv : older.getKeyValueUpdateSet()) {
+            kvSet.put(kv.getKey(), kv);
+        }
+
+        for (KeyValue kv : newer.getKeyValueUpdateSet()) {
+            kvSet.put(kv.getKey(), kv);
+        }
+
+        int64_t transactionSequenceNumber = newer.getTransactionSequenceNumber();
+
+        if (transactionSequenceNumber == -1) {
+            transactionSequenceNumber = older.getTransactionSequenceNumber();
+        }
+
+        Commit newCommit = new Commit(newSequenceNumber, newer.getMachineId(), transactionSequenceNumber);
+
+        newCommit.setKVsMap(kvSet);
+
+        return newCommit;
+    }
+}