edits
authorbdemsky <bdemsky@uci.edu>
Sat, 20 Jan 2018 00:15:27 +0000 (16:15 -0800)
committerbdemsky <bdemsky@uci.edu>
Sat, 20 Jan 2018 00:15:27 +0000 (16:15 -0800)
version2/src/C/CommitPart.cc
version2/src/C/CommitPart.h
version2/src/C/Transaction.cc
version2/src/C/TransactionPart.cc
version2/src/C/TransactionPart.h

index 08cd46bf5bb1b787d049f053a173bac4ec6fc931..d137a75e1611b1e36cc02700f8baa14061355b95 100644 (file)
@@ -1,5 +1,5 @@
 #include "CommitPart.h"
-
+#include "ByteBuffer.h"
 
 CommitPart::CommitPart(Slot *s, int64_t _machineId, int64_t _sequenceNumber, int64_t _transactionSequenceNumber, int _partNumber, Array<char> *_data, bool _isLastPart) :
        Entry(s),
@@ -9,7 +9,7 @@ CommitPart::CommitPart(Slot *s, int64_t _machineId, int64_t _sequenceNumber, int
        partNumber(_partNumber),
        fldisLastPart(_isLastPart),
        data(_data),
-       partId(new Pair<int64_t int32_t>(sequenceNumber, partNumber)),
+       partId(new Pair<int64_t, int32_t>(sequenceNumber, partNumber)),
        commitId(new Pair<int64_t, int64_t>(machineId, sequenceNumber)) {
 }
 
@@ -20,7 +20,7 @@ int CommitPart::getSize() {
        return (3 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char)) + data->length();
 }
 
