From eeac4c936cc40b33c97ea770bed5c1f02db05d18 Mon Sep 17 00:00:00 2001 From: bdemsky Date: Tue, 6 Mar 2018 11:13:56 -0800 Subject: [PATCH] edits --- version2/src/C/ArbitrationRound.cc | 24 +++++++++++++++++ version2/src/C/CloudComm.cc | 10 +++++++- version2/src/C/CloudComm.h | 3 ++- version2/src/C/Commit.cc | 16 ++++++------ version2/src/C/Commit.h | 6 ++--- version2/src/C/Error.h | 6 ++--- version2/src/C/KeyValue.h | 2 -- version2/src/C/Makefile | 2 +- version2/src/C/NewKey.cc | 2 +- version2/src/C/PendingTransaction.h | 2 +- version2/src/C/Table.cc | 40 ++++++++++++++++++++++++++--- version2/src/C/Table.h | 3 ++- version2/src/C/vector.h | 6 +++++ 13 files changed, 97 insertions(+), 25 deletions(-) diff --git a/version2/src/C/ArbitrationRound.cc b/version2/src/C/ArbitrationRound.cc index 3ddf5cb..f038fa2 100644 --- a/version2/src/C/ArbitrationRound.cc +++ b/version2/src/C/ArbitrationRound.cc @@ -22,10 +22,34 @@ ArbitrationRound::~ArbitrationRound() { delete parts; } +void ArbitrationRound::generateParts() { + if (didGenerateParts) { + return; + } + parts = new Vector(); + SetIterator * abit = abortsBefore->iterator(); + while(abit->hasNext()) + parts->add((Entry *)abit->next()); + delete abit; + if (commit != NULL) { + Vector * cParts = commit->getParts(); + uint cPartsSize = cParts->size(); + for(uint i=0; i < cPartsSize; i++) { + parts->add((Entry *)cParts->get(i)); + } + } +} + Vector *ArbitrationRound::getParts() { return parts; } +void ArbitrationRound::removeParts(Vector *removeParts) { + parts->removeAll(removeParts); + didSendPart = true; +} + + bool ArbitrationRound::isDoneSending() { if ((commit == NULL) && abortsBefore->isEmpty()) { return true; diff --git a/version2/src/C/CloudComm.cc b/version2/src/C/CloudComm.cc index 9055d86..224018b 100644 --- a/version2/src/C/CloudComm.cc +++ b/version2/src/C/CloudComm.cc @@ -55,12 +55,20 @@ CloudComm::CloudComm(Table *_table, IoTString *_baseurl, IoTString *_password, table(_table), listeningPort(_listeningPort), doEnd(false), - timer(TimingSingleton_getInstance()) { + timer(TimingSingleton_getInstance()), + getslot(new Array("getslot", 7)), + putslot(new Array("putslot", 7)) { if (listeningPort > 0) { pthread_create(&localServerThread, NULL, threadWrapper, this); } } +CloudComm::~CloudComm() { + delete random; + delete getslot; + delete putslot; +} + /** * Generates Key from password. */ diff --git a/version2/src/C/CloudComm.h b/version2/src/C/CloudComm.h index 728025c..daa7989 100644 --- a/version2/src/C/CloudComm.h +++ b/version2/src/C/CloudComm.h @@ -67,7 +67,8 @@ public: * Constructor for actual use. Takes in the url and password. */ CloudComm(Table *_table, IoTString *_baseurl, IoTString *_password, int _listeningPort); - + ~CloudComm(); + /** * Inits all the security stuff */ diff --git a/version2/src/C/Commit.cc b/version2/src/C/Commit.cc index cb707e2..5ed8c89 100644 --- a/version2/src/C/Commit.cc +++ b/version2/src/C/Commit.cc @@ -9,7 +9,7 @@ Commit::Commit() : missingParts(NULL), fldisComplete(false), hasLastPart(false), - keyValueUpdateSet(new Hashset()), + keyValueUpdateSet(new Hashset()), isDead(false), sequenceNumber(-1), machineId(-1), @@ -23,7 +23,7 @@ Commit::Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transaction missingParts(NULL), fldisComplete(true), hasLastPart(false), - keyValueUpdateSet(new Hashset()), + keyValueUpdateSet(new Hashset()), isDead(false), sequenceNumber(_sequenceNumber), machineId(_machineId), @@ -103,7 +103,7 @@ void Commit::invalidateKey(IoTString *key) { } } -Hashset *Commit::getKeyValueUpdateSet() { +Hashset *Commit::getKeyValueUpdateSet() { return keyValueUpdateSet; } @@ -198,7 +198,7 @@ void Commit::decodeCommitData() { Array *Commit::convertDataToBytes() { // Calculate the size of the data int sizeOfData = sizeof(int32_t); // Number of Update KV's - SetIterator *kvit = keyValueUpdateSet->iterator(); + SetIterator *kvit = keyValueUpdateSet->iterator(); while (kvit->hasNext()) { KeyValue *kv = kvit->next(); sizeOfData += kv->getSize(); @@ -223,11 +223,11 @@ Array *Commit::convertDataToBytes() { return bbEncode->array(); } -void Commit::setKVsMap(Hashset *newKVs) { +void Commit::setKVsMap(Hashset *newKVs) { keyValueUpdateSet->clear(); keyValueUpdateSet->addAll(newKVs); liveKeys->clear(); - SetIterator *kvit = newKVs->iterator(); + SetIterator *kvit = newKVs->iterator(); while (kvit->hasNext()) { liveKeys->add(kvit->next()->getKey()); } @@ -240,8 +240,8 @@ Commit *Commit_merge(Commit *newer, Commit *older, int64_t newSequenceNumber) { } else if (newer == NULL) { return older; } - Hashset *kvSet = new Hashset(); - SetIterator *kvit = older->getKeyValueUpdateSet()->iterator(); + Hashset *kvSet = new Hashset(); + SetIterator *kvit = older->getKeyValueUpdateSet()->iterator(); while (kvit->hasNext()) { KeyValue *kv = kvit->next(); kvSet->add(kv); diff --git a/version2/src/C/Commit.h b/version2/src/C/Commit.h index 8084eb7..1c48250 100644 --- a/version2/src/C/Commit.h +++ b/version2/src/C/Commit.h @@ -10,14 +10,14 @@ private: Hashset *missingParts; bool fldisComplete; bool hasLastPart; - Hashset *keyValueUpdateSet; + Hashset *keyValueUpdateSet; bool isDead; int64_t sequenceNumber; int64_t machineId; int64_t transactionSequenceNumber; Hashset *liveKeys; Array *convertDataToBytes(); - void setKVsMap(Hashset *newKVs); + void setKVsMap(Hashset *newKVs); public: Commit(); @@ -28,7 +28,7 @@ public: Vector *getParts(); void addKV(KeyValue *kv); void invalidateKey(IoTString *key); - Hashset *getKeyValueUpdateSet(); + Hashset *getKeyValueUpdateSet(); int32_t getNumberOfParts(); int64_t getMachineId() { return machineId; } bool isComplete() { return fldisComplete; } diff --git a/version2/src/C/Error.h b/version2/src/C/Error.h index ff8eb8d..44e2845 100644 --- a/version2/src/C/Error.h +++ b/version2/src/C/Error.h @@ -20,9 +20,9 @@ public: #define ServerException_TypeSalt 3 class ServerException { -public: - ServerException(const char *msg, char _type) : type(_type) {} - char getType(); char type; + public: + ServerException(const char *msg, char _type) : type(_type) {} + char getType() {return type;} }; #endif diff --git a/version2/src/C/KeyValue.h b/version2/src/C/KeyValue.h index 9234b2d..06f514b 100644 --- a/version2/src/C/KeyValue.h +++ b/version2/src/C/KeyValue.h @@ -28,6 +28,4 @@ public: }; KeyValue *KeyValue_decode(ByteBuffer *bb); -unsigned int hashKeyValue(KeyValue *kv); -bool equalsKeyValue(KeyValue *a, KeyValue *b); #endif diff --git a/version2/src/C/Makefile b/version2/src/C/Makefile index 495cc70..11ac1cb 100644 --- a/version2/src/C/Makefile +++ b/version2/src/C/Makefile @@ -10,7 +10,7 @@ HEADERS := $(wildcard *.h) OBJECTS := $(CPP_SOURCES:%.cc=$(OBJ_DIR)/%.o) $(C_SOURCES:%.c=$(OBJ_DIR)/%.o) -CFLAGS := -Wall -O3 -g +CFLAGS := -Wall -O0 -g CFLAGS += -I. LDFLAGS := -ldl -lrt -rdynamic -g SHARED := -shared diff --git a/version2/src/C/NewKey.cc b/version2/src/C/NewKey.cc index 2579839..a302a78 100644 --- a/version2/src/C/NewKey.cc +++ b/version2/src/C/NewKey.cc @@ -12,7 +12,7 @@ NewKey::~NewKey() { delete key; } -Entry *decode(Slot *slot, ByteBuffer *bb) { +Entry *NewKey_decode(Slot *slot, ByteBuffer *bb) { int keylength = bb->getInt(); Array *key = new Array(keylength); bb->get(key); diff --git a/version2/src/C/PendingTransaction.h b/version2/src/C/PendingTransaction.h index 3e72fa3..96f6f94 100644 --- a/version2/src/C/PendingTransaction.h +++ b/version2/src/C/PendingTransaction.h @@ -32,7 +32,7 @@ public: /** * Get the transaction arbitrator */ - int64_t getArbitrator(); + int64_t getArbitrator() {return arbitrator;} /** * Get the key value update set */ diff --git a/version2/src/C/Table.cc b/version2/src/C/Table.cc index dce3a8f..df6099a 100644 --- a/version2/src/C/Table.cc +++ b/version2/src/C/Table.cc @@ -156,6 +156,40 @@ Table::Table(CloudComm *_cloud, int64_t _localMachineId) : init(); } +Table::~Table() { + delete cloud; + delete random; + delete buffer; + // init data structs + delete committedKeyValueTable; + delete speculatedKeyValueTable; + delete pendingTransactionSpeculatedKeyValueTable; + delete liveNewKeyTable; + delete lastMessageTable; + delete rejectedMessageWatchVectorTable; + delete arbitratorTable; + delete liveAbortTable; + delete newTransactionParts; + delete newCommitParts; + delete lastArbitratedTransactionNumberByArbitratorTable; + delete liveTransactionBySequenceNumberTable; + delete liveTransactionByTransactionIdTable; + delete liveCommitsTable; + delete liveCommitsByKeyTable; + delete lastCommitSeenSequenceNumberByArbitratorTable; + delete rejectedSlotVector; + delete pendingTransactionQueue; + delete pendingSendArbitrationEntriesToDelete; + delete transactionPartsSent; + delete outstandingTransactionStatus; + delete liveAbortsGeneratedByLocal; + delete offlineTransactionsCommittedAndAtServer; + delete localCommunicationTable; + delete lastTransactionSeenFromMachineFromServer; + delete pendingSendArbitrationRounds; + delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator; +} + /** * Init all the stuff needed for for table usage */ @@ -2130,7 +2164,7 @@ bool Table::updateCommittedTable() { // have live values for their keys Hashset *commitsToEdit = new Hashset(); { - SetIterator *kvit = commit->getKeyValueUpdateSet()->iterator(); + SetIterator *kvit = commit->getKeyValueUpdateSet()->iterator(); while (kvit->hasNext()) { KeyValue *kv = kvit->next(); Commit *commit = liveCommitsByKeyTable->get(kv->getKey()); @@ -2150,7 +2184,7 @@ bool Table::updateCommittedTable() { // Update which keys in the old commits are still live { - SetIterator *kvit = commit->getKeyValueUpdateSet()->iterator(); + SetIterator *kvit = commit->getKeyValueUpdateSet()->iterator(); while (kvit->hasNext()) { KeyValue *kv = kvit->next(); previousCommit->invalidateKey(kv->getKey()); @@ -2180,7 +2214,7 @@ bool Table::updateCommittedTable() { // Update the committed table of keys and which commit is using which key { - SetIterator *kvit = commit->getKeyValueUpdateSet()->iterator(); + SetIterator *kvit = commit->getKeyValueUpdateSet()->iterator(); while (kvit->hasNext()) { KeyValue *kv = kvit->next(); committedKeyValueTable->put(kv->getKey(), kv); diff --git a/version2/src/C/Table.h b/version2/src/C/Table.h index a8a759d..d3d22d7 100644 --- a/version2/src/C/Table.h +++ b/version2/src/C/Table.h @@ -239,7 +239,8 @@ private: public: Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort); Table(CloudComm *_cloud, int64_t _localMachineId); - + ~Table(); + /** * Initialize the table by inserting a table status as the first entry into the table status * also initialize the crypto stuff. diff --git a/version2/src/C/vector.h b/version2/src/C/vector.h index 438b236..4c4a7e5 100644 --- a/version2/src/C/vector.h +++ b/version2/src/C/vector.h @@ -71,6 +71,12 @@ public: memcpy(&array[fldsize], v->array, v->fldsize * sizeof(type)); } + void removeAll(Vector *v) { + uint vsize = v->size(); + for(uint i = 0; i < vsize; i++) + remove(v->get(i)); + } + void add(type item) { if (fldsize >= capacity) { uint newcap = capacity << 1; -- 2.34.1