-#include "commit.h"
+#include "Commit.h"
+#include "CommitPart.h"
+#include "ByteBuffer.h"
+#include "IoTString.h"
Commit::Commit() :
- parts(new Hashtable<int32_t, CommitPart *>()),
+ parts(new Vector<CommitPart *>()),
+ partCount(0),
missingParts(NULL),
fldisComplete(false),
hasLastPart(false),
- keyValueUpdateSet(new HashSet<KeyValue *>()),
+ keyValueUpdateSet(new Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue>()),
isDead(false),
sequenceNumber(-1),
machineId(-1),
liveKeys(new Hashset<IoTString *>) {
}
-
Commit::Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber) :
- parts(new Hashtable<int32_t, CommitPart *>()),
+ parts(new Vector<CommitPart *>()),
+ partCount(0),
missingParts(NULL),
fldisComplete(true),
hasLastPart(false),
- keyValueUpdateSet(new HashSet<KeyValue *>()),
+ keyValueUpdateSet(new Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue>()),
isDead(false),
sequenceNumber(_sequenceNumber),
machineId(_machineId),
liveKeys(new Hashset<IoTString *>) {
}
-void Commit::addPartDecode(CommitPart newPart) {
-
+void Commit::addPartDecode(CommitPart *newPart) {
if (isDead) {
// If dead then just kill this part and move on
newPart->setDead();
return;
}
- CommitPart *previoslySeenPart = parts->put(newPart->getPartNumber(), newPart);
+ CommitPart *previouslySeenPart = parts->setExpand(newPart->getPartNumber(), newPart);
+ if (previouslySeenPart == NULL)
+ partCount++;
- if (previoslySeenPart != NULL) {
+ if (previouslySeenPart != NULL) {
// Set dead the old one since the new one is a rescued version of this part
- previoslySeenPart->setDead();
+ previouslySeenPart->setDead();
} else if (newPart->isLastPart()) {
- missingParts = new HashSet<int32_t>();
+ missingParts = new Hashset<int32_t>();
hasLastPart = true;
for (int i = 0; i < newPart->getPartNumber(); i++) {
return transactionSequenceNumber;
}
-Hashtable<int32_t, CommitPart *> *Commit::getParts() {
+Vector<CommitPart *> *Commit::getParts() {
return parts;
}
}
}
-Hashset<KeyValue *> *Commit::getKeyValueUpdateSet() {
+Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *Commit::getKeyValueUpdateSet() {
return keyValueUpdateSet;
}
int32_t Commit::getNumberOfParts() {
- return parts->size();
+ return partCount;
}
void Commit::setDead() {
- if (isDead) {
- // Already dead
- return;
- }
-
- // Set dead
- isDead = true;
-
- // Make all the parts of this transaction dead
- for (int32_t partNumber : parts->keySet()) {
- CommitPart part = parts->get(partNumber);
- part->setDead();
+ if (!isDead) {
+ isDead = true;
+ // Make all the parts of this transaction dead
+ for (int32_t partNumber = 0; partNumber < parts->size(); partNumber++) {
+ CommitPart *part = parts->get(partNumber);
+ if (parts != NULL)
+ part->setDead();
+ }
}
}
void Commit::createCommitParts() {
parts->clear();
-
+ partCount = 0;
// Convert to chars
Array<char> *charData = convertDataToBytes();
-
int commitPartCount = 0;
int currentPosition = 0;
int remaining = charData->length();
while (remaining > 0) {
-
bool isLastPart = false;
// determine how much to copy
int copySize = CommitPart_MAX_NON_HEADER_SIZE;
// Copy to a smaller version
Array<char> *partData = new Array<char>(copySize);
- System->arraycopy(charData, currentPosition, partData, 0, copySize);
+ System_arraycopy(charData, currentPosition, partData, 0, copySize);
- CommitPart part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
- parts->put(part->getPartNumber(), part);
+ CommitPart *part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
+ parts->setExpand(part->getPartNumber(), part);
// Update position, count and remaining
currentPosition += copySize;
}
void Commit::decodeCommitData() {
-
// Calculate the size of the data section
int dataSize = 0;
- for (int i = 0; i < parts->keySet()->size(); i++) {
+ for (int i = 0; i < parts->size(); i++) {
CommitPart *tp = parts->get(i);
- dataSize += tp->getDataSize();
+ if (tp != NULL)
+ dataSize += tp->getDataSize();
}
Array<char> *combinedData = new Array<char>(dataSize);
int currentPosition = 0;
// Stitch all the data sections together
- for (int i = 0; i < parts->keySet()->size(); i++) {
+ for (int i = 0; i < parts->size(); i++) {
CommitPart *tp = parts->get(i);
- System->arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
- currentPosition += tp->getDataSize();
+ if (tp != NULL) {
+ System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
+ currentPosition += tp->getDataSize();
+ }
}
// Decoder Object
// Decode all the updates key values
for (int i = 0; i < numberOfKVUpdates; i++) {
- KeyValue *kv = (KeyValue *)KeyValue->decode(bbDecode);
+ KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
keyValueUpdateSet->add(kv);
liveKeys->add(kv->getKey());
}
}
-Array<char> *convertDataToBytes() {
-
+Array<char> *Commit::convertDataToBytes() {
// Calculate the size of the data
int sizeOfData = sizeof(int32_t); // Number of Update KV's
- for (KeyValue *kv : keyValueUpdateSet) {
+ SetIterator<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = keyValueUpdateSet->iterator();
+ while (kvit->hasNext()) {
+ KeyValue *kv = kvit->next();
sizeOfData += kv->getSize();
}
+ delete kvit;
// Data handlers and storage
Array<char> *dataArray = new Array<char>(sizeOfData);
bbEncode->putInt(keyValueUpdateSet->size());
// Encode all the updates
- for (KeyValue *kv : keyValueUpdateSet) {
+ kvit = keyValueUpdateSet->iterator();
+ while (kvit->hasNext()) {
+ KeyValue *kv = kvit->next();
kv->encode(bbEncode);
}
+ delete kvit;
return bbEncode->array();
}
-void Commit::setKVsMap(Hashtable<IoTString *, KeyValue *> *newKVs) {
+void Commit::setKVsMap(Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *newKVs) {
keyValueUpdateSet->clear();
+ keyValueUpdateSet->addAll(newKVs);
liveKeys->clear();
-
- keyValueUpdateSet->addAll(newKVs->values());
- liveKeys->addAll(newKVs->keySet());
-
+ SetIterator<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = newKVs->iterator();
+ while (kvit->hasNext()) {
+ liveKeys->add(kvit->next()->getKey());
+ }
+ delete kvit;
}
-
Commit *Commit_merge(Commit *newer, Commit *older, int64_t newSequenceNumber) {
-
if (older == NULL) {
return newer;
} else if (newer == NULL) {
return older;
}
-
- Hashtable<IoTString *, KeyValue *> *kvSet = new Hashtable<IoTString *, KeyValue *>();
- for (KeyValue *kv : older->getKeyValueUpdateSet()) {
- kvSet->put(kv->getKey(), kv);
+ Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvSet = new Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue>();
+ SetIterator<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = older->getKeyValueUpdateSet()->iterator();
+ while (kvit->hasNext()) {
+ KeyValue *kv = kvit->next();
+ kvSet->add(kv);
}
-
- for (KeyValue *kv : newer->getKeyValueUpdateSet()) {
- kvSet->put(kv->getKey(), kv);
+ delete kvit;
+ kvit = newer->getKeyValueUpdateSet()->iterator();
+ while (kvit->hasNext()) {
+ KeyValue *kv = kvit->next();
+ kvSet->add(kv);
}
+ delete kvit;
int64_t transactionSequenceNumber = newer->getTransactionSequenceNumber();
-
if (transactionSequenceNumber == -1) {
transactionSequenceNumber = older->getTransactionSequenceNumber();
}
Commit *newCommit = new Commit(newSequenceNumber, newer->getMachineId(), transactionSequenceNumber);
-
newCommit->setKVsMap(kvSet);
+ delete kvSet;
return newCommit;
}