-void CommitPart::setSlot(Slot s) {
+void CommitPart::setSlot(Slot* s) {
        parentslot = s;
 }
 
@@ -36,7 +36,7 @@ Array<char> *CommitPart::getData() {
        return data;
 }
 
-Pair<int64_t int32_t> *CommitPart::getPartId() {
+Pair<int64_t, int32_t> *CommitPart::getPartId() {
        return partId;
 }
 
@@ -75,13 +75,13 @@ Entry *CommitPart_decode(Slot *s, ByteBuffer *bb) {
        return new CommitPart(s, machineId, sequenceNumber, transactionSequenceNumber, partNumber, data, isLastPart);
 }
 
-void CommitPart::encode(ByteBuffer bb) {
+void CommitPart::encode(ByteBuffer *bb) {
        bb->put(TypeCommitPart);
        bb->putLong(machineId);
        bb->putLong(sequenceNumber);
        bb->putLong(transactionSequenceNumber);
        bb->putInt(partNumber);
-       bb->putInt(data.length);
+       bb->putInt(data->length());
 
        if (fldisLastPart) {
                bb->put((char)1);
index 19e8b7af9421fc2ddf9524e57277cbb52c587978..e0a0d91b4c5f7cc659a298520acf4f761e1f3295 100644 (file)
@@ -2,6 +2,7 @@
 #define COMMITPART_H
 #include "common.h"
 #include "Entry.h"
+#include "Pair.h"
 
 // Max size of the part excluding the fixed size header
 #define CommitPart_MAX_NON_HEADER_SIZE 512
index aad62e5b6373cb9ab984367af144a5d6f23ad2f7..691a78f9609118aa8d00c3467606bb8380dbf825 100644 (file)
@@ -1,13 +1,13 @@
 #include "Transaction.h"
 
 Transaction::Transaction() :
-       parts(new Hashtable<int32_t, TransactionPart>()),
+       parts(new Hashtable<int32_t, TransactionPart *>()),
        missingParts(NULL),
        partsPendingSend(new Vector<int32_t>()),
        fldisComplete(false),
        hasLastPart(false),
-       keyValueGuardSet(new HashSet<KeyValue>()),
-       keyValueUpdateSet(new HashSet<KeyValue>()),
+       keyValueGuardSet(new Hashset<KeyValue *>()),
+       keyValueUpdateSet(new Hashset<KeyValue *>()),
        isDead(false),
        sequenceNumber(-1),
        clientLocalSequenceNumber(-1),
@@ -18,14 +18,14 @@ Transaction::Transaction() :
 }
 
 void Transaction::addPartEncode(TransactionPart *newPart) {
-       parts.put(newPart.getPartNumber(), newPart);
-       partsPendingSend.add(newPart.getPartNumber());
+       parts->put(newPart->getPartNumber(), newPart);
+       partsPendingSend->add(newPart->getPartNumber());
 
-       sequenceNumber = newPart.getSequenceNumber();
-       arbitratorId = newPart.getArbitratorId();
-       transactionId = newPart.getTransactionId();
-       clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
-       machineId = newPart.getMachineId();
+       sequenceNumber = newPart->getSequenceNumber();
+       arbitratorId = newPart->getArbitratorId();
+       transactionId = newPart->getTransactionId();
+       clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber();
+       machineId = newPart->getMachineId();
 
        fldisComplete = true;
 }
@@ -33,28 +33,28 @@ void Transaction::addPartEncode(TransactionPart *newPart) {
 void Transaction::addPartDecode(TransactionPart *newPart) {
        if (isDead) {
                // If dead then just kill this part and move on
-               newPart.setDead();
+               newPart->setDead();
                return;
        }
 
-       sequenceNumber = newPart.getSequenceNumber();
-       arbitratorId = newPart.getArbitratorId();
-       transactionId = newPart.getTransactionId();
-       clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
-       machineId = newPart.getMachineId();
+       sequenceNumber = newPart->getSequenceNumber();
+       arbitratorId = newPart->getArbitratorId();
+       transactionId = newPart->getTransactionId();
+       clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber();
+       machineId = newPart->getMachineId();
 
-       TransactionPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart);
+       TransactionPart previoslySeenPart = parts->put(newPart->getPartNumber(), newPart);
 
        if (previoslySeenPart != NULL) {
                // Set dead the old one since the new one is a rescued version of this part
-               previoslySeenPart.setDead();
-       } else if (newPart.isLastPart()) {
-               missingParts = new HashSet<int32_t>();
+               previoslySeenPart->setDead();
+       } else if (newPart->isLastPart()) {
+               missingParts = new Hashset<int32_t>();
                hasLastPart = true;
 
-               for (int i = 0; i < newPart.getPartNumber(); i++) {
-                       if (parts.get(i) == NULL) {
-                               missingParts.add(i);
+               for (int i = 0; i < newPart->getPartNumber(); i++) {
+                       if (parts->get(i) == NULL) {
+                               missingParts->add(i);
                        }
                }
        }
@@ -62,10 +62,10 @@ void Transaction::addPartDecode(TransactionPart *newPart) {
        if (!fldisComplete && hasLastPart) {
 
                // We have seen this part so remove it from the set of missing parts
-               missingParts.remove(newPart.getPartNumber());
+               missingParts->remove(newPart->getPartNumber());
 
                // Check if all the parts have been seen
-               if (missingParts.size() == 0) {
+               if (missingParts->size() == 0) {
 
                        // We have all the parts
                        fldisComplete = true;
@@ -77,11 +77,11 @@ void Transaction::addPartDecode(TransactionPart *newPart) {
 }
 
 void Transaction::addUpdateKV(KeyValue *kv) {
-       keyValueUpdateSet.add(kv);
+       keyValueUpdateSet->add(kv);
 }
 
 void Transaction::addGuardKV(KeyValue *kv) {
-       keyValueGuardSet.add(kv);
+       keyValueGuardSet->add(kv);
 }
 
 
@@ -92,8 +92,8 @@ int64_t Transaction::getSequenceNumber() {
 void Transaction::setSequenceNumber(int64_t _sequenceNumber) {
        sequenceNumber = _sequenceNumber;
 
-       for (int32_t i : parts.keySet()) {
-               parts.get(i).setSequenceNumber(sequenceNumber);
+       for (int32_t i : parts->keySet()) {
+               parts->get(i)->setSequenceNumber(sequenceNumber);
        }
 }
 
@@ -114,10 +114,10 @@ void Transaction::resetNextPartToSend() {
 }
 
 TransactionPart *Transaction::getNextPartToSend() {
-       if ((partsPendingSend.size() == 0) || (partsPendingSend.size() == nextPartToSend)) {
+       if ((partsPendingSend->size() == 0) || (partsPendingSend->size() == nextPartToSend)) {
                return NULL;
        }
-       TransactionPart part = parts.get(partsPendingSend.get(nextPartToSend));
+       TransactionPart part = parts->get(partsPendingSend->get(nextPartToSend));
        nextPartToSend++;
        return part;
 }
@@ -147,15 +147,15 @@ TransactionStatus *Transaction::getTransactionStatus() {
 
 void Transaction::removeSentParts(Vector<int32_t> *sentParts) {
        nextPartToSend = 0;
-       if (partsPendingSend.removeAll(sentParts))
+       if (partsPendingSend->removeAll(sentParts))
        {
                flddidSendAPartToServer = true;
-               transactionStatus.setTransactionSequenceNumber(sequenceNumber);
+               transactionStatus->setTransactionSequenceNumber(sequenceNumber);
        }
 }
 
 bool Transaction::didSendAllParts() {
-       return partsPendingSend.isEmpty();
+       return partsPendingSend->isEmpty();
 }
 
 Hashset<KeyValue *> *Transaction::getKeyValueUpdateSet() {
@@ -163,7 +163,7 @@ Hashset<KeyValue *> *Transaction::getKeyValueUpdateSet() {
 }
 
 int Transaction::getNumberOfParts() {
-       return parts.size();
+       return parts->size();
 }
 
 int64_t Transaction::getMachineId() {
@@ -192,52 +192,52 @@ void Transaction::setDead() {
        isDead = true;
 
        // Make all the parts of this transaction dead
-       for (int32_t partNumber : parts.keySet()) {
-               TransactionPart part = parts.get(partNumber);
-               part.setDead();
+       for (int32_t partNumber : parts->keySet()) {
+               TransactionPart part = parts->get(partNumber);
+               part->setDead();
        }
 }
 
 TransactionPart *Transaction::getPart(int index) {
-       return parts.get(index);
+       return parts->get(index);
 }
 
 void Transaction::decodeTransactionData() {
 
        // Calculate the size of the data section
        int dataSize = 0;
-       for (int i = 0; i < parts.keySet().size(); i++) {
-               TransactionPart tp = parts.get(i);
-               dataSize += tp.getDataSize();
+       for (int i = 0; i < parts->keySet()->size(); i++) {
+               TransactionPart tp = parts->get(i);
+               dataSize += tp->getDataSize();
        }
 
        Array<char> *combinedData = new char[dataSize];
        int currentPosition = 0;
 
        // Stitch all the data sections together
-       for (int i = 0; i < parts.keySet().size(); i++) {
-               TransactionPart tp = parts.get(i);
-               System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize());
-               currentPosition += tp.getDataSize();
+       for (int i = 0; i < parts->keySet()->size(); i++) {
+               TransactionPart tp = parts->get(i);
+               System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
+               currentPosition += tp->getDataSize();
        }
 
        // Decoder Object
-       ByteBuffer bbDecode = ByteBuffer.wrap(combinedData);
+       ByteBuffer bbDecode = ByteBuffer_wrap(combinedData);
 
        // Decode how many key value pairs need to be decoded
-       int numberOfKVGuards = bbDecode.getInt();
-       int numberOfKVUpdates = bbDecode.getInt();
+       int numberOfKVGuards = bbDecode->getInt();
+       int numberOfKVUpdates = bbDecode->getInt();
 
        // Decode all the guard key values
        for (int i = 0; i < numberOfKVGuards; i++) {
-               KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
-               keyValueGuardSet.add(kv);
+               KeyValue * kv = (KeyValue *)KeyValue_decode(bbDecode);
+               keyValueGuardSet->add(kv);
        }
 
        // Decode all the updates key values
        for (int i = 0; i < numberOfKVUpdates; i++) {
-               KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
-               keyValueUpdateSet.add(kv);
+               KeyValue * kv = (KeyValue *)KeyValue_decode(bbDecode);
+               keyValueUpdateSet->add(kv);
        }
 }
 
@@ -245,32 +245,32 @@ bool Transaction::evaluateGuard(Hashtable<IoTString *, KeyValue *> *committedKey
        for (KeyValue *kvGuard : keyValueGuardSet) {
 
                // First check if the key is in the speculative table, this is the value of the latest assumption
-               KeyValue kv = NULL;
+               KeyValue kv = NULL;
 
                // If we have a speculation table then use it first
                if (pendingTransactionSpeculatedKeyValueTable != NULL) {
-                       kv = pendingTransactionSpeculatedKeyValueTable.get(kvGuard.getKey());
+                       kv = pendingTransactionSpeculatedKeyValueTable->get(kvGuard->getKey());
                }
 
                // If we have a speculation table then use it first
                if ((kv == NULL) && (speculatedKeyValueTable != NULL)) {
-                       kv = speculatedKeyValueTable.get(kvGuard.getKey());
+                       kv = speculatedKeyValueTable->get(kvGuard->getKey());
                }
 
                if (kv == NULL) {
                        // if it is not in the speculative table then check the committed table and use that
                        // value as our latest assumption
-                       kv = committedKeyValueTable.get(kvGuard.getKey());
+                       kv = committedKeyValueTable->get(kvGuard->getKey());
                }
 
-               if (kvGuard.getValue() != NULL) {
-                       if ((kv == NULL) || (!kvGuard.getValue().equals(kv.getValue()))) {
+               if (kvGuard->getValue() != NULL) {
+                       if ((kv == NULL) || (!kvGuard->getValue()->equals(kv->getValue()))) {
 
 
                                if (kv != NULL) {
-                                       System.out.println(kvGuard.getValue() + "       " + kv.getValue());
+                                       System.out.println(kvGuard->getValue() + "       " + kv->getValue());
                                } else {
-                                       System.out.println(kvGuard.getValue() + "       " + kv);
+                                       System.out.println(kvGuard->getValue() + "       " + kv);
                                }
 
                                return false;
index 2c94865e177941f8ff47dc852df221ce61cb3627..1b47b06d488d66e5fd4c336ce621fde6a311329b 100644 (file)
@@ -1,10 +1,11 @@
 #include "TransactionPart.h"
+#include "ByteBuffer.h"
 
 int TransactionPart::getSize() {
        if (data == NULL) {
                return (4 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char));
        }
-       return (4 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char)) + data.length;
+       return (4 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char)) + data->length();
 }
 
 void TransactionPart::setSlot(Slot *s) {
@@ -19,7 +20,7 @@ int64_t TransactionPart::getArbitratorId() {
        return arbitratorId;
 }
 
-Pair<int64_t int32_t> *TransactionPart::getPartId() {
+Pair<int64_t, int32_t> *TransactionPart::getPartId() {
        return partId;
 }
 
@@ -28,7 +29,7 @@ int TransactionPart::getPartNumber() {
 }
 
 int TransactionPart::getDataSize() {
-       return data.length;
+       return data->length();
 }
 
 Array<char> *TransactionPart::getData() {
@@ -36,7 +37,7 @@ Array<char> *TransactionPart::getData() {
 }
 
 bool TransactionPart::isLastPart() {
-       return isLastPart;
+       return fldisLastPart;
 }
 
 int64_t TransactionPart::getMachineId() {
@@ -67,22 +68,22 @@ Entry *TransactionPart_decode(Slot *s, ByteBuffer *bb) {
        Array<char> *data = new Array<char>(dataSize);
        bb->get(data);
 
-       TransactionPart returnTransactionPart = new TransactionPart(s, machineId, arbitratorId, clientLocalSequenceNumber, partNumber, data, isLastPart);
-       returnTransactionPart.setSequenceNumber(sequenceNumber);
+       TransactionPart returnTransactionPart = new TransactionPart(s, machineId, arbitratorId, clientLocalSequenceNumber, partNumber, data, isLastPart);
+       returnTransactionPart->setSequenceNumber(sequenceNumber);
 
        return returnTransactionPart;
 }
 
 void TransactionPart::encode(ByteBuffer *bb) {
-       bb->put(Entry.TypeTransactionPart);
+       bb->put(TypeTransactionPart);
        bb->putLong(sequenceNumber);
        bb->putLong(machineId);
        bb->putLong(arbitratorId);
        bb->putLong(clientLocalSequenceNumber);
        bb->putInt(partNumber);
-       bb->putInt(data.length);
+       bb->putInt(data->length());
 
-       if (isLastPart) {
+       if (fldisLastPart) {
                bb->put((char)1);
        } else {
                bb->put((char)0);
@@ -96,8 +97,8 @@ char TransactionPart::getType() {
 }
 
 Entry *TransactionPart::getCopy(Slot *s) {
-       TransactionPart *copyTransaction = new TransactionPart(s, machineId, arbitratorId, clientLocalSequenceNumber, partNumber, data, isLastPart);
-       copyTransaction.setSequenceNumber(sequenceNumber);
+       TransactionPart *copyTransaction = new TransactionPart(s, machineId, arbitratorId, clientLocalSequenceNumber, partNumber, data, fldisLastPart);
+       copyTransaction->setSequenceNumber(sequenceNumber);
 
        return copyTransaction;
 }
index 84aa1b4b4a1ad5004e6a435a3870ae50c57408ff..c7b6a8a04dd1c773f233beb69b5c42f8417d7c34 100644 (file)
@@ -2,6 +2,7 @@
 #define TRANSACTIONPART_H
 #include "common.h"
 #include "Entry.h"
+#include "Pair.h"
 
 // Max size of the part excluding the fixed size header
 #define TransactionPart_MAX_NON_HEADER_SIZE 512
@@ -12,7 +13,7 @@ private:
        int64_t machineId;
        int64_t arbitratorId;
        int64_t clientLocalSequenceNumber;              // Sequence number of the transaction that this is a part of
-       int partNumber; // Parts position in the
+       int32_t partNumber;     // Parts position in the
        bool fldisLastPart;
 
        Pair<int64_t, int64_t> *transactionId;
@@ -34,7 +35,7 @@ public:
        }
 
        int getSize();
-       void setSlot(Slot s);
+       void setSlot(Slot* s);
        Pair<int64_t, int64_t> *getTransactionId();
        int64_t getArbitratorId();
        Pair<int64_t, int32_t> *getPartId();