-
- private Map<Integer, CommitPart> parts = NULL;
- private Set<Integer> missingParts = NULL;
- private bool isComplete = false;
- private bool hasLastPart = false;
- private Set<KeyValue> keyValueUpdateSet = NULL;
- private bool isDead = false;
- private int64_t sequenceNumber = -1;
- private int64_t machineId = -1;
- private int64_t transactionSequenceNumber = -1;
-
- private Set<IoTString> liveKeys = NULL;
-
- public Commit() {
- parts = new HashMap<Integer, CommitPart>();
- keyValueUpdateSet = new HashSet<KeyValue>();
-
- liveKeys = new HashSet<IoTString>();
- }
-
- public Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber) {
- parts = new HashMap<Integer, CommitPart>();
- keyValueUpdateSet = new HashSet<KeyValue>();
-
- liveKeys = new HashSet<IoTString>();
-
- sequenceNumber = _sequenceNumber;
- machineId = _machineId;
- transactionSequenceNumber = _transactionSequenceNumber;
- isComplete = true;
- }
-
-
- public void addPartDecode(CommitPart newPart) {
-
- if (isDead) {
- // If dead then just kill this part and move on
- newPart.setDead();
- return;
- }
-
- CommitPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart);
-
- if (previoslySeenPart != NULL) {
- // Set dead the old one since the new one is a rescued version of this part
- previoslySeenPart.setDead();
- } else if (newPart.isLastPart()) {
- missingParts = new HashSet<Integer>();
- hasLastPart = true;
-
- for (int i = 0; i < newPart.getPartNumber(); i++) {
- if (parts.get(i) == NULL) {
- missingParts.add(i);
- }
- }
- }
-
- if (!isComplete && hasLastPart) {
-
- // We have seen this part so remove it from the set of missing parts
- missingParts.remove(newPart.getPartNumber());
-
- // Check if all the parts have been seen
- if (missingParts.size() == 0) {
-
- // We have all the parts
- isComplete = true;
-
- // Decode all the parts and create the key value guard and update sets
- decodeCommitData();
-
- // Get the sequence number and arbitrator of this transaction
- sequenceNumber = parts.get(0).getSequenceNumber();
- machineId = parts.get(0).getMachineId();
- transactionSequenceNumber = parts.get(0).getTransactionSequenceNumber();
- }
- }
- }
-
- public int64_t getSequenceNumber() {
- return sequenceNumber;
- }
-
- public int64_t getTransactionSequenceNumber() {
- return transactionSequenceNumber;
- }
-
- public Map<Integer, CommitPart> getParts() {
- return parts;
- }
-
- public void addKV(KeyValue kv) {
- keyValueUpdateSet.add(kv);
- liveKeys.add(kv.getKey());
- }
-
- public void invalidateKey(IoTString key) {
- liveKeys.remove(key);
-
- if (liveKeys.size() == 0) {
- setDead();
- }
- }
-
- public Set<KeyValue> getKeyValueUpdateSet() {
- return keyValueUpdateSet;
- }
-
- public int getNumberOfParts() {
- return parts.size();
- }
-
- public int64_t getMachineId() {
- return machineId;
- }
-
- public bool isComplete() {
- return isComplete;
- }
-
- public bool isLive() {
- return !isDead;
- }
-
- public void setDead() {
- if (isDead) {
- // Already dead
- return;
- }
-
- // Set dead
- isDead = true;
-
- // Make all the parts of this transaction dead
- for (Integer partNumber : parts.keySet()) {
- CommitPart part = parts.get(partNumber);
- part.setDead();
- }
- }
-
- public CommitPart getPart(int index) {
- return parts.get(index);
- }
-
- public void createCommitParts() {
-
- parts.clear();
-
- // Convert to chars
- char[] charData = convertDataToBytes();
-
-
- int commitPartCount = 0;
- int currentPosition = 0;
- int remaining = charData.length;
-
- while (remaining > 0) {
-
- Boolean isLastPart = false;
- // determine how much to copy
- int copySize = CommitPart.MAX_NON_HEADER_SIZE;
- if (remaining <= CommitPart.MAX_NON_HEADER_SIZE) {
- copySize = remaining;
- isLastPart = true; // last bit of data so last part
- }
-
- // Copy to a smaller version
- char[] partData = new char[copySize];
- System.arraycopy(charData, currentPosition, partData, 0, copySize);
-
- CommitPart part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
- parts.put(part.getPartNumber(), part);
-
- // Update position, count and remaining
- currentPosition += copySize;
- commitPartCount++;
- remaining -= copySize;
- }
- }
-
- private void decodeCommitData() {
-
- // Calculate the size of the data section
- int dataSize = 0;
- for (int i = 0; i < parts.keySet().size(); i++) {
- CommitPart tp = parts.get(i);
- dataSize += tp.getDataSize();
- }
-
- char[] combinedData = new char[dataSize];
- int currentPosition = 0;
-
- // Stitch all the data sections together
- for (int i = 0; i < parts.keySet().size(); i++) {
- CommitPart tp = parts.get(i);
- System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize());
- currentPosition += tp.getDataSize();
- }
-
- // Decoder Object
- ByteBuffer bbDecode = ByteBuffer.wrap(combinedData);
-
- // Decode how many key value pairs need to be decoded
- int numberOfKVUpdates = bbDecode.getInt();
-
- // Decode all the updates key values
- for (int i = 0; i < numberOfKVUpdates; i++) {
- KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
- keyValueUpdateSet.add(kv);
- liveKeys.add(kv.getKey());
- }
- }
-
- private char[] convertDataToBytes() {
-
- // Calculate the size of the data
- int sizeOfData = sizeof(int32_t); // Number of Update KV's
- for (KeyValue kv : keyValueUpdateSet) {
- sizeOfData += kv.getSize();
- }
-
- // Data handlers and storage
- char[] dataArray = new char[sizeOfData];
- ByteBuffer bbEncode = ByteBuffer.wrap(dataArray);
-
- // Encode the size of the updates and guard sets
- bbEncode.putInt(keyValueUpdateSet.size());
-
- // Encode all the updates
- for (KeyValue kv : keyValueUpdateSet) {
- kv.encode(bbEncode);
- }
-
- return bbEncode.array();
- }
-
- private void setKVsMap(Map<IoTString, KeyValue> newKVs) {
- keyValueUpdateSet.clear();
- liveKeys.clear();
-
- keyValueUpdateSet.addAll(newKVs.values());
- liveKeys.addAll(newKVs.keySet());
-
- }
-
-
- public static Commit merge(Commit newer, Commit older, int64_t newSequenceNumber) {
-
- if (older == NULL) {
- return newer;
- } else if (newer == NULL) {
- return older;
- }
-
- Map<IoTString, KeyValue> kvSet = new HashMap<IoTString, KeyValue>();
- for (KeyValue kv : older.getKeyValueUpdateSet()) {
- kvSet.put(kv.getKey(), kv);
- }
-
- for (KeyValue kv : newer.getKeyValueUpdateSet()) {
- kvSet.put(kv.getKey(), kv);
- }
-
- int64_t transactionSequenceNumber = newer.getTransactionSequenceNumber();
-
- if (transactionSequenceNumber == -1) {
- transactionSequenceNumber = older.getTransactionSequenceNumber();
- }
-
- Commit newCommit = new Commit(newSequenceNumber, newer.getMachineId(), transactionSequenceNumber);
-
- newCommit.setKVsMap(kvSet);
-
- return newCommit;
- }
-}
+ private:
+ Hashtable<int32_t, CommitPart *> * parts;
+ Hashset<int32_t> *missingParts;
+ bool fldisComplete;
+ bool hasLastPart;
+ Hashset<KeyValue *> *keyValueUpdateSet;
+ bool isDead;
+ int64_t sequenceNumber;
+ int64_t machineId;
+ int64_t transactionSequenceNumber;
+ Hashset<IoTString*> * liveKeys;
+ Array<char> * convertDataToBytes();
+ void setKVsMap(Hashtable<IoTString *, KeyValue *> * newKVs);
+
+ public:
+ Commit();
+ Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber);
+
+ void addPartDecode(CommitPart * newPart);
+ int64_t getSequenceNumber();
+ int64_t getTransactionSequenceNumber();
+ Hashtable<int32_t, CommitPart *> *getParts();
+ void addKV(KeyValue * kv);
+ void invalidateKey(IoTString * key);
+ Hashset<KeyValue *> * getKeyValueUpdateSet();
+ int32_t getNumberOfParts();
+ int64_t getMachineId() { return machineId; }
+ bool isComplete() { return fldisComplete; }
+ bool isLive() { return !isDead; }
+ void setDead();
+ CommitPart * getPart(int32_t index);
+ void createCommitParts();
+ void decodeCommitData();
+};
+
+Commit * Commit_merge(Commit * newer, Commit * older, int64_t newSequenceNumber);
+#endif