From 3f24bffc82ebfe2730308b63100af08645316577 Mon Sep 17 00:00:00 2001 From: bdemsky Date: Wed, 28 Mar 2018 12:47:55 -0700 Subject: [PATCH] Ref counting --- version2/src/C/ArbitrationRound.cc | 22 ++++++++++++++++++++-- version2/src/C/Commit.cc | 25 ++++++++++++++++++------- version2/src/C/Commit.h | 2 +- version2/src/C/CommitPart.cc | 1 + version2/src/C/CommitPart.h | 3 +++ version2/src/C/Entry.h | 3 ++- version2/src/C/PendingTransaction.cc | 3 ++- version2/src/C/Slot.cc | 2 +- version2/src/C/Table.cc | 26 +++++++++++++++++++++++--- version2/src/C/Transaction.cc | 17 ++++++++++------- version2/src/C/Transaction.h | 1 - version2/src/C/TransactionPart.h | 7 +++++-- version2/src/C/vector.h | 5 +++-- 13 files changed, 89 insertions(+), 28 deletions(-) diff --git a/version2/src/C/ArbitrationRound.cc b/version2/src/C/ArbitrationRound.cc index 159aeae..cc316f0 100644 --- a/version2/src/C/ArbitrationRound.cc +++ b/version2/src/C/ArbitrationRound.cc @@ -1,5 +1,6 @@ #include "ArbitrationRound.h" #include "Commit.h" +#include "CommitPart.h" ArbitrationRound::ArbitrationRound(Commit *_commit, Hashset *_abortsBefore) : abortsBefore(_abortsBefore), @@ -19,6 +20,11 @@ ArbitrationRound::ArbitrationRound(Commit *_commit, Hashset *_abortsBef ArbitrationRound::~ArbitrationRound() { delete abortsBefore; + uint partsSize = parts->size(); + for (uint i = 0; i < partsSize; i++) { + Entry * part = parts->get(i); + part->releaseRef(); + } delete parts; if (commit != NULL) delete commit; @@ -28,6 +34,11 @@ void ArbitrationRound::generateParts() { if (didGenerateParts) { return; } + uint partsSize = parts->size(); + for (uint i = 0; i < partsSize; i++) { + Entry * part = parts->get(i); + part->releaseRef(); + } parts->clear(); SetIterator *abit = abortsBefore->iterator(); while (abit->hasNext()) @@ -37,7 +48,9 @@ void ArbitrationRound::generateParts() { Vector *cParts = commit->getParts(); uint cPartsSize = cParts->size(); for (uint i = 0; i < cPartsSize; i++) { - parts->add((Entry *)cParts->get(i)); + CommitPart * part = cParts->get(i); + part->acquireRef(); + parts->add((Entry *)part); } } } @@ -47,7 +60,12 @@ Vector *ArbitrationRound::getParts() { } void ArbitrationRound::removeParts(Vector *removeParts) { - parts->removeAll(removeParts); + uint size = removeParts->size(); + for(uint i=0; i < size; i++) { + Entry * e = removeParts->get(i); + if (parts->remove(e)) + e->releaseRef(); + } didSendPart = true; } diff --git a/version2/src/C/Commit.cc b/version2/src/C/Commit.cc index e01a778..7665841 100644 --- a/version2/src/C/Commit.cc +++ b/version2/src/C/Commit.cc @@ -14,6 +14,7 @@ Commit::Commit() : sequenceNumber(-1), machineId(-1), transactionSequenceNumber(-1), + dataBytes(NULL), liveKeys(new Hashset()) { } @@ -28,11 +29,17 @@ Commit::Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transaction sequenceNumber(_sequenceNumber), machineId(_machineId), transactionSequenceNumber(_transactionSequenceNumber), + dataBytes(NULL), liveKeys(new Hashset()) { } Commit::~Commit() { - delete parts; + { + uint Size = parts->size(); + for(uint i=0;iget(i)->releaseRef(); + delete parts; + } { SetIterator * keyit = keyValueUpdateSet->iterator(); while(keyit->hasNext()) { @@ -44,6 +51,8 @@ Commit::~Commit() { delete liveKeys; if (missingParts != NULL) delete missingParts; + if (dataBytes != NULL) + delete dataBytes; } void Commit::addPartDecode(CommitPart *newPart) { @@ -53,6 +62,7 @@ void Commit::addPartDecode(CommitPart *newPart) { return; } + newPart->acquireRef(); CommitPart *previouslySeenPart = parts->setExpand(newPart->getPartNumber(), newPart); if (previouslySeenPart == NULL) partCount++; @@ -60,6 +70,7 @@ void Commit::addPartDecode(CommitPart *newPart) { if (previouslySeenPart != NULL) { // Set dead the old one since the new one is a rescued version of this part previouslySeenPart->setDead(); + previouslySeenPart->releaseRef(); } else if (newPart->isLastPart()) { missingParts = new Hashset(); hasLastPart = true; @@ -133,17 +144,17 @@ void Commit::setDead() { // Make all the parts of this transaction dead for (uint32_t partNumber = 0; partNumber < parts->size(); partNumber++) { CommitPart *part = parts->get(partNumber); - if (parts != NULL) - part->setDead(); + part->setDead(); } } } -CommitPart *Commit::getPart(int index) { - return parts->get(index); -} - void Commit::createCommitParts() { + uint Size = parts->size(); + for(uint i=0;i < Size; i++) { + Entry * e=parts->get(i); + e->releaseRef(); + } parts->clear(); partCount = 0; // Convert to chars diff --git a/version2/src/C/Commit.h b/version2/src/C/Commit.h index b8d69fa..d9f05a9 100644 --- a/version2/src/C/Commit.h +++ b/version2/src/C/Commit.h @@ -16,6 +16,7 @@ private: int64_t machineId; int64_t transactionSequenceNumber; Hashset *liveKeys; + Array *dataBytes; Array *convertDataToBytes(); void setKVsMap(Hashset *newKVs); @@ -35,7 +36,6 @@ public: bool isComplete() { return fldisComplete; } bool isLive() { return !isDead; } void setDead(); - CommitPart *getPart(int32_t index); void createCommitParts(); void decodeCommitData(); friend Commit *Commit_merge(Commit *newer, Commit *older, int64_t newSequenceNumber); diff --git a/version2/src/C/CommitPart.cc b/version2/src/C/CommitPart.cc index a23f5de..6cb382c 100644 --- a/version2/src/C/CommitPart.cc +++ b/version2/src/C/CommitPart.cc @@ -8,6 +8,7 @@ CommitPart::CommitPart(Slot *s, int64_t _machineId, int64_t _sequenceNumber, int transactionSequenceNumber(_transactionSequenceNumber), partNumber(_partNumber), fldisLastPart(_isLastPart), + refCount(1), data(_data), partId(Pair(sequenceNumber, partNumber)), commitId(Pair(machineId, sequenceNumber)) { diff --git a/version2/src/C/CommitPart.h b/version2/src/C/CommitPart.h index 606f889..7b8be2e 100644 --- a/version2/src/C/CommitPart.h +++ b/version2/src/C/CommitPart.h @@ -15,6 +15,7 @@ private: int64_t transactionSequenceNumber; int32_t partNumber; // Parts position in the bool fldisLastPart; + int32_t refCount; Array *data; Pair partId; @@ -36,6 +37,8 @@ public: void encode(ByteBuffer *bb); char getType(); Entry *getCopy(Slot *s); + void releaseRef() {if ((--refCount)==0) delete this;} + void acquireRef() {refCount++;} }; Entry *CommitPart_decode(Slot *s, ByteBuffer *bb); diff --git a/version2/src/C/Entry.h b/version2/src/C/Entry.h index 2a0c9f4..2c01829 100644 --- a/version2/src/C/Entry.h +++ b/version2/src/C/Entry.h @@ -51,7 +51,8 @@ public: * array. */ virtual int getSize() = 0; - + virtual void releaseRef() {delete this;} + virtual void acquireRef() {} /** * Returns a copy of the Entry that can be added to a different slot. diff --git a/version2/src/C/PendingTransaction.cc b/version2/src/C/PendingTransaction.cc index 45602a0..c0d32a3 100644 --- a/version2/src/C/PendingTransaction.cc +++ b/version2/src/C/PendingTransaction.cc @@ -131,7 +131,8 @@ Transaction *PendingTransaction::createTransaction() { TransactionPart *part = new TransactionPart(NULL, machineId, arbitrator, clientLocalSequenceNumber, transactionPartCount, partData, isLastPart); newTransaction->addPartEncode(part); - + part->releaseRef(); + // Update position, count and remaining currentPosition += copySize; transactionPartCount++; diff --git a/version2/src/C/Slot.cc b/version2/src/C/Slot.cc index 6f37c9a..31cce57 100644 --- a/version2/src/C/Slot.cc +++ b/version2/src/C/Slot.cc @@ -54,7 +54,7 @@ Slot::~Slot() { delete hmac; delete prevhmac; for(uint i=0; i< entries->size(); i++) - delete entries->get(i); + entries->get(i)->releaseRef(); delete entries; if (fakeLastMessage) delete fakeLastMessage; diff --git a/version2/src/C/Table.cc b/version2/src/C/Table.cc index 1a33f76..255ba3c 100644 --- a/version2/src/C/Table.cc +++ b/version2/src/C/Table.cc @@ -199,6 +199,13 @@ Table::~Table() { while (partsit->hasNext()) { int64_t machineId = partsit->next(); Hashtable *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = partsit->currVal(); + SetIterator *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pit = getKeyIterator(parts); + while(pit->hasNext()) { + Pair * pair=pit->next(); + pit->currVal()->releaseRef(); + } + delete pit; + delete parts; } delete partsit; @@ -209,6 +216,12 @@ Table::~Table() { while (partsit->hasNext()) { int64_t machineId = partsit->next(); Hashtable *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = partsit->currVal(); + SetIterator *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pit = getKeyIterator(parts); + while(pit->hasNext()) { + Pair * pair=pit->next(); + pit->currVal()->releaseRef(); + } + delete pit; delete parts; } delete partsit; @@ -1628,7 +1641,7 @@ void Table::processNewTransactionParts() { SetIterator *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *tpit = getKeyIterator(newTransactionParts); while (tpit->hasNext()) { int64_t machineId = tpit->next(); - Hashtable *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId); + Hashtable *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = tpit->currVal(); SetIterator *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *ptit = getKeyIterator(parts); // Iterate through all the parts for that machine Id @@ -1641,6 +1654,7 @@ void Table::processNewTransactionParts() { if (lastTransactionNumber >= part->getSequenceNumber()) { // Set dead the transaction part part->setDead(); + part->releaseRef(); continue; } } @@ -1659,6 +1673,7 @@ void Table::processNewTransactionParts() { liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction); liveTransactionByTransactionIdTable->put(transaction->getId(), transaction); } + part->releaseRef(); } delete ptit; } @@ -1854,7 +1869,7 @@ Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { SetIterator *kvit = transaction->getKeyValueUpdateSet()->iterator(); while (kvit->hasNext()) { KeyValue *kv = kvit->next(); - newCommit->addKV(kv->getCopy()); + newCommit->addKV(kv); } delete kvit; @@ -2052,7 +2067,7 @@ bool Table::updateCommittedTable() { SetIterator *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pairit = getKeyIterator(parts); while (pairit->hasNext()) { Pair *partId = pairit->next(); - CommitPart *part = parts->get(partId); + CommitPart *part = pairit->currVal(); // Get the transaction object for that sequence number Hashtable *commitForClientTable = liveCommitsTable->get(part->getMachineId()); @@ -2075,6 +2090,7 @@ bool Table::updateCommittedTable() { // Add that part to the commit commit->addPartDecode(part); + part->releaseRef(); } delete pairit; delete parts; @@ -2680,8 +2696,10 @@ void Table::processEntry(TransactionPart *entry) { // Update the part and set dead ones we have already seen (got a // rescued version) + entry->acquireRef(); TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry); if (previouslySeenPart != NULL) { + previouslySeenPart->releaseRef(); previouslySeenPart->setDead(); } } @@ -2706,9 +2724,11 @@ void Table::processEntry(CommitPart *entry) { } // Update the part and set dead ones we have already seen (got a // rescued version) + entry->acquireRef(); CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry); if (previouslySeenPart != NULL) { previouslySeenPart->setDead(); + previouslySeenPart->releaseRef(); } } diff --git a/version2/src/C/Transaction.cc b/version2/src/C/Transaction.cc index 258c22b..b0d3c5b 100644 --- a/version2/src/C/Transaction.cc +++ b/version2/src/C/Transaction.cc @@ -30,6 +30,9 @@ Transaction::~Transaction() { if (missingParts) delete missingParts; { + uint Size = parts->size(); + for(uint i=0; iget(i)->releaseRef(); delete parts; } { @@ -54,11 +57,14 @@ Transaction::~Transaction() { } void Transaction::addPartEncode(TransactionPart *newPart) { + newPart->acquireRef(); + printf("Add part %d\n", newPart->getPartNumber()); TransactionPart *old = parts->setExpand(newPart->getPartNumber(), newPart); if (old == NULL) { partCount++; - } else - delete old; + } else { + old->releaseRef(); + } partsPendingSend->add(newPart->getPartNumber()); sequenceNumber = newPart->getSequenceNumber(); @@ -75,7 +81,7 @@ void Transaction::addPartDecode(TransactionPart *newPart) { newPart->setDead(); return; } - + newPart->acquireRef(); sequenceNumber = newPart->getSequenceNumber(); arbitratorId = newPart->getArbitratorId(); transactionId = newPart->getTransactionId(); @@ -88,6 +94,7 @@ void Transaction::addPartDecode(TransactionPart *newPart) { if (previouslySeenPart != NULL) { // Set dead the old one since the new one is a rescued version of this part + previouslySeenPart->releaseRef(); previouslySeenPart->setDead(); } else if (newPart->isLastPart()) { missingParts = new Hashset(); @@ -253,10 +260,6 @@ void Transaction::setDead() { } } -TransactionPart *Transaction::getPart(int index) { - return parts->get(index); -} - void Transaction::decodeTransactionData() { // Calculate the size of the data section int dataSize = 0; diff --git a/version2/src/C/Transaction.h b/version2/src/C/Transaction.h index 57ce43a..a997382 100644 --- a/version2/src/C/Transaction.h +++ b/version2/src/C/Transaction.h @@ -54,7 +54,6 @@ public: bool isComplete(); Pair *getId(); void setDead(); - TransactionPart *getPart(int32_t index); bool evaluateGuard(Hashtable *committedKeyValueTable, Hashtable *speculatedKeyValueTable, Hashtable *pendingTransactionSpeculatedKeyValueTable); }; #endif diff --git a/version2/src/C/TransactionPart.h b/version2/src/C/TransactionPart.h index c254049..47966e4 100644 --- a/version2/src/C/TransactionPart.h +++ b/version2/src/C/TransactionPart.h @@ -15,12 +15,12 @@ private: int64_t clientLocalSequenceNumber; // Sequence number of the transaction that this is a part of int32_t partNumber; // Parts position in the bool fldisLastPart; - + int32_t refCount; Pair transactionId; Pair partId; Array *data; - + public: TransactionPart(Slot *s, int64_t _machineId, int64_t _arbitratorId, int64_t _clientLocalSequenceNumber, int _partNumber, Array *_data, bool _isLastPart) : Entry(s), sequenceNumber(-1), @@ -29,6 +29,7 @@ public: clientLocalSequenceNumber(_clientLocalSequenceNumber), partNumber(_partNumber), fldisLastPart(_isLastPart), + refCount(1), transactionId(Pair(machineId, clientLocalSequenceNumber)), partId(Pair(clientLocalSequenceNumber, partNumber)), data(_data) { @@ -48,6 +49,8 @@ public: void setSequenceNumber(int64_t _sequenceNumber); void encode(ByteBuffer *bb); char getType(); + void releaseRef() {if ((--refCount)==0) delete this;} + void acquireRef() {refCount++;} Entry *getCopy(Slot *s); }; diff --git a/version2/src/C/vector.h b/version2/src/C/vector.h index e1378c9..5ca954b 100644 --- a/version2/src/C/vector.h +++ b/version2/src/C/vector.h @@ -30,16 +30,17 @@ public: fldsize--; } - void remove(type t) { + bool remove(type t) { for (uint i = 0; i < fldsize; i++) { if (array[i] == t) { for (i++; i < fldsize; i++) { array[i - 1] = array[i]; } fldsize--; - break; + return true; } } + return false; } void removeIndex(uint i) { -- 2.34.1