From fc3e0b967949a83c40622b339a115900c07fd253 Mon Sep 17 00:00:00 2001 From: bdemsky Date: Fri, 19 Jan 2018 16:15:27 -0800 Subject: [PATCH] edits --- version2/src/C/CommitPart.cc | 12 +-- version2/src/C/CommitPart.h | 1 + version2/src/C/Transaction.cc | 122 +++++++++++++++--------------- version2/src/C/TransactionPart.cc | 23 +++--- version2/src/C/TransactionPart.h | 5 +- 5 files changed, 83 insertions(+), 80 deletions(-) diff --git a/version2/src/C/CommitPart.cc b/version2/src/C/CommitPart.cc index 08cd46b..d137a75 100644 --- a/version2/src/C/CommitPart.cc +++ b/version2/src/C/CommitPart.cc @@ -1,5 +1,5 @@ #include "CommitPart.h" - +#include "ByteBuffer.h" CommitPart::CommitPart(Slot *s, int64_t _machineId, int64_t _sequenceNumber, int64_t _transactionSequenceNumber, int _partNumber, Array *_data, bool _isLastPart) : Entry(s), @@ -9,7 +9,7 @@ CommitPart::CommitPart(Slot *s, int64_t _machineId, int64_t _sequenceNumber, int partNumber(_partNumber), fldisLastPart(_isLastPart), data(_data), - partId(new Pair(sequenceNumber, partNumber)), + partId(new Pair(sequenceNumber, partNumber)), commitId(new Pair(machineId, sequenceNumber)) { } @@ -20,7 +20,7 @@ int CommitPart::getSize() { return (3 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char)) + data->length(); } -void CommitPart::setSlot(Slot s) { +void CommitPart::setSlot(Slot* s) { parentslot = s; } @@ -36,7 +36,7 @@ Array *CommitPart::getData() { return data; } -Pair *CommitPart::getPartId() { +Pair *CommitPart::getPartId() { return partId; } @@ -75,13 +75,13 @@ Entry *CommitPart_decode(Slot *s, ByteBuffer *bb) { return new CommitPart(s, machineId, sequenceNumber, transactionSequenceNumber, partNumber, data, isLastPart); } -void CommitPart::encode(ByteBuffer bb) { +void CommitPart::encode(ByteBuffer *bb) { bb->put(TypeCommitPart); bb->putLong(machineId); bb->putLong(sequenceNumber); bb->putLong(transactionSequenceNumber); bb->putInt(partNumber); - bb->putInt(data.length); + bb->putInt(data->length()); if (fldisLastPart) { bb->put((char)1); diff --git a/version2/src/C/CommitPart.h b/version2/src/C/CommitPart.h index 19e8b7a..e0a0d91 100644 --- a/version2/src/C/CommitPart.h +++ b/version2/src/C/CommitPart.h @@ -2,6 +2,7 @@ #define COMMITPART_H #include "common.h" #include "Entry.h" +#include "Pair.h" // Max size of the part excluding the fixed size header #define CommitPart_MAX_NON_HEADER_SIZE 512 diff --git a/version2/src/C/Transaction.cc b/version2/src/C/Transaction.cc index aad62e5..691a78f 100644 --- a/version2/src/C/Transaction.cc +++ b/version2/src/C/Transaction.cc @@ -1,13 +1,13 @@ #include "Transaction.h" Transaction::Transaction() : - parts(new Hashtable()), + parts(new Hashtable()), missingParts(NULL), partsPendingSend(new Vector()), fldisComplete(false), hasLastPart(false), - keyValueGuardSet(new HashSet()), - keyValueUpdateSet(new HashSet()), + keyValueGuardSet(new Hashset()), + keyValueUpdateSet(new Hashset()), isDead(false), sequenceNumber(-1), clientLocalSequenceNumber(-1), @@ -18,14 +18,14 @@ Transaction::Transaction() : } void Transaction::addPartEncode(TransactionPart *newPart) { - parts.put(newPart.getPartNumber(), newPart); - partsPendingSend.add(newPart.getPartNumber()); + parts->put(newPart->getPartNumber(), newPart); + partsPendingSend->add(newPart->getPartNumber()); - sequenceNumber = newPart.getSequenceNumber(); - arbitratorId = newPart.getArbitratorId(); - transactionId = newPart.getTransactionId(); - clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber(); - machineId = newPart.getMachineId(); + sequenceNumber = newPart->getSequenceNumber(); + arbitratorId = newPart->getArbitratorId(); + transactionId = newPart->getTransactionId(); + clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber(); + machineId = newPart->getMachineId(); fldisComplete = true; } @@ -33,28 +33,28 @@ void Transaction::addPartEncode(TransactionPart *newPart) { void Transaction::addPartDecode(TransactionPart *newPart) { if (isDead) { // If dead then just kill this part and move on - newPart.setDead(); + newPart->setDead(); return; } - sequenceNumber = newPart.getSequenceNumber(); - arbitratorId = newPart.getArbitratorId(); - transactionId = newPart.getTransactionId(); - clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber(); - machineId = newPart.getMachineId(); + sequenceNumber = newPart->getSequenceNumber(); + arbitratorId = newPart->getArbitratorId(); + transactionId = newPart->getTransactionId(); + clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber(); + machineId = newPart->getMachineId(); - TransactionPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart); + TransactionPart previoslySeenPart = parts->put(newPart->getPartNumber(), newPart); if (previoslySeenPart != NULL) { // Set dead the old one since the new one is a rescued version of this part - previoslySeenPart.setDead(); - } else if (newPart.isLastPart()) { - missingParts = new HashSet(); + previoslySeenPart->setDead(); + } else if (newPart->isLastPart()) { + missingParts = new Hashset(); hasLastPart = true; - for (int i = 0; i < newPart.getPartNumber(); i++) { - if (parts.get(i) == NULL) { - missingParts.add(i); + for (int i = 0; i < newPart->getPartNumber(); i++) { + if (parts->get(i) == NULL) { + missingParts->add(i); } } } @@ -62,10 +62,10 @@ void Transaction::addPartDecode(TransactionPart *newPart) { if (!fldisComplete && hasLastPart) { // We have seen this part so remove it from the set of missing parts - missingParts.remove(newPart.getPartNumber()); + missingParts->remove(newPart->getPartNumber()); // Check if all the parts have been seen - if (missingParts.size() == 0) { + if (missingParts->size() == 0) { // We have all the parts fldisComplete = true; @@ -77,11 +77,11 @@ void Transaction::addPartDecode(TransactionPart *newPart) { } void Transaction::addUpdateKV(KeyValue *kv) { - keyValueUpdateSet.add(kv); + keyValueUpdateSet->add(kv); } void Transaction::addGuardKV(KeyValue *kv) { - keyValueGuardSet.add(kv); + keyValueGuardSet->add(kv); } @@ -92,8 +92,8 @@ int64_t Transaction::getSequenceNumber() { void Transaction::setSequenceNumber(int64_t _sequenceNumber) { sequenceNumber = _sequenceNumber; - for (int32_t i : parts.keySet()) { - parts.get(i).setSequenceNumber(sequenceNumber); + for (int32_t i : parts->keySet()) { + parts->get(i)->setSequenceNumber(sequenceNumber); } } @@ -114,10 +114,10 @@ void Transaction::resetNextPartToSend() { } TransactionPart *Transaction::getNextPartToSend() { - if ((partsPendingSend.size() == 0) || (partsPendingSend.size() == nextPartToSend)) { + if ((partsPendingSend->size() == 0) || (partsPendingSend->size() == nextPartToSend)) { return NULL; } - TransactionPart part = parts.get(partsPendingSend.get(nextPartToSend)); + TransactionPart part = parts->get(partsPendingSend->get(nextPartToSend)); nextPartToSend++; return part; } @@ -147,15 +147,15 @@ TransactionStatus *Transaction::getTransactionStatus() { void Transaction::removeSentParts(Vector *sentParts) { nextPartToSend = 0; - if (partsPendingSend.removeAll(sentParts)) + if (partsPendingSend->removeAll(sentParts)) { flddidSendAPartToServer = true; - transactionStatus.setTransactionSequenceNumber(sequenceNumber); + transactionStatus->setTransactionSequenceNumber(sequenceNumber); } } bool Transaction::didSendAllParts() { - return partsPendingSend.isEmpty(); + return partsPendingSend->isEmpty(); } Hashset *Transaction::getKeyValueUpdateSet() { @@ -163,7 +163,7 @@ Hashset *Transaction::getKeyValueUpdateSet() { } int Transaction::getNumberOfParts() { - return parts.size(); + return parts->size(); } int64_t Transaction::getMachineId() { @@ -192,52 +192,52 @@ void Transaction::setDead() { isDead = true; // Make all the parts of this transaction dead - for (int32_t partNumber : parts.keySet()) { - TransactionPart part = parts.get(partNumber); - part.setDead(); + for (int32_t partNumber : parts->keySet()) { + TransactionPart part = parts->get(partNumber); + part->setDead(); } } TransactionPart *Transaction::getPart(int index) { - return parts.get(index); + return parts->get(index); } void Transaction::decodeTransactionData() { // Calculate the size of the data section int dataSize = 0; - for (int i = 0; i < parts.keySet().size(); i++) { - TransactionPart tp = parts.get(i); - dataSize += tp.getDataSize(); + for (int i = 0; i < parts->keySet()->size(); i++) { + TransactionPart tp = parts->get(i); + dataSize += tp->getDataSize(); } Array *combinedData = new char[dataSize]; int currentPosition = 0; // Stitch all the data sections together - for (int i = 0; i < parts.keySet().size(); i++) { - TransactionPart tp = parts.get(i); - System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize()); - currentPosition += tp.getDataSize(); + for (int i = 0; i < parts->keySet()->size(); i++) { + TransactionPart tp = parts->get(i); + System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize()); + currentPosition += tp->getDataSize(); } // Decoder Object - ByteBuffer bbDecode = ByteBuffer.wrap(combinedData); + ByteBuffer bbDecode = ByteBuffer_wrap(combinedData); // Decode how many key value pairs need to be decoded - int numberOfKVGuards = bbDecode.getInt(); - int numberOfKVUpdates = bbDecode.getInt(); + int numberOfKVGuards = bbDecode->getInt(); + int numberOfKVUpdates = bbDecode->getInt(); // Decode all the guard key values for (int i = 0; i < numberOfKVGuards; i++) { - KeyValue kv = (KeyValue)KeyValue.decode(bbDecode); - keyValueGuardSet.add(kv); + KeyValue * kv = (KeyValue *)KeyValue_decode(bbDecode); + keyValueGuardSet->add(kv); } // Decode all the updates key values for (int i = 0; i < numberOfKVUpdates; i++) { - KeyValue kv = (KeyValue)KeyValue.decode(bbDecode); - keyValueUpdateSet.add(kv); + KeyValue * kv = (KeyValue *)KeyValue_decode(bbDecode); + keyValueUpdateSet->add(kv); } } @@ -245,32 +245,32 @@ bool Transaction::evaluateGuard(Hashtable *committedKey for (KeyValue *kvGuard : keyValueGuardSet) { // First check if the key is in the speculative table, this is the value of the latest assumption - KeyValue kv = NULL; + KeyValue * kv = NULL; // If we have a speculation table then use it first if (pendingTransactionSpeculatedKeyValueTable != NULL) { - kv = pendingTransactionSpeculatedKeyValueTable.get(kvGuard.getKey()); + kv = pendingTransactionSpeculatedKeyValueTable->get(kvGuard->getKey()); } // If we have a speculation table then use it first if ((kv == NULL) && (speculatedKeyValueTable != NULL)) { - kv = speculatedKeyValueTable.get(kvGuard.getKey()); + kv = speculatedKeyValueTable->get(kvGuard->getKey()); } if (kv == NULL) { // if it is not in the speculative table then check the committed table and use that // value as our latest assumption - kv = committedKeyValueTable.get(kvGuard.getKey()); + kv = committedKeyValueTable->get(kvGuard->getKey()); } - if (kvGuard.getValue() != NULL) { - if ((kv == NULL) || (!kvGuard.getValue().equals(kv.getValue()))) { + if (kvGuard->getValue() != NULL) { + if ((kv == NULL) || (!kvGuard->getValue()->equals(kv->getValue()))) { if (kv != NULL) { - System.out.println(kvGuard.getValue() + " " + kv.getValue()); + System.out.println(kvGuard->getValue() + " " + kv->getValue()); } else { - System.out.println(kvGuard.getValue() + " " + kv); + System.out.println(kvGuard->getValue() + " " + kv); } return false; diff --git a/version2/src/C/TransactionPart.cc b/version2/src/C/TransactionPart.cc index 2c94865..1b47b06 100644 --- a/version2/src/C/TransactionPart.cc +++ b/version2/src/C/TransactionPart.cc @@ -1,10 +1,11 @@ #include "TransactionPart.h" +#include "ByteBuffer.h" int TransactionPart::getSize() { if (data == NULL) { return (4 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char)); } - return (4 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char)) + data.length; + return (4 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char)) + data->length(); } void TransactionPart::setSlot(Slot *s) { @@ -19,7 +20,7 @@ int64_t TransactionPart::getArbitratorId() { return arbitratorId; } -Pair *TransactionPart::getPartId() { +Pair *TransactionPart::getPartId() { return partId; } @@ -28,7 +29,7 @@ int TransactionPart::getPartNumber() { } int TransactionPart::getDataSize() { - return data.length; + return data->length(); } Array *TransactionPart::getData() { @@ -36,7 +37,7 @@ Array *TransactionPart::getData() { } bool TransactionPart::isLastPart() { - return isLastPart; + return fldisLastPart; } int64_t TransactionPart::getMachineId() { @@ -67,22 +68,22 @@ Entry *TransactionPart_decode(Slot *s, ByteBuffer *bb) { Array *data = new Array(dataSize); bb->get(data); - TransactionPart returnTransactionPart = new TransactionPart(s, machineId, arbitratorId, clientLocalSequenceNumber, partNumber, data, isLastPart); - returnTransactionPart.setSequenceNumber(sequenceNumber); + TransactionPart * returnTransactionPart = new TransactionPart(s, machineId, arbitratorId, clientLocalSequenceNumber, partNumber, data, isLastPart); + returnTransactionPart->setSequenceNumber(sequenceNumber); return returnTransactionPart; } void TransactionPart::encode(ByteBuffer *bb) { - bb->put(Entry.TypeTransactionPart); + bb->put(TypeTransactionPart); bb->putLong(sequenceNumber); bb->putLong(machineId); bb->putLong(arbitratorId); bb->putLong(clientLocalSequenceNumber); bb->putInt(partNumber); - bb->putInt(data.length); + bb->putInt(data->length()); - if (isLastPart) { + if (fldisLastPart) { bb->put((char)1); } else { bb->put((char)0); @@ -96,8 +97,8 @@ char TransactionPart::getType() { } Entry *TransactionPart::getCopy(Slot *s) { - TransactionPart *copyTransaction = new TransactionPart(s, machineId, arbitratorId, clientLocalSequenceNumber, partNumber, data, isLastPart); - copyTransaction.setSequenceNumber(sequenceNumber); + TransactionPart *copyTransaction = new TransactionPart(s, machineId, arbitratorId, clientLocalSequenceNumber, partNumber, data, fldisLastPart); + copyTransaction->setSequenceNumber(sequenceNumber); return copyTransaction; } diff --git a/version2/src/C/TransactionPart.h b/version2/src/C/TransactionPart.h index 84aa1b4..c7b6a8a 100644 --- a/version2/src/C/TransactionPart.h +++ b/version2/src/C/TransactionPart.h @@ -2,6 +2,7 @@ #define TRANSACTIONPART_H #include "common.h" #include "Entry.h" +#include "Pair.h" // Max size of the part excluding the fixed size header #define TransactionPart_MAX_NON_HEADER_SIZE 512 @@ -12,7 +13,7 @@ private: int64_t machineId; int64_t arbitratorId; int64_t clientLocalSequenceNumber; // Sequence number of the transaction that this is a part of - int partNumber; // Parts position in the + int32_t partNumber; // Parts position in the bool fldisLastPart; Pair *transactionId; @@ -34,7 +35,7 @@ public: } int getSize(); - void setSlot(Slot s); + void setSlot(Slot* s); Pair *getTransactionId(); int64_t getArbitratorId(); Pair *getPartId(); -- 2.34.1