Ref counting
authorbdemsky <bdemsky@uci.edu>
Wed, 28 Mar 2018 19:47:55 +0000 (12:47 -0700)
committerbdemsky <bdemsky@uci.edu>
Wed, 28 Mar 2018 19:47:55 +0000 (12:47 -0700)
13 files changed:
version2/src/C/ArbitrationRound.cc
version2/src/C/Commit.cc
version2/src/C/Commit.h
version2/src/C/CommitPart.cc
version2/src/C/CommitPart.h
version2/src/C/Entry.h
version2/src/C/PendingTransaction.cc
version2/src/C/Slot.cc
version2/src/C/Table.cc
version2/src/C/Transaction.cc
version2/src/C/Transaction.h
version2/src/C/TransactionPart.h
version2/src/C/vector.h

index 159aeae..cc316f0 100644 (file)
@@ -1,5 +1,6 @@
 #include "ArbitrationRound.h"
 #include "Commit.h"
+#include "CommitPart.h"
 
 ArbitrationRound::ArbitrationRound(Commit *_commit, Hashset<Abort *> *_abortsBefore) :
        abortsBefore(_abortsBefore),
@@ -19,6 +20,11 @@ ArbitrationRound::ArbitrationRound(Commit *_commit, Hashset<Abort *> *_abortsBef
 
 ArbitrationRound::~ArbitrationRound() {
        delete abortsBefore;
+       uint partsSize = parts->size();
+       for (uint i = 0; i < partsSize; i++) {
+               Entry * part = parts->get(i);
+               part->releaseRef();
+       }
        delete parts;
        if (commit != NULL)
                delete commit;
@@ -28,6 +34,11 @@ void ArbitrationRound::generateParts() {
        if (didGenerateParts) {
                return;
        }
+       uint partsSize = parts->size();
+       for (uint i = 0; i < partsSize; i++) {
+               Entry * part = parts->get(i);
+               part->releaseRef();
+       }
        parts->clear();
        SetIterator<Abort *, Abort *> *abit = abortsBefore->iterator();
        while (abit->hasNext())
@@ -37,7 +48,9 @@ void ArbitrationRound::generateParts() {
                Vector<CommitPart *> *cParts = commit->getParts();
                uint cPartsSize = cParts->size();
                for (uint i = 0; i < cPartsSize; i++) {
-                       parts->add((Entry *)cParts->get(i));
+                       CommitPart * part = cParts->get(i);
+                       part->acquireRef();
+                       parts->add((Entry *)part);
                }
        }
 }
@@ -47,7 +60,12 @@ Vector<Entry *> *ArbitrationRound::getParts() {
 }
 
 void ArbitrationRound::removeParts(Vector<Entry *> *removeParts) {
-       parts->removeAll(removeParts);
+       uint size = removeParts->size();
+       for(uint i=0; i < size; i++) {
+               Entry * e = removeParts->get(i);
+               if (parts->remove(e))
+                       e->releaseRef();
+       }
        didSendPart = true;
 }
 
index e01a778..7665841 100644 (file)
@@ -14,6 +14,7 @@ Commit::Commit() :
        sequenceNumber(-1),
        machineId(-1),
        transactionSequenceNumber(-1),
+       dataBytes(NULL),
        liveKeys(new Hashset<IoTString *>()) {
 }
 
@@ -28,11 +29,17 @@ Commit::Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transaction
        sequenceNumber(_sequenceNumber),
        machineId(_machineId),
        transactionSequenceNumber(_transactionSequenceNumber),
+       dataBytes(NULL),
        liveKeys(new Hashset<IoTString *>()) {
 }
 
 Commit::~Commit() {
-       delete parts;
+       {
+               uint Size = parts->size();
+               for(uint i=0;i<Size; i++)
+                       parts->get(i)->releaseRef();
+               delete parts;
+       }
        {
                SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> * keyit = keyValueUpdateSet->iterator();
                while(keyit->hasNext()) {
@@ -44,6 +51,8 @@ Commit::~Commit() {
        delete liveKeys;
        if (missingParts != NULL)
                delete missingParts;
+       if (dataBytes != NULL)
+               delete dataBytes;
 }
 
 void Commit::addPartDecode(CommitPart *newPart) {
@@ -53,6 +62,7 @@ void Commit::addPartDecode(CommitPart *newPart) {
                return;
        }
 
+       newPart->acquireRef();
        CommitPart *previouslySeenPart = parts->setExpand(newPart->getPartNumber(), newPart);
        if (previouslySeenPart == NULL)
                partCount++;
@@ -60,6 +70,7 @@ void Commit::addPartDecode(CommitPart *newPart) {
        if (previouslySeenPart != NULL) {
                // Set dead the old one since the new one is a rescued version of this part
                previouslySeenPart->setDead();
+               previouslySeenPart->releaseRef();
        } else if (newPart->isLastPart()) {
                missingParts = new Hashset<int32_t>();
                hasLastPart = true;
@@ -133,17 +144,17 @@ void Commit::setDead() {
                // Make all the parts of this transaction dead
                for (uint32_t partNumber = 0; partNumber < parts->size(); partNumber++) {
                        CommitPart *part = parts->get(partNumber);
-                       if (parts != NULL)
-                               part->setDead();
+                       part->setDead();
                }
        }
 }
 
-CommitPart *Commit::getPart(int index) {
-       return parts->get(index);
-}
-
 void Commit::createCommitParts() {
+       uint Size = parts->size();
+       for(uint i=0;i < Size; i++) {
+               Entry * e=parts->get(i);
+               e->releaseRef();
+       }
        parts->clear();
        partCount = 0;
        // Convert to chars
index b8d69fa..d9f05a9 100644 (file)
@@ -16,6 +16,7 @@ private:
        int64_t machineId;
        int64_t transactionSequenceNumber;
        Hashset<IoTString *> *liveKeys;
+       Array<char> *dataBytes;
        Array<char> *convertDataToBytes();
        void setKVsMap(Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, KeyValueEquals> *newKVs);
 
@@ -35,7 +36,6 @@ public:
        bool isComplete() { return fldisComplete; }
        bool isLive() { return !isDead; }
        void setDead();
-       CommitPart *getPart(int32_t index);
        void createCommitParts();
        void decodeCommitData();
        friend Commit *Commit_merge(Commit *newer, Commit *older, int64_t newSequenceNumber);
index a23f5de..6cb382c 100644 (file)
@@ -8,6 +8,7 @@ CommitPart::CommitPart(Slot *s, int64_t _machineId, int64_t _sequenceNumber, int
        transactionSequenceNumber(_transactionSequenceNumber),
        partNumber(_partNumber),
        fldisLastPart(_isLastPart),
+       refCount(1),
        data(_data),
        partId(Pair<int64_t, int32_t>(sequenceNumber, partNumber)),
        commitId(Pair<int64_t, int64_t>(machineId, sequenceNumber)) {
index 606f889..7b8be2e 100644 (file)
@@ -15,6 +15,7 @@ private:
        int64_t transactionSequenceNumber;
        int32_t partNumber;     // Parts position in the
        bool fldisLastPart;
+       int32_t refCount;
        Array<char> *data;
 
        Pair<int64_t, int32_t> partId;
@@ -36,6 +37,8 @@ public:
        void encode(ByteBuffer *bb);
        char getType();
        Entry *getCopy(Slot *s);
+       void releaseRef() {if ((--refCount)==0) delete this;}   
+       void acquireRef() {refCount++;}
 };
 
 Entry *CommitPart_decode(Slot *s, ByteBuffer *bb);
index 2a0c9f4..2c01829 100644 (file)
@@ -51,7 +51,8 @@ public:
         * array.
         */
        virtual int getSize() = 0;
-
+       virtual void releaseRef() {delete this;}
+       virtual void acquireRef() {}
 
        /**
         * Returns a copy of the Entry that can be added to a different slot.
index 45602a0..c0d32a3 100644 (file)
@@ -131,7 +131,8 @@ Transaction *PendingTransaction::createTransaction() {
 
                TransactionPart *part = new TransactionPart(NULL, machineId, arbitrator, clientLocalSequenceNumber, transactionPartCount, partData, isLastPart);
                newTransaction->addPartEncode(part);
-
+               part->releaseRef();
+               
                // Update position, count and remaining
                currentPosition += copySize;
                transactionPartCount++;
index 6f37c9a..31cce57 100644 (file)
@@ -54,7 +54,7 @@ Slot::~Slot() {
                delete hmac;
        delete prevhmac;
        for(uint i=0; i< entries->size(); i++)
-               delete entries->get(i);
+               entries->get(i)->releaseRef();
        delete entries;
        if (fakeLastMessage)
                delete fakeLastMessage;
index 1a33f76..255ba3c 100644 (file)
@@ -199,6 +199,13 @@ Table::~Table() {
                while (partsit->hasNext()) {
                        int64_t machineId = partsit->next();
                        Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = partsit->currVal();
+                       SetIterator<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pit = getKeyIterator(parts);
+                       while(pit->hasNext()) {
+                               Pair<int64_t, int32_t> * pair=pit->next();
+                               pit->currVal()->releaseRef();
+                       }
+                       delete pit;
+                       
                        delete parts;
                }
                delete partsit;
@@ -209,6 +216,12 @@ Table::~Table() {
                while (partsit->hasNext()) {
                        int64_t machineId = partsit->next();
                        Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = partsit->currVal();
+                       SetIterator<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pit = getKeyIterator(parts);
+                       while(pit->hasNext()) {
+                               Pair<int64_t, int32_t> * pair=pit->next();
+                               pit->currVal()->releaseRef();
+                       }
+                       delete pit;
                        delete parts;
                }
                delete partsit;
@@ -1628,7 +1641,7 @@ void Table::processNewTransactionParts() {
        SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *tpit = getKeyIterator(newTransactionParts);
        while (tpit->hasNext()) {
                int64_t machineId = tpit->next();
-               Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
+               Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = tpit->currVal();
 
                SetIterator<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *ptit = getKeyIterator(parts);
                // Iterate through all the parts for that machine Id
@@ -1641,6 +1654,7 @@ void Table::processNewTransactionParts() {
                                if (lastTransactionNumber >= part->getSequenceNumber()) {
                                        // Set dead the transaction part
                                        part->setDead();
+                                       part->releaseRef();
                                        continue;
                                }
                        }
@@ -1659,6 +1673,7 @@ void Table::processNewTransactionParts() {
                                liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
                                liveTransactionByTransactionIdTable->put(transaction->getId(), transaction);
                        }
+                       part->releaseRef();
                }
                delete ptit;
        }
@@ -1854,7 +1869,7 @@ Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
                SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
                while (kvit->hasNext()) {
                        KeyValue *kv = kvit->next();
-                       newCommit->addKV(kv->getCopy());
+                       newCommit->addKV(kv);
                }
                delete kvit;
 
@@ -2052,7 +2067,7 @@ bool Table::updateCommittedTable() {
                SetIterator<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pairit = getKeyIterator(parts);
                while (pairit->hasNext()) {
                        Pair<int64_t, int32_t> *partId = pairit->next();
-                       CommitPart *part = parts->get(partId);
+                       CommitPart *part = pairit->currVal();
 
                        // Get the transaction object for that sequence number
                        Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
@@ -2075,6 +2090,7 @@ bool Table::updateCommittedTable() {
 
                        // Add that part to the commit
                        commit->addPartDecode(part);
+                       part->releaseRef();
                }
                delete pairit;
                delete parts;
@@ -2680,8 +2696,10 @@ void Table::processEntry(TransactionPart *entry) {
 
        // Update the part and set dead ones we have already seen (got a
        // rescued version)
+       entry->acquireRef();
        TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry);
        if (previouslySeenPart != NULL) {
+               previouslySeenPart->releaseRef();
                previouslySeenPart->setDead();
        }
 }
@@ -2706,9 +2724,11 @@ void Table::processEntry(CommitPart *entry) {
        }
        // Update the part and set dead ones we have already seen (got a
        // rescued version)
+       entry->acquireRef();
        CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry);
        if (previouslySeenPart != NULL) {
                previouslySeenPart->setDead();
+               previouslySeenPart->releaseRef();
        }
 }
 
index 258c22b..b0d3c5b 100644 (file)
@@ -30,6 +30,9 @@ Transaction::~Transaction() {
        if (missingParts)
                delete missingParts;
        {
+               uint Size = parts->size();
+               for(uint i=0; i<Size; i++)
+                       parts->get(i)->releaseRef();
                delete parts;
        }
        {
@@ -54,11 +57,14 @@ Transaction::~Transaction() {
 }
 
 void Transaction::addPartEncode(TransactionPart *newPart) {
+       newPart->acquireRef();
+       printf("Add part %d\n", newPart->getPartNumber());
        TransactionPart *old = parts->setExpand(newPart->getPartNumber(), newPart);
        if (old == NULL) {
                partCount++;
-       } else
-               delete old;
+       } else {
+               old->releaseRef();
+       }
        partsPendingSend->add(newPart->getPartNumber());
 
        sequenceNumber = newPart->getSequenceNumber();
@@ -75,7 +81,7 @@ void Transaction::addPartDecode(TransactionPart *newPart) {
                newPart->setDead();
                return;
        }
-
+       newPart->acquireRef();
        sequenceNumber = newPart->getSequenceNumber();
        arbitratorId = newPart->getArbitratorId();
        transactionId = newPart->getTransactionId();
@@ -88,6 +94,7 @@ void Transaction::addPartDecode(TransactionPart *newPart) {
 
        if (previouslySeenPart != NULL) {
                // Set dead the old one since the new one is a rescued version of this part
+               previouslySeenPart->releaseRef();
                previouslySeenPart->setDead();
        } else if (newPart->isLastPart()) {
                missingParts = new Hashset<int32_t>();
@@ -253,10 +260,6 @@ void Transaction::setDead() {
        }
 }
 
-TransactionPart *Transaction::getPart(int index) {
-       return parts->get(index);
-}
-
 void Transaction::decodeTransactionData() {
        // Calculate the size of the data section
        int dataSize = 0;
index 57ce43a..a997382 100644 (file)
@@ -54,7 +54,6 @@ public:
        bool isComplete();
        Pair<int64_t, int64_t> *getId();
        void setDead();
-       TransactionPart *getPart(int32_t index);
        bool evaluateGuard(Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *committedKeyValueTable, Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *speculatedKeyValueTable, Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *pendingTransactionSpeculatedKeyValueTable);
 };
 #endif
index c254049..47966e4 100644 (file)
@@ -15,12 +15,12 @@ private:
        int64_t clientLocalSequenceNumber;              // Sequence number of the transaction that this is a part of
        int32_t partNumber;     // Parts position in the
        bool fldisLastPart;
-
+       int32_t refCount;
        Pair<int64_t, int64_t> transactionId;
        Pair<int64_t, int32_t> partId;
 
        Array<char> *data;
-
+       
 public:
        TransactionPart(Slot *s, int64_t _machineId, int64_t _arbitratorId, int64_t _clientLocalSequenceNumber, int _partNumber, Array<char> *_data, bool _isLastPart) : Entry(s),
                sequenceNumber(-1),
@@ -29,6 +29,7 @@ public:
                clientLocalSequenceNumber(_clientLocalSequenceNumber),
                partNumber(_partNumber),
                fldisLastPart(_isLastPart),
+               refCount(1),
                transactionId(Pair<int64_t, int64_t>(machineId, clientLocalSequenceNumber)),
                partId(Pair<int64_t, int32_t>(clientLocalSequenceNumber, partNumber)),
                data(_data) {
@@ -48,6 +49,8 @@ public:
        void setSequenceNumber(int64_t _sequenceNumber);
        void encode(ByteBuffer *bb);
        char getType();
+       void releaseRef() {if ((--refCount)==0) delete this;}   
+       void acquireRef() {refCount++;}
        Entry *getCopy(Slot *s);
 };
 
index e1378c9..5ca954b 100644 (file)
@@ -30,16 +30,17 @@ public:
                fldsize--;
        }
 
-       void remove(type t) {
+       bool remove(type t) {
                for (uint i = 0; i < fldsize; i++) {
                        if (array[i] == t) {
                                for (i++; i < fldsize; i++) {
                                        array[i - 1] = array[i];
                                }
                                fldsize--;
-                               break;
+                               return true;
                        }
                }
+               return false;
        }
 
        void removeIndex(uint i) {