+#include "Commit.h"
+#include "CommitPart.h"
+#include "ByteBuffer.h"
+#include "IoTString.h"
+
+Commit::Commit() :
+ parts(new Vector<CommitPart *>()),
+ partCount(0),
+ missingParts(NULL),
+ fldisComplete(false),
+ hasLastPart(false),
+ keyValueUpdateSet(new Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue>()),
+ isDead(false),
+ sequenceNumber(-1),
+ machineId(-1),
+ transactionSequenceNumber(-1),
+ liveKeys(new Hashset<IoTString *>) {
+}
+Commit::Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber) :
+ parts(new Vector<CommitPart *>()),
+ partCount(0),
+ missingParts(NULL),
+ fldisComplete(true),
+ hasLastPart(false),
+ keyValueUpdateSet(new Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue>()),
+ isDead(false),
+ sequenceNumber(_sequenceNumber),
+ machineId(_machineId),
+ transactionSequenceNumber(_transactionSequenceNumber),
+ liveKeys(new Hashset<IoTString *>) {
+}
-class Commit {
-
- Map<Integer, CommitPart> parts = NULL;
- Set<Integer> missingParts = NULL;
- bool isComplete = false;
- bool hasLastPart = false;
- Set<KeyValue> keyValueUpdateSet = NULL;
- bool isDead = false;
- int64_t sequenceNumber = -1;
- int64_t machineId = -1;
- int64_t transactionSequenceNumber = -1;
-
- Set<IoTString> liveKeys = NULL;
-
- Commit() {
- parts = new HashMap<Integer, CommitPart>();
- keyValueUpdateSet = new HashSet<KeyValue>();
-
- liveKeys = new HashSet<IoTString>();
- }
-
- Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber) {
- parts = new HashMap<Integer, CommitPart>();
- keyValueUpdateSet = new HashSet<KeyValue>();
-
- liveKeys = new HashSet<IoTString>();
-
- sequenceNumber = _sequenceNumber;
- machineId = _machineId;
- transactionSequenceNumber = _transactionSequenceNumber;
- isComplete = true;
- }
-
-
- void addPartDecode(CommitPart newPart) {
-
- if (isDead) {
- // If dead then just kill this part and move on
- newPart.setDead();
- return;
- }
-
- CommitPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart);
-
- if (previoslySeenPart != NULL) {
- // Set dead the old one since the new one is a rescued version of this part
- previoslySeenPart.setDead();
- } else if (newPart.isLastPart()) {
- missingParts = new HashSet<Integer>();
- hasLastPart = true;
-
- for (int i = 0; i < newPart.getPartNumber(); i++) {
- if (parts.get(i) == NULL) {
- missingParts.add(i);
- }
- }
- }
-
- if (!isComplete && hasLastPart) {
-
- // We have seen this part so remove it from the set of missing parts
- missingParts.remove(newPart.getPartNumber());
-
- // Check if all the parts have been seen
- if (missingParts.size() == 0) {
-
- // We have all the parts
- isComplete = true;
-
- // Decode all the parts and create the key value guard and update sets
- decodeCommitData();
-
- // Get the sequence number and arbitrator of this transaction
- sequenceNumber = parts.get(0).getSequenceNumber();
- machineId = parts.get(0).getMachineId();
- transactionSequenceNumber = parts.get(0).getTransactionSequenceNumber();
- }
- }
- }
-
- int64_t getSequenceNumber() {
- return sequenceNumber;
- }
-
- int64_t getTransactionSequenceNumber() {
- return transactionSequenceNumber;
- }
-
- Map<Integer, CommitPart> getParts() {
- return parts;
- }
-
- void addKV(KeyValue kv) {
- keyValueUpdateSet.add(kv);
- liveKeys.add(kv.getKey());
- }
-
- void invalidateKey(IoTString key) {
- liveKeys.remove(key);
-
- if (liveKeys.size() == 0) {
- setDead();
- }
- }
-
- Set<KeyValue> getKeyValueUpdateSet() {
- return keyValueUpdateSet;
- }
-
- int getNumberOfParts() {
- return parts.size();
- }
-
- int64_t getMachineId() {
- return machineId;
- }
-
- bool isComplete() {
- return isComplete;
- }
-
- bool isLive() {
- return !isDead;
- }
-
- void setDead() {
- if (isDead) {
- // Already dead
- return;
- }
-
- // Set dead
- isDead = true;
-
- // Make all the parts of this transaction dead
- for (Integer partNumber : parts.keySet()) {
- CommitPart part = parts.get(partNumber);
- part.setDead();
- }
- }
-
- CommitPart getPart(int index) {
- return parts.get(index);
- }
-
- void createCommitParts() {
-
- parts.clear();
-
- // Convert to chars
- char[] charData = convertDataToBytes();
-
-
- int commitPartCount = 0;
- int currentPosition = 0;
- int remaining = charData.length;
-
- while (remaining > 0) {
-
- Boolean isLastPart = false;
- // determine how much to copy
- int copySize = CommitPart.MAX_NON_HEADER_SIZE;
- if (remaining <= CommitPart.MAX_NON_HEADER_SIZE) {
- copySize = remaining;
- isLastPart = true; // last bit of data so last part
- }
-
- // Copy to a smaller version
- char[] partData = new char[copySize];
- System.arraycopy(charData, currentPosition, partData, 0, copySize);
-
- CommitPart part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
- parts.put(part.getPartNumber(), part);
-
- // Update position, count and remaining
- currentPosition += copySize;
- commitPartCount++;
- remaining -= copySize;
- }
- }
-
- void decodeCommitData() {
-
- // Calculate the size of the data section
- int dataSize = 0;
- for (int i = 0; i < parts.keySet().size(); i++) {
- CommitPart tp = parts.get(i);
- dataSize += tp.getDataSize();
- }
-
- char[] combinedData = new char[dataSize];
- int currentPosition = 0;
-
- // Stitch all the data sections together
- for (int i = 0; i < parts.keySet().size(); i++) {
- CommitPart tp = parts.get(i);
- System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize());
- currentPosition += tp.getDataSize();
- }
-
- // Decoder Object
- ByteBuffer bbDecode = ByteBuffer.wrap(combinedData);
-
- // Decode how many key value pairs need to be decoded
- int numberOfKVUpdates = bbDecode.getInt();
-
- // Decode all the updates key values
- for (int i = 0; i < numberOfKVUpdates; i++) {
- KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
- keyValueUpdateSet.add(kv);
- liveKeys.add(kv.getKey());
- }
- }
-
- char[] convertDataToBytes() {
-
- // Calculate the size of the data
- int sizeOfData = sizeof(int32_t); // Number of Update KV's
- for (KeyValue kv : keyValueUpdateSet) {
- sizeOfData += kv.getSize();
- }
-
- // Data handlers and storage
- char[] dataArray = new char[sizeOfData];
- ByteBuffer bbEncode = ByteBuffer.wrap(dataArray);
-
- // Encode the size of the updates and guard sets
- bbEncode.putInt(keyValueUpdateSet.size());
+void Commit::addPartDecode(CommitPart *newPart) {
+ if (isDead) {
+ // If dead then just kill this part and move on
+ newPart->setDead();
+ return;
+ }
+
+ CommitPart *previouslySeenPart = parts->setExpand(newPart->getPartNumber(), newPart);
+ if (previouslySeenPart == NULL)
+ partCount++;
+
+ if (previouslySeenPart != NULL) {
+ // Set dead the old one since the new one is a rescued version of this part
+ previouslySeenPart->setDead();
+ } else if (newPart->isLastPart()) {
+ missingParts = new Hashset<int32_t>();
+ hasLastPart = true;
+
+ for (int i = 0; i < newPart->getPartNumber(); i++) {
+ if (parts->get(i) == NULL) {
+ missingParts->add(i);
+ }
+ }
+ }
+
+ if (!fldisComplete && hasLastPart) {
+
+ // We have seen this part so remove it from the set of missing parts
+ missingParts->remove(newPart->getPartNumber());
+
+ // Check if all the parts have been seen
+ if (missingParts->size() == 0) {
+
+ // We have all the parts
+ fldisComplete = true;
+
+ // Decode all the parts and create the key value guard and update sets
+ decodeCommitData();
+
+ // Get the sequence number and arbitrator of this transaction
+ sequenceNumber = parts->get(0)->getSequenceNumber();
+ machineId = parts->get(0)->getMachineId();
+ transactionSequenceNumber = parts->get(0)->getTransactionSequenceNumber();
+ }
+ }
+}
- // Encode all the updates
- for (KeyValue kv : keyValueUpdateSet) {
- kv.encode(bbEncode);
- }
+int64_t Commit::getSequenceNumber() {
+ return sequenceNumber;
+}
- return bbEncode.array();
- }
+int64_t Commit::getTransactionSequenceNumber() {
+ return transactionSequenceNumber;
+}
- void setKVsMap(Map<IoTString, KeyValue> newKVs) {
- keyValueUpdateSet.clear();
- liveKeys.clear();
+Vector<CommitPart *> *Commit::getParts() {
+ return parts;
+}
- keyValueUpdateSet.addAll(newKVs.values());
- liveKeys.addAll(newKVs.keySet());
+void Commit::addKV(KeyValue *kv) {
+ keyValueUpdateSet->add(kv);
+ liveKeys->add(kv->getKey());
+}
- }
+void Commit::invalidateKey(IoTString *key) {
+ liveKeys->remove(key);
+ if (liveKeys->size() == 0) {
+ setDead();
+ }
+}
- static Commit merge(Commit newer, Commit older, int64_t newSequenceNumber) {
+Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *Commit::getKeyValueUpdateSet() {
+ return keyValueUpdateSet;
+}
- if (older == NULL) {
- return newer;
- } else if (newer == NULL) {
- return older;
- }
+int32_t Commit::getNumberOfParts() {
+ return partCount;
+}
- Map<IoTString, KeyValue> kvSet = new HashMap<IoTString, KeyValue>();
- for (KeyValue kv : older.getKeyValueUpdateSet()) {
- kvSet.put(kv.getKey(), kv);
- }
+void Commit::setDead() {
+ if (!isDead) {
+ isDead = true;
+ // Make all the parts of this transaction dead
+ for (int32_t partNumber = 0; partNumber < parts->size(); partNumber++) {
+ CommitPart *part = parts->get(partNumber);
+ if (parts != NULL)
+ part->setDead();
+ }
+ }
+}
- for (KeyValue kv : newer.getKeyValueUpdateSet()) {
- kvSet.put(kv.getKey(), kv);
- }
+CommitPart *Commit::getPart(int index) {
+ return parts->get(index);
+}
- int64_t transactionSequenceNumber = newer.getTransactionSequenceNumber();
+void Commit::createCommitParts() {
+ parts->clear();
+ partCount = 0;
+ // Convert to chars
+ Array<char> *charData = convertDataToBytes();
+
+ int commitPartCount = 0;
+ int currentPosition = 0;
+ int remaining = charData->length();
+
+ while (remaining > 0) {
+ bool isLastPart = false;
+ // determine how much to copy
+ int copySize = CommitPart_MAX_NON_HEADER_SIZE;
+ if (remaining <= CommitPart_MAX_NON_HEADER_SIZE) {
+ copySize = remaining;
+ isLastPart = true;// last bit of data so last part
+ }
+
+ // Copy to a smaller version
+ Array<char> *partData = new Array<char>(copySize);
+ System_arraycopy(charData, currentPosition, partData, 0, copySize);
+
+ CommitPart *part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
+ parts->setExpand(part->getPartNumber(), part);
+
+ // Update position, count and remaining
+ currentPosition += copySize;
+ commitPartCount++;
+ remaining -= copySize;
+ }
+}
- if (transactionSequenceNumber == -1) {
- transactionSequenceNumber = older.getTransactionSequenceNumber();
- }
+void Commit::decodeCommitData() {
+ // Calculate the size of the data section
+ int dataSize = 0;
+ for (int i = 0; i < parts->size(); i++) {
+ CommitPart *tp = parts->get(i);
+ if (tp != NULL)
+ dataSize += tp->getDataSize();
+ }
+
+ Array<char> *combinedData = new Array<char>(dataSize);
+ int currentPosition = 0;
+
+ // Stitch all the data sections together
+ for (int i = 0; i < parts->size(); i++) {
+ CommitPart *tp = parts->get(i);
+ if (tp != NULL) {
+ System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
+ currentPosition += tp->getDataSize();
+ }
+ }
+
+ // Decoder Object
+ ByteBuffer *bbDecode = ByteBuffer_wrap(combinedData);
+
+ // Decode how many key value pairs need to be decoded
+ int numberOfKVUpdates = bbDecode->getInt();
+
+ // Decode all the updates key values
+ for (int i = 0; i < numberOfKVUpdates; i++) {
+ KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
+ keyValueUpdateSet->add(kv);
+ liveKeys->add(kv->getKey());
+ }
+}
- Commit newCommit = new Commit(newSequenceNumber, newer.getMachineId(), transactionSequenceNumber);
+Array<char> *Commit::convertDataToBytes() {
+ // Calculate the size of the data
+ int sizeOfData = sizeof(int32_t); // Number of Update KV's
+ SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = keyValueUpdateSet->iterator();
+ while (kvit->hasNext()) {
+ KeyValue *kv = kvit->next();
+ sizeOfData += kv->getSize();
+ }
+ delete kvit;
+
+ // Data handlers and storage
+ Array<char> *dataArray = new Array<char>(sizeOfData);
+ ByteBuffer *bbEncode = ByteBuffer_wrap(dataArray);
+
+ // Encode the size of the updates and guard sets
+ bbEncode->putInt(keyValueUpdateSet->size());
+
+ // Encode all the updates
+ kvit = keyValueUpdateSet->iterator();
+ while (kvit->hasNext()) {
+ KeyValue *kv = kvit->next();
+ kv->encode(bbEncode);
+ }
+ delete kvit;
+
+ return bbEncode->array();
+}
- newCommit.setKVsMap(kvSet);
+void Commit::setKVsMap(Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *newKVs) {
+ keyValueUpdateSet->clear();
+ keyValueUpdateSet->addAll(newKVs);
+ liveKeys->clear();
+ SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = newKVs->iterator();
+ while (kvit->hasNext()) {
+ liveKeys->add(kvit->next()->getKey());
+ }
+ delete kvit;
+}
- return newCommit;
- }
+Commit *Commit_merge(Commit *newer, Commit *older, int64_t newSequenceNumber) {
+ if (older == NULL) {
+ return newer;
+ } else if (newer == NULL) {
+ return older;
+ }
+ Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvSet = new Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue>();
+ SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = older->getKeyValueUpdateSet()->iterator();
+ while (kvit->hasNext()) {
+ KeyValue *kv = kvit->next();
+ kvSet->add(kv);
+ }
+ delete kvit;
+ kvit = newer->getKeyValueUpdateSet()->iterator();
+ while (kvit->hasNext()) {
+ KeyValue *kv = kvit->next();
+ kvSet->add(kv);
+ }
+ delete kvit;
+
+ int64_t transactionSequenceNumber = newer->getTransactionSequenceNumber();
+ if (transactionSequenceNumber == -1) {
+ transactionSequenceNumber = older->getTransactionSequenceNumber();
+ }
+
+ Commit *newCommit = new Commit(newSequenceNumber, newer->getMachineId(), transactionSequenceNumber);
+ newCommit->setKVsMap(kvSet);
+
+ delete kvSet;
+ return newCommit;
}