-#include "commit.h"
+#include "Commit.h"
+#include "CommitPart.h"
+#include "ByteBuffer.h"
+#include "IoTString.h"
Commit::Commit() :
- parts(new Hashtable<int32_t, CommitPart *>()),
+ parts(new Vector<CommitPart *>()),
+ partCount(0),
missingParts(NULL),
fldisComplete(false),
hasLastPart(false),
- keyValueUpdateSet(new HashSet<KeyValue *>()),
+ keyValueUpdateSet(new Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue>()),
isDead(false),
sequenceNumber(-1),
machineId(-1),
liveKeys(new Hashset<IoTString *>) {
}
-
Commit::Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber) :
- parts(new Hashtable<int32_t, CommitPart *>()),
+ parts(new Vector<CommitPart *>()),
+ partCount(0),
missingParts(NULL),
fldisComplete(true),
hasLastPart(false),
- keyValueUpdateSet(new HashSet<KeyValue *>()),
+ keyValueUpdateSet(new Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue>()),
isDead(false),
sequenceNumber(_sequenceNumber),
machineId(_machineId),
liveKeys(new Hashset<IoTString *>) {
}
-void Commit::addPartDecode(CommitPart newPart) {
-
+void Commit::addPartDecode(CommitPart *newPart) {
if (isDead) {
// If dead then just kill this part and move on
- newPart.setDead();
+ newPart->setDead();
return;
}
- CommitPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart);
-
- if (previoslySeenPart != NULL) {
+ CommitPart *previouslySeenPart = parts->setExpand(newPart->getPartNumber(), newPart);
+ if (previouslySeenPart == NULL)
+ partCount++;
+
+ if (previouslySeenPart != NULL) {
// Set dead the old one since the new one is a rescued version of this part
- previoslySeenPart.setDead();
- } else if (newPart.isLastPart()) {
- missingParts = new HashSet<int32_t>();
+ previouslySeenPart->setDead();
+ } else if (newPart->isLastPart()) {
+ missingParts = new Hashset<int32_t>();
hasLastPart = true;
-
- for (int i = 0; i < newPart.getPartNumber(); i++) {
- if (parts.get(i) == NULL) {
- missingParts.add(i);
+
+ for (int i = 0; i < newPart->getPartNumber(); i++) {
+ if (parts->get(i) == NULL) {
+ missingParts->add(i);
}
}
}
-
+
if (!fldisComplete && hasLastPart) {
-
+
// We have seen this part so remove it from the set of missing parts
- missingParts.remove(newPart.getPartNumber());
-
+ missingParts->remove(newPart->getPartNumber());
+
// Check if all the parts have been seen
- if (missingParts.size() == 0) {
-
+ if (missingParts->size() == 0) {
+
// We have all the parts
fldisComplete = true;
-
+
// Decode all the parts and create the key value guard and update sets
decodeCommitData();
-
+
// Get the sequence number and arbitrator of this transaction
- sequenceNumber = parts.get(0).getSequenceNumber();
- machineId = parts.get(0).getMachineId();
- transactionSequenceNumber = parts.get(0).getTransactionSequenceNumber();
+ sequenceNumber = parts->get(0)->getSequenceNumber();
+ machineId = parts->get(0)->getMachineId();
+ transactionSequenceNumber = parts->get(0)->getTransactionSequenceNumber();
}
}
}
- int64_t getSequenceNumber() {
- return sequenceNumber;
- }
-
- int64_t getTransactionSequenceNumber() {
- return transactionSequenceNumber;
- }
-
- Hashtable<int32_t, CommitPart> getParts() {
- return parts;
- }
-
- void addKV(KeyValue kv) {
- keyValueUpdateSet.add(kv);
- liveKeys.add(kv.getKey());
- }
-
- void invalidateKey(IoTString key) {
- liveKeys.remove(key);
-
- if (liveKeys.size() == 0) {
- setDead();
- }
- }
-
- Set<KeyValue> getKeyValueUpdateSet() {
- return keyValueUpdateSet;
- }
-
-int32_t getNumberOfParts() {
- return parts.size();
+int64_t Commit::getSequenceNumber() {
+ return sequenceNumber;
}
- void setDead() {
- if (isDead) {
- // Already dead
- return;
- }
-
- // Set dead
- isDead = true;
-
- // Make all the parts of this transaction dead
- for (int32_t partNumber : parts.keySet()) {
- CommitPart part = parts.get(partNumber);
- part.setDead();
- }
- }
-
- CommitPart getPart(int index) {
- return parts.get(index);
- }
-
- void createCommitParts() {
-
- parts.clear();
-
- // Convert to chars
- char[] charData = convertDataToBytes();
-
-
- int commitPartCount = 0;
- int currentPosition = 0;
- int remaining = charData.length;
-
- while (remaining > 0) {
-
- Boolean isLastPart = false;
- // determine how much to copy
- int copySize = CommitPart.MAX_NON_HEADER_SIZE;
- if (remaining <= CommitPart.MAX_NON_HEADER_SIZE) {
- copySize = remaining;
- isLastPart = true; // last bit of data so last part
- }
-
- // Copy to a smaller version
- char[] partData = new char[copySize];
- System.arraycopy(charData, currentPosition, partData, 0, copySize);
+int64_t Commit::getTransactionSequenceNumber() {
+ return transactionSequenceNumber;
+}
- CommitPart part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
- parts.put(part.getPartNumber(), part);
+Vector<CommitPart *> *Commit::getParts() {
+ return parts;
+}
- // Update position, count and remaining
- currentPosition += copySize;
- commitPartCount++;
- remaining -= copySize;
- }
- }
+void Commit::addKV(KeyValue *kv) {
+ keyValueUpdateSet->add(kv);
+ liveKeys->add(kv->getKey());
+}
- void decodeCommitData() {
+void Commit::invalidateKey(IoTString *key) {
+ liveKeys->remove(key);
- // Calculate the size of the data section
- int dataSize = 0;
- for (int i = 0; i < parts.keySet().size(); i++) {
- CommitPart tp = parts.get(i);
- dataSize += tp.getDataSize();
- }
+ if (liveKeys->size() == 0) {
+ setDead();
+ }
+}
- char[] combinedData = new char[dataSize];
- int currentPosition = 0;
+Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *Commit::getKeyValueUpdateSet() {
+ return keyValueUpdateSet;
+}
- // Stitch all the data sections together
- for (int i = 0; i < parts.keySet().size(); i++) {
- CommitPart tp = parts.get(i);
- System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize());
- currentPosition += tp.getDataSize();
- }
+int32_t Commit::getNumberOfParts() {
+ return partCount;
+}
- // Decoder Object
- ByteBuffer bbDecode = ByteBuffer.wrap(combinedData);
+void Commit::setDead() {
+ if (!isDead) {
+ isDead = true;
+ // Make all the parts of this transaction dead
+ for (int32_t partNumber = 0; partNumber < parts->size(); partNumber++) {
+ CommitPart *part = parts->get(partNumber);
+ if (parts != NULL)
+ part->setDead();
+ }
+ }
+}
- // Decode how many key value pairs need to be decoded
- int numberOfKVUpdates = bbDecode.getInt();
+CommitPart *Commit::getPart(int index) {
+ return parts->get(index);
+}
- // Decode all the updates key values
- for (int i = 0; i < numberOfKVUpdates; i++) {
- KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
- keyValueUpdateSet.add(kv);
- liveKeys.add(kv.getKey());
- }
- }
+void Commit::createCommitParts() {
+ parts->clear();
+ partCount = 0;
+ // Convert to chars
+ Array<char> *charData = convertDataToBytes();
+
+ int commitPartCount = 0;
+ int currentPosition = 0;
+ int remaining = charData->length();
+
+ while (remaining > 0) {
+ bool isLastPart = false;
+ // determine how much to copy
+ int copySize = CommitPart_MAX_NON_HEADER_SIZE;
+ if (remaining <= CommitPart_MAX_NON_HEADER_SIZE) {
+ copySize = remaining;
+ isLastPart = true;// last bit of data so last part
+ }
- char[] convertDataToBytes() {
+ // Copy to a smaller version
+ Array<char> *partData = new Array<char>(copySize);
+ System_arraycopy(charData, currentPosition, partData, 0, copySize);
- // Calculate the size of the data
- int sizeOfData = sizeof(int32_t); // Number of Update KV's
- for (KeyValue kv : keyValueUpdateSet) {
- sizeOfData += kv.getSize();
- }
+ CommitPart *part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
+ parts->setExpand(part->getPartNumber(), part);
- // Data handlers and storage
- char[] dataArray = new char[sizeOfData];
- ByteBuffer bbEncode = ByteBuffer.wrap(dataArray);
+ // Update position, count and remaining
+ currentPosition += copySize;
+ commitPartCount++;
+ remaining -= copySize;
+ }
+}
- // Encode the size of the updates and guard sets
- bbEncode.putInt(keyValueUpdateSet.size());
+void Commit::decodeCommitData() {
+ // Calculate the size of the data section
+ int dataSize = 0;
+ for (int i = 0; i < parts->size(); i++) {
+ CommitPart *tp = parts->get(i);
+ if (tp != NULL)
+ dataSize += tp->getDataSize();
+ }
- // Encode all the updates
- for (KeyValue kv : keyValueUpdateSet) {
- kv.encode(bbEncode);
- }
+ Array<char> *combinedData = new Array<char>(dataSize);
+ int currentPosition = 0;
- return bbEncode.array();
- }
+ // Stitch all the data sections together
+ for (int i = 0; i < parts->size(); i++) {
+ CommitPart *tp = parts->get(i);
+ if (tp != NULL) {
+ System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
+ currentPosition += tp->getDataSize();
+ }
+ }
- void setKVsMap(Hashtable<IoTString, KeyValue> newKVs) {
- keyValueUpdateSet.clear();
- liveKeys.clear();
+ // Decoder Object
+ ByteBuffer *bbDecode = ByteBuffer_wrap(combinedData);
- keyValueUpdateSet.addAll(newKVs.values());
- liveKeys.addAll(newKVs.keySet());
+ // Decode how many key value pairs need to be decoded
+ int numberOfKVUpdates = bbDecode->getInt();
- }
+ // Decode all the updates key values
+ for (int i = 0; i < numberOfKVUpdates; i++) {
+ KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
+ keyValueUpdateSet->add(kv);
+ liveKeys->add(kv->getKey());
+ }
+}
+Array<char> *Commit::convertDataToBytes() {
+ // Calculate the size of the data
+ int sizeOfData = sizeof(int32_t); // Number of Update KV's
+ SetIterator<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = keyValueUpdateSet->iterator();
+ while (kvit->hasNext()) {
+ KeyValue *kv = kvit->next();
+ sizeOfData += kv->getSize();
+ }
+ delete kvit;
- static Commit merge(Commit newer, Commit older, int64_t newSequenceNumber) {
+ // Data handlers and storage
+ Array<char> *dataArray = new Array<char>(sizeOfData);
+ ByteBuffer *bbEncode = ByteBuffer_wrap(dataArray);
- if (older == NULL) {
- return newer;
- } else if (newer == NULL) {
- return older;
- }
+ // Encode the size of the updates and guard sets
+ bbEncode->putInt(keyValueUpdateSet->size());
- Hashtable<IoTString, KeyValue> kvSet = new Hashtable<IoTString, KeyValue>();
- for (KeyValue kv : older.getKeyValueUpdateSet()) {
- kvSet.put(kv.getKey(), kv);
- }
+ // Encode all the updates
+ kvit = keyValueUpdateSet->iterator();
+ while (kvit->hasNext()) {
+ KeyValue *kv = kvit->next();
+ kv->encode(bbEncode);
+ }
+ delete kvit;
- for (KeyValue kv : newer.getKeyValueUpdateSet()) {
- kvSet.put(kv.getKey(), kv);
- }
+ return bbEncode->array();
+}
- int64_t transactionSequenceNumber = newer.getTransactionSequenceNumber();
+void Commit::setKVsMap(Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *newKVs) {
+ keyValueUpdateSet->clear();
+ keyValueUpdateSet->addAll(newKVs);
+ liveKeys->clear();
+ SetIterator<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = newKVs->iterator();
+ while (kvit->hasNext()) {
+ liveKeys->add(kvit->next()->getKey());
+ }
+ delete kvit;
+}
- if (transactionSequenceNumber == -1) {
- transactionSequenceNumber = older.getTransactionSequenceNumber();
- }
+Commit *Commit_merge(Commit *newer, Commit *older, int64_t newSequenceNumber) {
+ if (older == NULL) {
+ return newer;
+ } else if (newer == NULL) {
+ return older;
+ }
+ Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvSet = new Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue>();
+ SetIterator<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = older->getKeyValueUpdateSet()->iterator();
+ while (kvit->hasNext()) {
+ KeyValue *kv = kvit->next();
+ kvSet->add(kv);
+ }
+ delete kvit;
+ kvit = newer->getKeyValueUpdateSet()->iterator();
+ while (kvit->hasNext()) {
+ KeyValue *kv = kvit->next();
+ kvSet->add(kv);
+ }
+ delete kvit;
- Commit newCommit = new Commit(newSequenceNumber, newer.getMachineId(), transactionSequenceNumber);
+ int64_t transactionSequenceNumber = newer->getTransactionSequenceNumber();
+ if (transactionSequenceNumber == -1) {
+ transactionSequenceNumber = older->getTransactionSequenceNumber();
+ }
- newCommit.setKVsMap(kvSet);
+ Commit *newCommit = new Commit(newSequenceNumber, newer->getMachineId(), transactionSequenceNumber);
+ newCommit->setKVsMap(kvSet);
- return newCommit;
- }
+ delete kvSet;
+ return newCommit;
}