+++ /dev/null
-#include "Commit.h"
-#include "CommitPart.h"
-#include "ByteBuffer.h"
-#include "IoTString.h"
-
-Commit::Commit() :
- parts(new Vector<CommitPart *>()),
- partCount(0),
- missingParts(NULL),
- fldisComplete(false),
- hasLastPart(false),
- keyValueUpdateSet(new Hashset<KeyValue *, uintptr_t, 0>()),
- isDead(false),
- sequenceNumber(-1),
- machineId(-1),
- transactionSequenceNumber(-1),
- dataBytes(NULL),
- liveKeys(new Hashset<IoTString *>()) {
-}
-
-Commit::Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber) :
- parts(new Vector<CommitPart *>()),
- partCount(0),
- missingParts(NULL),
- fldisComplete(true),
- hasLastPart(false),
- keyValueUpdateSet(new Hashset<KeyValue *, uintptr_t, 0>()),
- isDead(false),
- sequenceNumber(_sequenceNumber),
- machineId(_machineId),
- transactionSequenceNumber(_transactionSequenceNumber),
- dataBytes(NULL),
- liveKeys(new Hashset<IoTString *>()) {
-}
-
-Commit::~Commit() {
- {
- uint Size = parts->size();
- for(uint i=0;i<Size; i++)
- parts->get(i)->releaseRef();
- delete parts;
- }
- {
- SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> * keyit = keyValueUpdateSet->iterator();
- while(keyit->hasNext()) {
- delete keyit->next();
- }
- delete keyit;
- delete keyValueUpdateSet;
- }
- delete liveKeys;
- if (missingParts != NULL)
- delete missingParts;
- if (dataBytes != NULL)
- delete dataBytes;
-}
-
-void Commit::addPartDecode(CommitPart *newPart) {
- if (isDead) {
- // If dead then just kill this part and move on
- newPart->setDead();
- return;
- }
-
- newPart->acquireRef();
- CommitPart *previouslySeenPart = parts->setExpand(newPart->getPartNumber(), newPart);
- if (previouslySeenPart == NULL)
- partCount++;
-
- if (previouslySeenPart != NULL) {
- // Set dead the old one since the new one is a rescued version of this part
- previouslySeenPart->setDead();
- previouslySeenPart->releaseRef();
- } else if (newPart->isLastPart()) {
- missingParts = new Hashset<int32_t>();
- hasLastPart = true;
-
- for (int i = 0; i < newPart->getPartNumber(); i++) {
- if (parts->get(i) == NULL) {
- missingParts->add(i);
- }
- }
- }
-
- if (!fldisComplete && hasLastPart) {
-
- // We have seen this part so remove it from the set of missing parts
- missingParts->remove(newPart->getPartNumber());
-
- // Check if all the parts have been seen
- if (missingParts->size() == 0) {
-
- // We have all the parts
- fldisComplete = true;
-
- // Decode all the parts and create the key value guard and update sets
- decodeCommitData();
-
- // Get the sequence number and arbitrator of this transaction
- sequenceNumber = parts->get(0)->getSequenceNumber();
- machineId = parts->get(0)->getMachineId();
- transactionSequenceNumber = parts->get(0)->getTransactionSequenceNumber();
- }
- }
-}
-
-int64_t Commit::getSequenceNumber() {
- return sequenceNumber;
-}
-
-int64_t Commit::getTransactionSequenceNumber() {
- return transactionSequenceNumber;
-}
-
-Vector<CommitPart *> *Commit::getParts() {
- return parts;
-}
-
-void Commit::addKV(KeyValue *kv) {
- KeyValue * kvcopy = kv->getCopy();
- keyValueUpdateSet->add(kvcopy);
- liveKeys->add(kvcopy->getKey());
-}
-
-void Commit::invalidateKey(IoTString *key) {
- liveKeys->remove(key);
-
- if (liveKeys->size() == 0) {
- setDead();
- }
-}
-
-Hashset<KeyValue *, uintptr_t, 0> *Commit::getKeyValueUpdateSet() {
- return keyValueUpdateSet;
-}
-
-int32_t Commit::getNumberOfParts() {
- return partCount;
-}
-
-void Commit::setDead() {
- if (!isDead) {
- isDead = true;
- // Make all the parts of this transaction dead
- for (uint32_t partNumber = 0; partNumber < parts->size(); partNumber++) {
- CommitPart *part = parts->get(partNumber);
- part->setDead();
- }
- }
-}
-
-void Commit::createCommitParts() {
- uint Size = parts->size();
- for(uint i=0;i < Size; i++) {
- Entry * e=parts->get(i);
- e->releaseRef();
- }
- parts->clear();
- partCount = 0;
- // Convert to chars
- Array<char> *charData = convertDataToBytes();
-
- int commitPartCount = 0;
- int currentPosition = 0;
- int remaining = charData->length();
-
- while (remaining > 0) {
- bool isLastPart = false;
- // determine how much to copy
- int copySize = CommitPart_MAX_NON_HEADER_SIZE;
- if (remaining <= CommitPart_MAX_NON_HEADER_SIZE) {
- copySize = remaining;
- isLastPart = true;// last bit of data so last part
- }
-
- // Copy to a smaller version
- Array<char> *partData = new Array<char>(copySize);
- System_arraycopy(charData, currentPosition, partData, 0, copySize);
-
- CommitPart *part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
- parts->setExpand(part->getPartNumber(), part);
-
- // Update position, count and remaining
- currentPosition += copySize;
- commitPartCount++;
- remaining -= copySize;
- }
- delete charData;
-}
-
-void Commit::decodeCommitData() {
- // Calculate the size of the data section
- int dataSize = 0;
- for (uint i = 0; i < parts->size(); i++) {
- CommitPart *tp = parts->get(i);
- if (tp != NULL)
- dataSize += tp->getDataSize();
- }
-
- Array<char> *combinedData = new Array<char>(dataSize);
- int currentPosition = 0;
-
- // Stitch all the data sections together
- for (uint i = 0; i < parts->size(); i++) {
- CommitPart *tp = parts->get(i);
- if (tp != NULL) {
- System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
- currentPosition += tp->getDataSize();
- }
- }
-
- // Decoder Object
- ByteBuffer *bbDecode = ByteBuffer_wrap(combinedData);
-
- // Decode how many key value pairs need to be decoded
- int numberOfKVUpdates = bbDecode->getInt();
-
- // Decode all the updates key values
- for (int i = 0; i < numberOfKVUpdates; i++) {
- KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
- keyValueUpdateSet->add(kv);
- liveKeys->add(kv->getKey());
- }
- delete bbDecode;
-}
-
-Array<char> *Commit::convertDataToBytes() {
- // Calculate the size of the data
- int sizeOfData = sizeof(int32_t); // Number of Update KV's
- SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = keyValueUpdateSet->iterator();
- while (kvit->hasNext()) {
- KeyValue *kv = kvit->next();
- sizeOfData += kv->getSize();
- }
- delete kvit;
-
- // Data handlers and storage
- Array<char> *dataArray = new Array<char>(sizeOfData);
- ByteBuffer *bbEncode = ByteBuffer_wrap(dataArray);
-
- // Encode the size of the updates and guard sets
- bbEncode->putInt(keyValueUpdateSet->size());
-
- // Encode all the updates
- kvit = keyValueUpdateSet->iterator();
- while (kvit->hasNext()) {
- KeyValue *kv = kvit->next();
- kv->encode(bbEncode);
- }
- delete kvit;
- Array<char> * array = bbEncode->array();
- bbEncode->releaseArray();
- delete bbEncode;
- return array;
-}
-
-void Commit::setKVsMap(Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, KeyValueEquals> *newKVs) {
- keyValueUpdateSet->clear();
- liveKeys->clear();
- SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, KeyValueEquals> *kvit = newKVs->iterator();
- while (kvit->hasNext()) {
- KeyValue *kv = kvit->next();
- KeyValue *kvcopy = kv->getCopy();
- liveKeys->add(kvcopy->getKey());
- keyValueUpdateSet->add(kvcopy);
- }
- delete kvit;
-}
-
-Commit *Commit_merge(Commit *newer, Commit *older, int64_t newSequenceNumber) {
- if (older == NULL) {
- return newer;
- } else if (newer == NULL) {
- return older;
- }
- Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, KeyValueEquals> *kvSet = new Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, KeyValueEquals>();
- SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = older->getKeyValueUpdateSet()->iterator();
- while (kvit->hasNext()) {
- KeyValue *kv = kvit->next();
- kvSet->add(kv);
- }
- delete kvit;
- kvit = newer->getKeyValueUpdateSet()->iterator();
- while (kvit->hasNext()) {
- KeyValue *kv = kvit->next();
- kvSet->add(kv);
- }
- delete kvit;
-
- int64_t transactionSequenceNumber = newer->getTransactionSequenceNumber();
- if (transactionSequenceNumber == -1) {
- transactionSequenceNumber = older->getTransactionSequenceNumber();
- }
-
- Commit *newCommit = new Commit(newSequenceNumber, newer->getMachineId(), transactionSequenceNumber);
- newCommit->setKVsMap(kvSet);
-
- delete kvSet;
- return newCommit;
-}