X-Git-Url: http://plrg.eecs.uci.edu/git/?p=iotcloud.git;a=blobdiff_plain;f=version2%2Fsrc%2FC%2FCommit.cc;h=f159f71608c59bf74f427234cab8af797fee8ddc;hp=5c616920726d5a8fbedd163ce5e9b517935ec405;hb=9c3fa5cbce287df14626d262bd0179e994338869;hpb=075451d75d86e6d0ea89e437f1e63759365eb8be diff --git a/version2/src/C/Commit.cc b/version2/src/C/Commit.cc index 5c61692..f159f71 100644 --- a/version2/src/C/Commit.cc +++ b/version2/src/C/Commit.cc @@ -1,11 +1,15 @@ -#include "commit.h" +#include "Commit.h" +#include "CommitPart.h" +#include "ByteBuffer.h" +#include "IoTString.h" Commit::Commit() : - parts(new Hashtable()), + parts(new Vector()), + partCount(0), missingParts(NULL), fldisComplete(false), hasLastPart(false), - keyValueUpdateSet(new HashSet()), + keyValueUpdateSet(new Hashset()), isDead(false), sequenceNumber(-1), machineId(-1), @@ -13,13 +17,13 @@ Commit::Commit() : liveKeys(new Hashset) { } - Commit::Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber) : - parts(new Hashtable()), + parts(new Vector()), + partCount(0), missingParts(NULL), fldisComplete(true), hasLastPart(false), - keyValueUpdateSet(new HashSet()), + keyValueUpdateSet(new Hashset()), isDead(false), sequenceNumber(_sequenceNumber), machineId(_machineId), @@ -27,234 +31,237 @@ Commit::Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transaction liveKeys(new Hashset) { } -void Commit::addPartDecode(CommitPart newPart) { - +void Commit::addPartDecode(CommitPart *newPart) { if (isDead) { // If dead then just kill this part and move on - newPart.setDead(); + newPart->setDead(); return; } - CommitPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart); - - if (previoslySeenPart != NULL) { + CommitPart *previouslySeenPart = parts->setExpand(newPart->getPartNumber(), newPart); + if (previouslySeenPart == NULL) + partCount++; + + if (previouslySeenPart != NULL) { // Set dead the old one since the new one is a rescued version of this part - previoslySeenPart.setDead(); - } else if (newPart.isLastPart()) { - missingParts = new HashSet(); + previouslySeenPart->setDead(); + } else if (newPart->isLastPart()) { + missingParts = new Hashset(); hasLastPart = true; - - for (int i = 0; i < newPart.getPartNumber(); i++) { - if (parts.get(i) == NULL) { - missingParts.add(i); + + for (int i = 0; i < newPart->getPartNumber(); i++) { + if (parts->get(i) == NULL) { + missingParts->add(i); } } } - + if (!fldisComplete && hasLastPart) { - + // We have seen this part so remove it from the set of missing parts - missingParts.remove(newPart.getPartNumber()); - + missingParts->remove(newPart->getPartNumber()); + // Check if all the parts have been seen - if (missingParts.size() == 0) { - + if (missingParts->size() == 0) { + // We have all the parts fldisComplete = true; - + // Decode all the parts and create the key value guard and update sets decodeCommitData(); - + // Get the sequence number and arbitrator of this transaction - sequenceNumber = parts.get(0).getSequenceNumber(); - machineId = parts.get(0).getMachineId(); - transactionSequenceNumber = parts.get(0).getTransactionSequenceNumber(); + sequenceNumber = parts->get(0)->getSequenceNumber(); + machineId = parts->get(0)->getMachineId(); + transactionSequenceNumber = parts->get(0)->getTransactionSequenceNumber(); } } } - int64_t getSequenceNumber() { - return sequenceNumber; - } - - int64_t getTransactionSequenceNumber() { - return transactionSequenceNumber; - } - - Hashtable getParts() { - return parts; - } - - void addKV(KeyValue kv) { - keyValueUpdateSet.add(kv); - liveKeys.add(kv.getKey()); - } - - void invalidateKey(IoTString key) { - liveKeys.remove(key); - - if (liveKeys.size() == 0) { - setDead(); - } - } - - Set getKeyValueUpdateSet() { - return keyValueUpdateSet; - } - -int32_t getNumberOfParts() { - return parts.size(); +int64_t Commit::getSequenceNumber() { + return sequenceNumber; } - void setDead() { - if (isDead) { - // Already dead - return; - } - - // Set dead - isDead = true; - - // Make all the parts of this transaction dead - for (Integer partNumber : parts.keySet()) { - CommitPart part = parts.get(partNumber); - part.setDead(); - } - } - - CommitPart getPart(int index) { - return parts.get(index); - } - - void createCommitParts() { - - parts.clear(); - - // Convert to chars - char[] charData = convertDataToBytes(); - - - int commitPartCount = 0; - int currentPosition = 0; - int remaining = charData.length; - - while (remaining > 0) { - - Boolean isLastPart = false; - // determine how much to copy - int copySize = CommitPart.MAX_NON_HEADER_SIZE; - if (remaining <= CommitPart.MAX_NON_HEADER_SIZE) { - copySize = remaining; - isLastPart = true; // last bit of data so last part - } - - // Copy to a smaller version - char[] partData = new char[copySize]; - System.arraycopy(charData, currentPosition, partData, 0, copySize); +int64_t Commit::getTransactionSequenceNumber() { + return transactionSequenceNumber; +} - CommitPart part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart); - parts.put(part.getPartNumber(), part); +Vector *Commit::getParts() { + return parts; +} - // Update position, count and remaining - currentPosition += copySize; - commitPartCount++; - remaining -= copySize; - } - } +void Commit::addKV(KeyValue *kv) { + keyValueUpdateSet->add(kv); + liveKeys->add(kv->getKey()); +} - void decodeCommitData() { +void Commit::invalidateKey(IoTString *key) { + liveKeys->remove(key); - // Calculate the size of the data section - int dataSize = 0; - for (int i = 0; i < parts.keySet().size(); i++) { - CommitPart tp = parts.get(i); - dataSize += tp.getDataSize(); - } + if (liveKeys->size() == 0) { + setDead(); + } +} - char[] combinedData = new char[dataSize]; - int currentPosition = 0; +Hashset *Commit::getKeyValueUpdateSet() { + return keyValueUpdateSet; +} - // Stitch all the data sections together - for (int i = 0; i < parts.keySet().size(); i++) { - CommitPart tp = parts.get(i); - System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize()); - currentPosition += tp.getDataSize(); - } +int32_t Commit::getNumberOfParts() { + return partCount; +} - // Decoder Object - ByteBuffer bbDecode = ByteBuffer.wrap(combinedData); +void Commit::setDead() { + if (!isDead) { + isDead = true; + // Make all the parts of this transaction dead + for (int32_t partNumber = 0; partNumber < parts->size(); partNumber++) { + CommitPart *part = parts->get(partNumber); + if (parts != NULL) + part->setDead(); + } + } +} - // Decode how many key value pairs need to be decoded - int numberOfKVUpdates = bbDecode.getInt(); +CommitPart *Commit::getPart(int index) { + return parts->get(index); +} - // Decode all the updates key values - for (int i = 0; i < numberOfKVUpdates; i++) { - KeyValue kv = (KeyValue)KeyValue.decode(bbDecode); - keyValueUpdateSet.add(kv); - liveKeys.add(kv.getKey()); - } - } +void Commit::createCommitParts() { + parts->clear(); + partCount = 0; + // Convert to chars + Array *charData = convertDataToBytes(); + + int commitPartCount = 0; + int currentPosition = 0; + int remaining = charData->length(); + + while (remaining > 0) { + bool isLastPart = false; + // determine how much to copy + int copySize = CommitPart_MAX_NON_HEADER_SIZE; + if (remaining <= CommitPart_MAX_NON_HEADER_SIZE) { + copySize = remaining; + isLastPart = true;// last bit of data so last part + } - char[] convertDataToBytes() { + // Copy to a smaller version + Array *partData = new Array(copySize); + System_arraycopy(charData, currentPosition, partData, 0, copySize); - // Calculate the size of the data - int sizeOfData = sizeof(int32_t); // Number of Update KV's - for (KeyValue kv : keyValueUpdateSet) { - sizeOfData += kv.getSize(); - } + CommitPart *part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart); + parts->setExpand(part->getPartNumber(), part); - // Data handlers and storage - char[] dataArray = new char[sizeOfData]; - ByteBuffer bbEncode = ByteBuffer.wrap(dataArray); + // Update position, count and remaining + currentPosition += copySize; + commitPartCount++; + remaining -= copySize; + } +} - // Encode the size of the updates and guard sets - bbEncode.putInt(keyValueUpdateSet.size()); +void Commit::decodeCommitData() { + // Calculate the size of the data section + int dataSize = 0; + for (int i = 0; i < parts->size(); i++) { + CommitPart *tp = parts->get(i); + if (tp != NULL) + dataSize += tp->getDataSize(); + } - // Encode all the updates - for (KeyValue kv : keyValueUpdateSet) { - kv.encode(bbEncode); - } + Array *combinedData = new Array(dataSize); + int currentPosition = 0; - return bbEncode.array(); - } + // Stitch all the data sections together + for (int i = 0; i < parts->size(); i++) { + CommitPart *tp = parts->get(i); + if (tp != NULL) { + System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize()); + currentPosition += tp->getDataSize(); + } + } - void setKVsMap(Hashtable newKVs) { - keyValueUpdateSet.clear(); - liveKeys.clear(); + // Decoder Object + ByteBuffer *bbDecode = ByteBuffer_wrap(combinedData); - keyValueUpdateSet.addAll(newKVs.values()); - liveKeys.addAll(newKVs.keySet()); + // Decode how many key value pairs need to be decoded + int numberOfKVUpdates = bbDecode->getInt(); - } + // Decode all the updates key values + for (int i = 0; i < numberOfKVUpdates; i++) { + KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode); + keyValueUpdateSet->add(kv); + liveKeys->add(kv->getKey()); + } +} +Array *Commit::convertDataToBytes() { + // Calculate the size of the data + int sizeOfData = sizeof(int32_t); // Number of Update KV's + SetIterator *kvit = keyValueUpdateSet->iterator(); + while (kvit->hasNext()) { + KeyValue *kv = kvit->next(); + sizeOfData += kv->getSize(); + } + delete kvit; - static Commit merge(Commit newer, Commit older, int64_t newSequenceNumber) { + // Data handlers and storage + Array *dataArray = new Array(sizeOfData); + ByteBuffer *bbEncode = ByteBuffer_wrap(dataArray); - if (older == NULL) { - return newer; - } else if (newer == NULL) { - return older; - } + // Encode the size of the updates and guard sets + bbEncode->putInt(keyValueUpdateSet->size()); - Hashtable kvSet = new Hashtable(); - for (KeyValue kv : older.getKeyValueUpdateSet()) { - kvSet.put(kv.getKey(), kv); - } + // Encode all the updates + kvit = keyValueUpdateSet->iterator(); + while (kvit->hasNext()) { + KeyValue *kv = kvit->next(); + kv->encode(bbEncode); + } + delete kvit; - for (KeyValue kv : newer.getKeyValueUpdateSet()) { - kvSet.put(kv.getKey(), kv); - } + return bbEncode->array(); +} - int64_t transactionSequenceNumber = newer.getTransactionSequenceNumber(); +void Commit::setKVsMap(Hashset *newKVs) { + keyValueUpdateSet->clear(); + keyValueUpdateSet->addAll(newKVs); + liveKeys->clear(); + SetIterator *kvit = newKVs->iterator(); + while (kvit->hasNext()) { + liveKeys->add(kvit->next()->getKey()); + } + delete kvit; +} - if (transactionSequenceNumber == -1) { - transactionSequenceNumber = older.getTransactionSequenceNumber(); - } +Commit *Commit_merge(Commit *newer, Commit *older, int64_t newSequenceNumber) { + if (older == NULL) { + return newer; + } else if (newer == NULL) { + return older; + } + Hashset *kvSet = new Hashset(); + SetIterator *kvit = older->getKeyValueUpdateSet()->iterator(); + while (kvit->hasNext()) { + KeyValue *kv = kvit->next(); + kvSet->add(kv); + } + delete kvit; + kvit = newer->getKeyValueUpdateSet()->iterator(); + while (kvit->hasNext()) { + KeyValue *kv = kvit->next(); + kvSet->add(kv); + } + delete kvit; - Commit newCommit = new Commit(newSequenceNumber, newer.getMachineId(), transactionSequenceNumber); + int64_t transactionSequenceNumber = newer->getTransactionSequenceNumber(); + if (transactionSequenceNumber == -1) { + transactionSequenceNumber = older->getTransactionSequenceNumber(); + } - newCommit.setKVsMap(kvSet); + Commit *newCommit = new Commit(newSequenceNumber, newer->getMachineId(), transactionSequenceNumber); + newCommit->setKVsMap(kvSet); - return newCommit; - } + delete kvSet; + return newCommit; }