edits
[iotcloud.git] / version2 / src / C / Commit.h
index c2354524ecfc80f1521d197da96d37c0b9ae187b..4dd5a9d810e06dbf380d12928e1b312ba7da2247 100644 (file)
-
+#ifndef COMMIT_H
+#define COMMIT_H
+#include "common.h"
 
 class Commit {
-
-    private Map<Integer, CommitPart> parts = NULL;
-    private Set<Integer> missingParts = NULL;
-    private bool isComplete = false;
-    private bool hasLastPart = false;
-    private Set<KeyValue> keyValueUpdateSet = NULL;
-    private bool isDead = false;
-    private int64_t sequenceNumber = -1;
-    private int64_t machineId = -1;
-    private int64_t transactionSequenceNumber = -1;
-
-    private Set<IoTString> liveKeys = NULL;
-
-    public Commit() {
-        parts = new HashMap<Integer, CommitPart>();
-        keyValueUpdateSet = new HashSet<KeyValue>();
-
-        liveKeys = new HashSet<IoTString>();
-    }
-
-    public Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber) {
-        parts = new HashMap<Integer, CommitPart>();
-        keyValueUpdateSet = new HashSet<KeyValue>();
-
-        liveKeys = new HashSet<IoTString>();
-
-        sequenceNumber = _sequenceNumber;
-        machineId = _machineId;
-        transactionSequenceNumber = _transactionSequenceNumber;
-        isComplete = true;
-    }
-
-
-    public void addPartDecode(CommitPart newPart) {
-
-        if (isDead) {
-            // If dead then just kill this part and move on
-            newPart.setDead();
-            return;
-        }
-
-        CommitPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart);
-
-        if (previoslySeenPart != NULL) {
-            // Set dead the old one since the new one is a rescued version of this part
-            previoslySeenPart.setDead();
-        } else if (newPart.isLastPart()) {
-            missingParts = new HashSet<Integer>();
-            hasLastPart = true;
-
-            for (int i = 0; i < newPart.getPartNumber(); i++) {
-                if (parts.get(i) == NULL) {
-                    missingParts.add(i);
-                }
-            }
-        }
-
-        if (!isComplete && hasLastPart) {
-
-            // We have seen this part so remove it from the set of missing parts
-            missingParts.remove(newPart.getPartNumber());
-
-            // Check if all the parts have been seen
-            if (missingParts.size() == 0) {
-
-                // We have all the parts
-                isComplete = true;
-
-                // Decode all the parts and create the key value guard and update sets
-                decodeCommitData();
-
-                // Get the sequence number and arbitrator of this transaction
-                sequenceNumber = parts.get(0).getSequenceNumber();
-                machineId = parts.get(0).getMachineId();
-                transactionSequenceNumber = parts.get(0).getTransactionSequenceNumber();
-            }
-        }
-    }
-
-    public int64_t getSequenceNumber() {
-        return sequenceNumber;
-    }
-
-    public int64_t getTransactionSequenceNumber() {
-        return transactionSequenceNumber;
-    }
-
-    public Map<Integer, CommitPart> getParts() {
-        return parts;
-    }
-
-    public void addKV(KeyValue kv) {
-        keyValueUpdateSet.add(kv);
-        liveKeys.add(kv.getKey());
-    }
-
-    public void invalidateKey(IoTString key) {
-        liveKeys.remove(key);
-
-        if (liveKeys.size() == 0) {
-            setDead();
-        }
-    }
-
-    public Set<KeyValue> getKeyValueUpdateSet() {
-        return keyValueUpdateSet;
-    }
-
-    public int getNumberOfParts() {
-        return parts.size();
-    }
-
-    public int64_t getMachineId() {
-        return machineId;
-    }
-
-    public bool isComplete() {
-        return isComplete;
-    }
-
-    public bool isLive() {
-        return !isDead;
-    }
-
-    public void setDead() {
-        if (isDead) {
-            // Already dead
-            return;
-        }
-
-        // Set dead
-        isDead = true;
-
-        // Make all the parts of this transaction dead
-        for (Integer partNumber : parts.keySet()) {
-            CommitPart part = parts.get(partNumber);
-            part.setDead();
-        }
-    }
-
-    public CommitPart getPart(int index) {
-        return parts.get(index);
-    }
-
-    public void createCommitParts() {
-
-        parts.clear();
-
-        // Convert to chars
-        char[] charData = convertDataToBytes();
-
-
-        int commitPartCount = 0;
-        int currentPosition = 0;
-        int remaining = charData.length;
-
-        while (remaining > 0) {
-
-            Boolean isLastPart = false;
-            // determine how much to copy
-            int copySize = CommitPart.MAX_NON_HEADER_SIZE;
-            if (remaining <= CommitPart.MAX_NON_HEADER_SIZE) {
-                copySize = remaining;
-                isLastPart = true; // last bit of data so last part
-            }
-
-            // Copy to a smaller version
-            char[] partData = new char[copySize];
-            System.arraycopy(charData, currentPosition, partData, 0, copySize);
-
-            CommitPart part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
-            parts.put(part.getPartNumber(), part);
-
-            // Update position, count and remaining
-            currentPosition += copySize;
-            commitPartCount++;
-            remaining -= copySize;
-        }
-    }
-
-    private void decodeCommitData() {
-
-        // Calculate the size of the data section
-        int dataSize = 0;
-        for (int i = 0; i < parts.keySet().size(); i++) {
-            CommitPart tp = parts.get(i);
-            dataSize += tp.getDataSize();
-        }
-
-        char[] combinedData = new char[dataSize];
-        int currentPosition = 0;
-
-        // Stitch all the data sections together
-        for (int i = 0; i < parts.keySet().size(); i++) {
-            CommitPart tp = parts.get(i);
-            System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize());
-            currentPosition += tp.getDataSize();
-        }
-
-        // Decoder Object
-        ByteBuffer bbDecode = ByteBuffer.wrap(combinedData);
-
-        // Decode how many key value pairs need to be decoded
-        int numberOfKVUpdates = bbDecode.getInt();
-
-        // Decode all the updates key values
-        for (int i = 0; i < numberOfKVUpdates; i++) {
-            KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
-            keyValueUpdateSet.add(kv);
-            liveKeys.add(kv.getKey());
-        }
-    }
-
-    private char[] convertDataToBytes() {
-
-        // Calculate the size of the data
-        int sizeOfData = sizeof(int32_t); // Number of Update KV's
-        for (KeyValue kv : keyValueUpdateSet) {
-            sizeOfData += kv.getSize();
-        }
-
-        // Data handlers and storage
-        char[] dataArray = new char[sizeOfData];
-        ByteBuffer bbEncode = ByteBuffer.wrap(dataArray);
-
-        // Encode the size of the updates and guard sets
-        bbEncode.putInt(keyValueUpdateSet.size());
-
-        // Encode all the updates
-        for (KeyValue kv : keyValueUpdateSet) {
-            kv.encode(bbEncode);
-        }
-
-        return bbEncode.array();
-    }
-
-    private void setKVsMap(Map<IoTString, KeyValue> newKVs) {
-        keyValueUpdateSet.clear();
-        liveKeys.clear();
-
-        keyValueUpdateSet.addAll(newKVs.values());
-        liveKeys.addAll(newKVs.keySet());
-
-    }
-
-
-    public static Commit merge(Commit newer, Commit older, int64_t newSequenceNumber) {
-
-        if (older == NULL) {
-            return newer;
-        } else if (newer == NULL) {
-            return older;
-        }
-
-        Map<IoTString, KeyValue> kvSet = new HashMap<IoTString, KeyValue>();
-        for (KeyValue kv : older.getKeyValueUpdateSet()) {
-            kvSet.put(kv.getKey(), kv);
-        }
-
-        for (KeyValue kv : newer.getKeyValueUpdateSet()) {
-            kvSet.put(kv.getKey(), kv);
-        }
-
-        int64_t transactionSequenceNumber = newer.getTransactionSequenceNumber();
-
-        if (transactionSequenceNumber == -1) {
-            transactionSequenceNumber = older.getTransactionSequenceNumber();
-        }
-
-        Commit newCommit = new Commit(newSequenceNumber, newer.getMachineId(), transactionSequenceNumber);
-
-        newCommit.setKVsMap(kvSet);
-
-        return newCommit;
-    }
-}
+ private:
+       Hashtable<int32_t, CommitPart *> * parts;
+       Hashset<int32_t> *missingParts;
+  bool fldisComplete;
+       bool hasLastPart;
+  Hashset<KeyValue *> *keyValueUpdateSet;
+  bool isDead;
+  int64_t sequenceNumber;
+       int64_t machineId;
+  int64_t transactionSequenceNumber;
+  Hashset<IoTString*> * liveKeys;
+       Array<char> * convertDataToBytes();
+       void setKVsMap(Hashtable<IoTString *, KeyValue *> * newKVs);
+       
+ public:
+       Commit();
+       Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber);
+       
+       void addPartDecode(CommitPart * newPart);
+       int64_t getSequenceNumber();
+       int64_t getTransactionSequenceNumber();
+       Hashtable<int32_t, CommitPart *> *getParts();
+       void addKV(KeyValue * kv);
+       void invalidateKey(IoTString * key);
+       Hashset<KeyValue *> * getKeyValueUpdateSet();
+       int32_t getNumberOfParts();
+       int64_t getMachineId() { return machineId; }
+       bool isComplete() { return fldisComplete; }
+       bool isLive() { return !isDead; }
+       void setDead();
+       CommitPart * getPart(int32_t index);
+       void createCommitParts();
+       void decodeCommitData();
+};
+
+Commit * Commit_merge(Commit * newer, Commit * older, int64_t newSequenceNumber);
+#endif