edits
authorbdemsky <bdemsky@uci.edu>
Sun, 21 Jan 2018 08:04:30 +0000 (00:04 -0800)
committerbdemsky <bdemsky@uci.edu>
Sun, 21 Jan 2018 08:04:30 +0000 (00:04 -0800)
version2/src/C/Commit.cc
version2/src/C/Commit.h
version2/src/C/IoTString.h
version2/src/C/KeyValue.h
version2/src/C/PendingTransaction.h
version2/src/C/Table.cc
version2/src/C/Transaction.cc
version2/src/C/hashset.h
version2/src/C/hashtable.h

index 601973286131df8d54936259ba8bc2f1e9418648..fb09a94d9590f5e70e639c748c55ad73cc6c2051 100644 (file)
@@ -1,14 +1,15 @@
 #include "Commit.h"
 #include "CommitPart.h"
 #include "ByteBuffer.h"
-#include "KeyValue.h"
+#include "IoTString.h"
 
 Commit::Commit() :
-       parts(new Hashtable<int32_t, CommitPart *>()),
+       parts(new Vector<CommitPart *>()),
+       partCount(0),
        missingParts(NULL),
        fldisComplete(false),
        hasLastPart(false),
-       keyValueUpdateSet(new Hashset<KeyValue *>()),
+       keyValueUpdateSet(new Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue>()),
        isDead(false),
        sequenceNumber(-1),
        machineId(-1),
@@ -17,11 +18,12 @@ Commit::Commit() :
 }
 
 Commit::Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber) :
-       parts(new Hashtable<int32_t, CommitPart *>()),
+       parts(new Vector<CommitPart *>()),
+       partCount(0),
        missingParts(NULL),
        fldisComplete(true),
        hasLastPart(false),
-       keyValueUpdateSet(new Hashset<KeyValue *>()),
+       keyValueUpdateSet(new Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue>()),
        isDead(false),
        sequenceNumber(_sequenceNumber),
        machineId(_machineId),
@@ -36,11 +38,13 @@ void Commit::addPartDecode(CommitPart *newPart) {
                return;
        }
 
-       CommitPart *previoslySeenPart = parts->put(newPart->getPartNumber(), newPart);
-
-       if (previoslySeenPart != NULL) {
+       CommitPart *previouslySeenPart = parts->setExpand(newPart->getPartNumber(), newPart);
+       if(previouslySeenPart == NULL)
+               partCount++;
+       
+       if (previouslySeenPart != NULL) {
                // Set dead the old one since the new one is a rescued version of this part
-               previoslySeenPart->setDead();
+               previouslySeenPart->setDead();
        } else if (newPart->isLastPart()) {
                missingParts = new Hashset<int32_t>();
                hasLastPart = true;
@@ -82,7 +86,7 @@ int64_t Commit::getTransactionSequenceNumber() {
        return transactionSequenceNumber;
 }
 
-Hashtable<int32_t, CommitPart *> *Commit::getParts() {
+Vector<CommitPart *> *Commit::getParts() {
        return parts;
 }
 
@@ -99,21 +103,22 @@ void Commit::invalidateKey(IoTString *key) {
        }
 }
 
-Hashset<KeyValue *> *Commit::getKeyValueUpdateSet() {
+Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *Commit::getKeyValueUpdateSet() {
        return keyValueUpdateSet;
 }
 
 int32_t Commit::getNumberOfParts() {
-       return parts->size();
+       return partCount;
 }
 
 void Commit::setDead() {
        if (!isDead) {
                isDead = true;
                // Make all the parts of this transaction dead
-               for (int32_t partNumber : parts->keySet()) {
+               for (int32_t partNumber = 0; partNumber < parts->size(); partNumber ++) {
                        CommitPart *part = parts->get(partNumber);
-                       part->setDead();
+                       if (parts!=NULL)
+                               part->setDead();
                }
        }
 }
@@ -124,6 +129,7 @@ CommitPart *Commit::getPart(int index) {
 
 void Commit::createCommitParts() {
        parts->clear();
+       partCount = 0;
        // Convert to chars
        Array<char> *charData = convertDataToBytes();
 
@@ -144,8 +150,8 @@ void Commit::createCommitParts() {
                Array<char> *partData = new Array<char>(copySize);
                System_arraycopy(charData, currentPosition, partData, 0, copySize);
 
-               CommitPart part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
-               parts->put(part->getPartNumber(), part);
+               CommitPart* part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
+               parts->setExpand(part->getPartNumber(), part);
 
                // Update position, count and remaining
                currentPosition += copySize;
@@ -157,19 +163,22 @@ void Commit::createCommitParts() {
 void Commit::decodeCommitData() {
        // Calculate the size of the data section
        int dataSize = 0;
-       for (int i = 0; i < parts->keySet()->size(); i++) {
+       for (int i = 0; i < parts->size(); i++) {
                CommitPart *tp = parts->get(i);
-               dataSize += tp->getDataSize();
+               if (tp!=NULL)
+                       dataSize += tp->getDataSize();
        }
 
        Array<char> *combinedData = new Array<char>(dataSize);
        int currentPosition = 0;
 
        // Stitch all the data sections together
-       for (int i = 0; i < parts->keySet()->size(); i++) {
+       for (int i = 0; i < parts->size(); i++) {
                CommitPart *tp = parts->get(i);
-               System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
-               currentPosition += tp->getDataSize();
+               if (tp!=NULL) {
+                       System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
+                       currentPosition += tp->getDataSize();
+               }
        }
 
        // Decoder Object
@@ -186,14 +195,16 @@ void Commit::decodeCommitData() {
        }
 }
 
-Array<char> *convertDataToBytes() {
-
+Array<char> *Commit::convertDataToBytes() {
        // Calculate the size of the data
        int sizeOfData = sizeof(int32_t);       // Number of Update KV's
-       for (KeyValue *kv : keyValueUpdateSet) {
+       SetIterator<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> * kvit = keyValueUpdateSet->iterator();
+       while(kvit->hasNext()) {
+               KeyValue * kv = kvit->next();
                sizeOfData += kv->getSize();
        }
-
+       delete kvit;
+       
        // Data handlers and storage
        Array<char> *dataArray = new Array<char>(sizeOfData);
        ByteBuffer *bbEncode = ByteBuffer_wrap(dataArray);
@@ -202,18 +213,25 @@ Array<char> *convertDataToBytes() {
        bbEncode->putInt(keyValueUpdateSet->size());
 
        // Encode all the updates
-       for (KeyValue *kv : keyValueUpdateSet) {
+       kvit = keyValueUpdateSet->iterator();
+       while(kvit->hasNext()) {
+               KeyValue * kv = kvit->next();
                kv->encode(bbEncode);
        }
-
+       delete kvit;
+       
        return bbEncode->array();
 }
 
-void Commit::setKVsMap(Hashtable<IoTString *, KeyValue *> *newKVs) {
+void Commit::setKVsMap(Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *newKVs) {
        keyValueUpdateSet->clear();
-       keyValueUpdateSet->addAll(newKVs->values());
+       keyValueUpdateSet->addAll(newKVs);
        liveKeys->clear();
-       liveKeys->addAll(newKVs->keySet());
+       SetIterator<KeyValue*, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = newKVs->iterator();
+       while(kvit->hasNext()) {
+               liveKeys->add(kvit->next()->getKey());
+       }
+       delete kvit;
 }
 
 Commit *Commit_merge(Commit *newer, Commit *older, int64_t newSequenceNumber) {
@@ -222,14 +240,20 @@ Commit *Commit_merge(Commit *newer, Commit *older, int64_t newSequenceNumber) {
        } else if (newer == NULL) {
                return older;
        }
-       Hashtable<IoTString *, KeyValue *> *kvSet = new Hashtable<IoTString *, KeyValue *>();
-       for (KeyValue *kv : older->getKeyValueUpdateSet()) {
-               kvSet->put(kv->getKey(), kv);
+       Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvSet = new Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue>();
+       SetIterator<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = older->getKeyValueUpdateSet()->iterator();
+       while(kvit->hasNext()) {
+               KeyValue* kv=kvit->next();
+               kvSet->add(kv);
        }
-       for (KeyValue *kv : newer->getKeyValueUpdateSet()) {
-               kvSet->put(kv->getKey(), kv);
+       delete kvit;
+       kvit = newer->getKeyValueUpdateSet()->iterator();
+       while(kvit->hasNext()) {
+               KeyValue* kv=kvit->next();
+               kvSet->add(kv);
        }
-
+       delete kvit;
+       
        int64_t transactionSequenceNumber = newer->getTransactionSequenceNumber();
        if (transactionSequenceNumber == -1) {
                transactionSequenceNumber = older->getTransactionSequenceNumber();
@@ -238,5 +262,6 @@ Commit *Commit_merge(Commit *newer, Commit *older, int64_t newSequenceNumber) {
        Commit *newCommit = new Commit(newSequenceNumber, newer->getMachineId(), transactionSequenceNumber);
        newCommit->setKVsMap(kvSet);
 
+       delete kvSet;
        return newCommit;
 }
index 4de3d22e478db8614da689f70ca89aa449841fc2..8084eb7867df037207cdbd9d8677e46afc166173 100644 (file)
@@ -1,21 +1,23 @@
 #ifndef COMMIT_H
 #define COMMIT_H
 #include "common.h"
+#include "KeyValue.h"
 
 class Commit {
 private:
-       Hashtable<int32_t, CommitPart *> *parts;
+       Vector<CommitPart *> *parts;
+       uint32_t partCount;
        Hashset<int32_t> *missingParts;
        bool fldisComplete;
        bool hasLastPart;
-       Hashset<KeyValue *> *keyValueUpdateSet;
+       Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *keyValueUpdateSet;
        bool isDead;
        int64_t sequenceNumber;
        int64_t machineId;
        int64_t transactionSequenceNumber;
        Hashset<IoTString *> *liveKeys;
        Array<char> *convertDataToBytes();
-       void setKVsMap(Hashtable<IoTString *, KeyValue *> *newKVs);
+       void setKVsMap(Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *newKVs);
 
 public:
        Commit();
@@ -23,10 +25,10 @@ public:
        void addPartDecode(CommitPart *newPart);
        int64_t getSequenceNumber();
        int64_t getTransactionSequenceNumber();
-       Hashtable<int32_t, CommitPart *> *getParts();
+       Vector<CommitPart *> *getParts();
        void addKV(KeyValue *kv);
        void invalidateKey(IoTString *key);
-       Hashset<KeyValue *> *getKeyValueUpdateSet();
+       Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *getKeyValueUpdateSet();
        int32_t getNumberOfParts();
        int64_t getMachineId() { return machineId; }
        bool isComplete() { return fldisComplete; }
index 4500a661bad0958e9f134c26cc620232f68e792d..f72dfa8f0dedd1e3701811476ea418a0afec4e3f 100644 (file)
@@ -9,26 +9,41 @@
  * @version 1.0
  */
 
+inline int hashCharArray(Array<char> * array) {
+       uint len = array->length();
+       int hash=0;
+       for(uint i=0; i <len; i++) {
+               hash = 31 * hash + array->get(i);
+       }
+       return hash;
+}
+
 class IoTString {
 private:
        Array<char> *array;
        IoTString() {}
-
+       int hashvalue;
        /**
         * Builds an IoTString object around the char array.  This
         * constructor makes a copy, so the caller is free to modify the char array.
         */
 
 public:
-       IoTString(Array<char> *_array) : array(new Array<char>(_array)) {}
+ IoTString(Array<char> *_array) :
+       array(new Array<char>(_array)),
+               hashvalue(hashCharArray(array)) {
+       }
 
        IoTString(const char *_array) {
                int32_t len = strlen(_array);
                array = new Array<char>(len);
                strcpy(array->internalArray(), _array);
+               hashvalue=hashCharArray(array);
        }
 
-       IoTString(IoTString *string) : array(new Array<char>(string->array)) {
+ IoTString(IoTString *string) :
+       array(new Array<char>(string->array)),
+               hashvalue(hashCharArray(array)) {
        }
 
        ~IoTString() {
@@ -62,6 +77,7 @@ public:
                return result == 0;
        }
 
+       int hashValue() { return hashvalue;}
        int length() { return array->length(); }
        friend IoTString *IoTString_shallow(Array<char> *_array);
 };
@@ -71,4 +87,12 @@ IoTString *IoTString_shallow(Array<char> *_array) {
        str->array = _array;
        return str;
 }
+
+inline int hashString(IoTString *a) {
+       return a->hashValue();
+}
+
+inline bool StringEquals(IoTString *a, IoTString *b) {
+       return a->equals(b);
+}
 #endif
index 06f514b577e096bc59692b9b406a20c3159419d4..2493bb219e6f15aecebd12e03be9ef9f0ba35a96 100644 (file)
@@ -8,7 +8,7 @@
  * @version 1.0
  */
 
-class KeyValue {/*extends Entry */
+class KeyValue { /*extends Entry */
 private:
        IoTString *key;
        IoTString *value;
@@ -28,4 +28,6 @@ public:
 };
 
 KeyValue *KeyValue_decode(ByteBuffer *bb);
-#endif
+unsigned int hashKeyValue(KeyValue *kv);
+bool equalsKeyValue(KeyValue *a, KeyValue *b);
+#endif 
index c036c869bb35a5bd586b5eb29f45380619db78c7..3e72fa364579bea3a2ee7f7aa453b4bb3bfa2fa2 100644 (file)
@@ -14,6 +14,7 @@ private:
 
 public:
        PendingTransaction(int64_t _machineId);
+       ~PendingTransaction();
        /**
         * Add a new key value to the updates
         *
index 4c0331ec362a8cf3bf0812332ba16b317f3748a7..dcdc99be4159477030ac692aba04206187fb2d8c 100644 (file)
@@ -11,6 +11,9 @@
 #include "Transaction.h"
 #include "LastMessage.h"
 #include "Random.h"
+#include "ByteBuffer.h"
+#include "Abort.h"
+#include "CommitPart.h"
 
 
 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
@@ -153,12 +156,12 @@ void Table::init() {
        lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> >();
        rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
        arbitratorTable = new Hashtable<IoTString *, int64_t>();
-       liveAbortTable = new Hashtable<Pair<int64_t, int64_t>, Abort *>();
-       newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>, TransactionPart *> *>();
-       newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>, CommitPart *> *>();
+       liveAbortTable = new Hashtable<Pair<int64_t, int64_t>, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>();
+       newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
+       newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
        lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
        liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
-       liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t>, Transaction *>();
+       liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t>, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>();
        liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> >();
        liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *>();
        lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
@@ -168,7 +171,7 @@ void Table::init() {
        transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
        outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
        liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
-       offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> >();
+       offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t>, uintptr_t, 0, pairHashFunction, pairEquals>();
        localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> >();
        lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
        pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
@@ -396,9 +399,12 @@ TransactionStatus *Table::commitTransaction() {
        } catch (ServerException *e) {
 
                Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
-               for (Iterator<Transaction *> *iter = pendingTransactionQueue->iterator(); iter->hasNext(); ) {
-                       Transaction *transaction = iter->next();
-
+               uint size = pendingTransactionQueue->size();
+               uint oldindex = 0;
+               for(int iter = 0; iter < size; iter++) {
+                       Transaction *transaction = pendingTransactionQueue->get(iter);
+                       pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter));
+                       
                        if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
                                // Already contacted this client so ignore all attempts to contact this client
                                // to preserve ordering for arbitrator
@@ -407,20 +413,21 @@ TransactionStatus *Table::commitTransaction() {
 
                        Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
 
-                       if (sendReturn->getFirst()) {
+                       if (sendReturn.getFirst()) {
                                // Failed to contact over local
                                arbitratorTriedAndFailed->add(transaction->getArbitrator());
                        } else {
                                // Successful contact or should not contact
 
-                               if (sendReturn->getSecond()) {
+                               if (sendReturn.getSecond()) {
                                        // did arbitrate
-                                       iter->remove();
+                                       oldindex--;
                                }
                        }
                }
        }
-
+       pendingTransactionQueue->setSize(oldindex);
+       
        updateLiveStateFromLocal();
 
        return transactionStatus;
@@ -447,7 +454,7 @@ bool Table::sendToServer(NewKey *newKey) {
                                fromRetry = true;
                                ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
 
-                               if (sendSlotsReturn->getFirst()) {
+                               if (sendSlotsReturn.getFirst()) {
                                        if (newKey != NULL) {
                                                if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
                                                        newKey = NULL;
@@ -472,7 +479,7 @@ bool Table::sendToServer(NewKey *newKey) {
                                                }
                                        }
                                } else {
-                                       newSlots = sendSlotsReturn->getThird();
+                                       newSlots = sendSlotsReturn.getThird();
                                        bool isInserted = false;
                                        for (uint si = 0; si < newSlots->length(); si++) {
                                                Slot *s = newSlots->get(si);
@@ -542,9 +549,9 @@ bool Table::sendToServer(NewKey *newKey) {
                                        }
                                }
 
-                               if (sendSlotsReturn->getThird()->length() != 0) {
+                               if (sendSlotsReturn.getThird()->length() != 0) {
                                        // insert into the local block chain
-                                       validateAndUpdate(sendSlotsReturn->getThird(), true);
+                                       validateAndUpdate(sendSlotsReturn.getThird(), true);
                                }
                                // continue;
                        } else {
@@ -650,9 +657,9 @@ bool Table::sendToServer(NewKey *newKey) {
 
                        // Try to fill the slot with data
                        ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
-                       bool needsResize = fillSlotsReturn->getFirst();
-                       int newSize = fillSlotsReturn->getSecond();
-                       bool insertedNewKey = fillSlotsReturn->getThird();
+                       bool needsResize = fillSlotsReturn.getFirst();
+                       int newSize = fillSlotsReturn.getSecond();
+                       bool insertedNewKey = fillSlotsReturn.getThird();
 
                        if (needsResize) {
                                // Reset which transaction to send
@@ -684,7 +691,7 @@ bool Table::sendToServer(NewKey *newKey) {
 
                        ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
 
-                       if (sendSlotsReturn->getFirst()) {
+                       if (sendSlotsReturn.getFirst()) {
 
                                // Did insert into the block chain
 
@@ -740,9 +747,9 @@ bool Table::sendToServer(NewKey *newKey) {
                        pendingSendArbitrationEntriesToDelete->clear();
                        transactionPartsSent->clear();
 
-                       if (sendSlotsReturn->getThird()->length() != 0) {
+                       if (sendSlotsReturn.getThird()->length() != 0) {
                                // insert into the local block chain
-                               validateAndUpdate(sendSlotsReturn->getThird(), true);
+                               validateAndUpdate(sendSlotsReturn.getThird(), true);
                        }
                }
 
@@ -801,7 +808,7 @@ bool Table::updateFromLocal(int64_t machineId) {
        bbEncode->putInt(0);
 
        // Send by local
-       Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
+       Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
        localSequenceNumber++;
 
        if (returnData == NULL) {
@@ -816,10 +823,10 @@ bool Table::updateFromLocal(int64_t machineId) {
        for (int i = 0; i < numberOfEntries; i++) {
                char type = bbDecode->get();
                if (type == TypeAbort) {
-                       Abort *abort = (Abort)Abort_decode(NULL, bbDecode);
+                       Abort *abort = (Abort*)Abort_decode(NULL, bbDecode);
                        processEntry(abort);
                } else if (type == TypeCommitPart) {
-                       CommitPart *commitPart = (CommitPart)CommitPart_decode(NULL, bbDecode);
+                       CommitPart *commitPart = (CommitPart*)CommitPart_decode(NULL, bbDecode);
                        processEntry(commitPart);
                }
        }
@@ -863,7 +870,7 @@ Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
 
 
        // Send by local
-       Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
+       Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
        localSequenceNumber++;
 
        if (returnData == NULL) {
@@ -881,7 +888,7 @@ Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
        for (int i = 0; i < numberOfEntries; i++) {
                char type = bbDecode->get();
                if (type == TypeAbort) {
-                       Abort *abort = (Abort)Abort_decode(NULL, bbDecode);
+                       Abort *abort = (Abort*)Abort_decode(NULL, bbDecode);
 
                        if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
                                foundAbort = true;
@@ -889,7 +896,7 @@ Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
 
                        processEntry(abort);
                } else if (type == TypeCommitPart) {
-                       CommitPart *commitPart = (CommitPart)CommitPart_decode(NULL, bbDecode);
+                       CommitPart *commitPart = (CommitPart*)CommitPart_decode(NULL, bbDecode);
                        processEntry(commitPart);
                }
        }
@@ -938,8 +945,8 @@ Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
 
                // Arbitrate on transaction and pull relevant return data
                Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
-               couldArbitrate = localArbitrateReturn->getFirst();
-               didCommit = localArbitrateReturn->getSecond();
+               couldArbitrate = localArbitrateReturn.getFirst();
+               didCommit = localArbitrateReturn.getSecond();
 
                updateLiveStateFromLocal();
 
@@ -981,7 +988,7 @@ Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
 
                        unseenArbitrations->addAll(commit->getParts()->values());
 
-                       for (CommitPart commitPart : commit->getParts()->values()) {
+                       for (CommitPart *commitPart : commit->getParts()->values()) {
                                returnDataSize += commitPart->getSize();
                        }
                }
@@ -1013,11 +1020,12 @@ Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
        }
 
        bbEncode->putInt(unseenArbitrations->size());
-       for (Entry *entry : unseenArbitrations) {
+       uint size = unseenArbitrations->size();
+       for(uint i = 0; i< size; i++) {
+               Entry * entry = unseenArbitrations->get(i);
                entry->encode(bbEncode);
        }
 
-
        localSequenceNumber++;
        return returnData;
 }
@@ -1044,14 +1052,17 @@ ThreeTuple<bool, bool, Array<Slot *> *> Table::sendSlotsToServer(Slot *slot, int
                if (hadPartialSendToServer) {
 
                        bool isInserted = false;
-                       for (Slot *s : array) {
+                       uint size = s->size();
+                       for(uint i=0; i < size; i++) {
+                               Slot *s = array->get(i);
                                if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
                                        isInserted = true;
                                        break;
                                }
                        }
 
-                       for (Slot *s : array) {
+                       for(uint i=0; i < size; i++) {
+                               Slot *s = array->get(i);
                                if (isInserted) {
                                        break;
                                }
@@ -1107,9 +1118,9 @@ ThreeTuple<bool, int32_t, bool> Table::fillSlot(Slot *slot, bool resize, NewKey
        ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
 
        // Extract working variables
-       bool needsResize = mandatoryRescueReturn->getFirst();
-       bool seenLiveSlot = mandatoryRescueReturn->getSecond();
-       int64_t currentRescueSequenceNumber = mandatoryRescueReturn->getThird();
+       bool needsResize = mandatoryRescueReturn.getFirst();
+       bool seenLiveSlot = mandatoryRescueReturn.getSecond();
+       int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
 
        if (needsResize && !resize) {
                // We need to resize but we are not resizing so return false
@@ -1266,14 +1277,11 @@ ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize
                // Iterate over all the live entries and try to rescue them
                for (Entry *liveEntry : liveEntries) {
                        if (slot->hasSpace(liveEntry)) {
-
                                // Enough space to rescue the entry
                                slot->addEntry(liveEntry);
                        } else if (currentSequenceNumber == firstIfFull) {
                                //if there's no space but the entry is about to fall off the queue
-                               System->out->println("B");      //?
                                return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
-
                        }
                }
        }
@@ -1357,7 +1365,7 @@ void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal
                // must have a last message message-> If not then the server is
                // hiding slots
                if (!machineSet->isEmpty()) {
-                       throw new Error("Missing record for machines: " + machineSet);
+                       throw new Error("Missing record for machines: ");
                }
        }
 
@@ -1446,7 +1454,7 @@ void Table::updateExpectedSize() {
  */
 void Table::checkNumSlots(int numberOfSlots) {
        if (numberOfSlots != expectedsize) {
-               throw new Error("Server Error: Server did not send all slots->  Expected: " + expectedsize + " Received:" + numberOfSlots);
+               throw new Error("Server Error: Server did not send all slots->  Expected: ");
        }
 }
 
@@ -2210,7 +2218,7 @@ void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLo
                        processEntry((TableStatus *)entry, slot->getSequenceNumber());
                        break;
                default:
-                       throw new Error("Unrecognized type: " + entry->getType());
+                       throw new Error("Unrecognized type: ");
                }
        }
 }
@@ -2282,7 +2290,7 @@ void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
                        // a rejected slot
                        int64_t slotMachineId = slot->getMachineID();
                        if (isequal != (slotMachineId == machineId)) {
-                               throw new Error("Server Error: Trying to insert rejected message for slot " + seqNum);
+                               throw new Error("Server Error: Trying to insert rejected message for slot ");
                        }
                }
        }
@@ -2301,7 +2309,7 @@ void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
                }
 
                Pair<int64_t, Liveness *> lastMessageValue = lastMessageEntry->getValue();
-               int64_t entrySequenceNumber = lastMessageValue->getFirst();
+               int64_t entrySequenceNumber = lastMessageValue.getFirst();
 
                if (entrySequenceNumber < seq) {
                        // Add this rejected message to the set of messages that this
@@ -2345,7 +2353,7 @@ void Table::processEntry(Abort *entry) {
                liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
        }
 
-       if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) {
+       if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId()).getFirst() >= entry->getSequenceNumber())) {
                // The machine already saw this so it is dead
                entry->setDead();
                liveAbortTable->remove(entry->getAbortId());
@@ -2400,7 +2408,7 @@ void Table::processEntry(TransactionPart *entry) {
        }
 
        // This part is still alive
-       Hashtable<Pair<int64_t, int32_t>, TransactionPart *> *transactionPart = newTransactionParts->get(entry->getMachineId());
+       Hashtable<Pair<int64_t, int32_t>, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId());
 
        if (transactionPart == NULL) {
                // Dont have a table for this machine Id yet so make one
@@ -2502,8 +2510,8 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liven
                return;
        }
 
-       int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
-       Liveness *lastEntry = lastMessageEntry->getSecond();
+       int64_t lastMessageSeqNum = lastMessageEntry.getFirst();
+       Liveness *lastEntry = lastMessageEntry.getSecond();
 
        // If it is not our machine Id since we already set ours to dead
        if (machineId != localMachineId) {
@@ -2520,12 +2528,12 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liven
                if (hadPartialSendToServer) {
                        // We were not making any updates and we had a machine mismatch
                        if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
-                               throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: " +  lastMessageSeqNum  + " got: " + seqNum);
+                               throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: ");
                        }
                } else {
                        // We were not making any updates and we had a machine mismatch
                        if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
-                               throw new Error("Server Error: Mismatch on local machine sequence number, needed: " +  lastMessageSeqNum + " got: " + seqNum);
+                               throw new Error("Server Error: Mismatch on local machine sequence number, needed: ");
                        }
                }
        } else {
index 23985449cf349c5ec9fa7d3f2fc5acddeb743fc3..51fcbf6b55a1863f99e72b03bfb68f8f49b4357d 100644 (file)
@@ -210,19 +210,15 @@ Pair<int64_t, int64_t> Transaction::getId() {
 }
 
 void Transaction::setDead() {
-       if (isDead) {
-               // Already dead
-               return;
-       }
-
-       // Set dead
-       isDead = true;
-
-       // Make all the parts of this transaction dead
-       for (int32_t partNumber = 0; partNumber < parts->size(); partNumber ++) {
-               TransactionPart *part = parts->get(partNumber);
-               if (part != NULL)
-                       part->setDead();
+       if (!isDead) {
+               // Set dead
+               isDead = true;
+               // Make all the parts of this transaction dead
+               for (int32_t partNumber = 0; partNumber < parts->size(); partNumber ++) {
+                       TransactionPart *part = parts->get(partNumber);
+                       if (part != NULL)
+                               part->setDead();
+               }
        }
 }
 
index d37ac3bafd4e56784c5c4f395696e86d533df355..a9af155a15500f8b03d62cda816431a2bbaa89d7 100644 (file)
 #define HASHSET_H
 #include "hashtable.h"
 
-template<typename _Key>
-struct Linknode {
-       _Key key;
-       Linknode<_Key> *prev;
-       Linknode<_Key> *next;
-};
-
 template<typename _Key, typename _KeyInt, int _Shift, unsigned int (*hash_function)(_Key), bool (*equals)(_Key, _Key)>
 class Hashset;
 
 template<typename _Key, typename _KeyInt = uintptr_t, int _Shift = 0, unsigned int (*hash_function)(_Key) = defaultHashFunction<_Key, _Shift, _KeyInt>, bool (*equals)(_Key, _Key) = defaultEquals<_Key> >
 class SetIterator {
 public:
      SetIterator(Linknode<_Key> *_curr, Hashset <_Key, _KeyInt, _Shift, hash_function, equals> *_set) :
-               curr(_curr),
-               set(_set)
SetIterator(Hashlistnode<_Key, _Key> *_curr, Hashtable <_Key, _Key, _KeyInt, _Shift, hash_function, equals> *_table) :
+curr(_curr),
+table(_table)
        {
        }
 
@@ -67,33 +60,25 @@ public:
 
        void remove() {
                _Key k = last->key;
-               set->remove(k);
+               table->remove(k);
        }
 
 private:
-       Linknode<_Key> *curr;
-       Linknode<_Key> *last;
-       Hashset <_Key, _KeyInt, _Shift, hash_function, equals> *set;
+Hashlistnode<_Key,_Key> *curr;
+Hashlistnode<_Key, _Key> *last;
+Hashtable <_Key, _Key, _KeyInt, _Shift, hash_function, equals> *table;
 };
 
 template<typename _Key, typename _KeyInt = uintptr_t, int _Shift = 0, unsigned int (*hash_function)(_Key) = defaultHashFunction<_Key, _Shift, _KeyInt>, bool (*equals)(_Key, _Key) = defaultEquals<_Key> >
 class Hashset {
 public:
        Hashset(unsigned int initialcapacity = 16, double factor = 0.5) :
-               table(new Hashtable<_Key, Linknode<_Key> *, _KeyInt, _Shift, hash_function, equals>(initialcapacity, factor)),
-               list(NULL),
-               tail(NULL)
+table(new Hashtable<_Key, _Key, _KeyInt, _Shift, hash_function, equals>(initialcapacity, factor))
        {
        }
 
        /** @brief Hashset destructor */
        ~Hashset() {
-               Linknode<_Key> *tmp = list;
-               while (tmp != NULL) {
-                       Linknode<_Key> *tmpnext = tmp->next;
-                       ourfree(tmp);
-                       tmp = tmpnext;
-               }
                delete table;
        }
 
@@ -107,27 +92,9 @@ public:
        }
 
        void clear() {
-               Linknode<_Key> *tmp = list;
-               while (tmp != NULL) {
-                       Linknode<_Key> *tmpnext = tmp->next;
-                       ourfree(tmp);
-                       tmp = tmpnext;
-               }
-               list = tail = NULL;
                table->clear();
        }
 
-       void resetAndDelete() {
-               Linknode<_Key> *tmp = list;
-               while (tmp != NULL) {
-                       Linknode<_Key> *tmpnext = tmp->next;
-                       ourfree(tmp);
-                       tmp = tmpnext;
-               }
-               list = tail = NULL;
-               table->resetAndDeleteKeys();
-       }
-
        /** @brief Adds a new key to the hashset.  Returns false if the key
         *  is already present. */
 
@@ -142,77 +109,32 @@ public:
         *  is already present. */
 
        bool add(_Key key) {
-               Linknode<_Key> *val = table->get(key);
-               if (val == NULL) {
-                       Linknode<_Key> *newnode = (Linknode<_Key> *)ourmalloc(sizeof(struct Linknode<_Key>));
-                       newnode->prev = tail;
-                       newnode->next = NULL;
-                       newnode->key = key;
-                       if (tail != NULL)
-                               tail->next = newnode;
-                       else
-                               list = newnode;
-                       tail = newnode;
-                       table->put(key, newnode);
+               if (!table->contains(key)) {
+                       table->put(key, key);
                        return true;
                } else
                        return false;
        }
 
-       /** @brief Return random key from set. */
-
-       _Key getRandomElement() {
-               if (size() == 0)
-                       return NULL;
-               else if (size() < 6) {
-                       uint count = random() % size();
-                       Linknode<_Key> *ptr = list;
-                       while (count > 0) {
-                               ptr = ptr->next;
-                               count--;
-                       }
-                       return ptr->key;
-               } else
-                       return table->getRandomValue()->key;
-       }
-
        /** @brief Gets the original key corresponding to this one from the
         *  hashset.  Returns NULL if not present. */
 
        _Key get(_Key key) {
-               Linknode<_Key> *val = table->get(key);
-               if (val != NULL)
-                       return val->key;
-               else
-                       return NULL;
+               return table->get(key);
        }
 
        _Key getFirstKey() {
-               return list->key;
+               return table->list->key;
        }
 
        bool contains(_Key key) {
-               return table->get(key) != NULL;
+               return table->contains(key);
        }
 
        bool remove(_Key key) {
-               Linknode<_Key> *oldlinknode;
-               oldlinknode = table->get(key);
-               if (oldlinknode == NULL) {
+               if (!table->contains(key))
                        return false;
-               }
                table->remove(key);
-
-               //remove link node from the list
-               if (oldlinknode->prev == NULL)
-                       list = oldlinknode->next;
-               else
-                       oldlinknode->prev->next = oldlinknode->next;
-               if (oldlinknode->next != NULL)
-                       oldlinknode->next->prev = oldlinknode->prev;
-               else
-                       tail = oldlinknode->prev;
-               ourfree(oldlinknode);
                return true;
        }
 
@@ -225,7 +147,7 @@ public:
        }
 
        SetIterator<_Key, _KeyInt, _Shift, hash_function, equals> *iterator() {
-               return new SetIterator<_Key, _KeyInt, _Shift, hash_function, equals>(list, this);
+               return new SetIterator<_Key, _KeyInt, _Shift, hash_function, equals>(table->list, table);
        }
 
        /** Override: new operator */
@@ -248,8 +170,11 @@ public:
                ourfree(p);
        }
 private:
-       Hashtable<_Key, Linknode<_Key> *, _KeyInt, _Shift, hash_function, equals> *table;
-       Linknode<_Key> *list;
-       Linknode<_Key> *tail;
+       Hashtable<_Key, _Key, _KeyInt, _Shift, hash_function, equals> *table;
 };
+
+template<typename _Key, typename _KeyInt, int _Shift, unsigned int (*hash_function)(_Key), bool (*equals)(_Key, _Key)>
+       SetIterator<_Key, _KeyInt, _Shift, hash_function, equals> * getKeyIterator(Hashtable<_Key,_Key,_KeyInt,_Shift,hash_function,equals> *table) {
+       return new SetIterator<_Key, _KeyInt, _Shift, hash_function, equals>(table->list, table);
+}
 #endif
index b610154dc87d111cc123a1cf309f538e25657f73..3ded5b5993f7b9568501f874bcf08fa6ba8cc31f 100644 (file)
@@ -31,6 +31,8 @@ struct Hashlistnode {
        _Key key;
        _Val val;
        uint hashcode;
+       struct Hashlistnode<_Key, _Val> * next;
+       struct Hashlistnode<_Key, _Val> * prev;
 };
 
 template<typename _Key, int _Shift, typename _KeyInt>
@@ -77,6 +79,7 @@ public:
 
                threshold = (unsigned int)(initialcapacity * loadfactor);
                Size = 0;                                                       // Initial number of elements in the hash
+               tail = list = NULL;
        }
 
        /** @brief Hash table destructor */
@@ -114,6 +117,7 @@ public:
                        zero = NULL;
                }
                Size = 0;
+               tail = list = NULL;
        }
 
        /** Doesn't work with zero value */
@@ -143,6 +147,7 @@ public:
                        zero = NULL;
                }
                Size = 0;
+               tail = list = NULL;
        }
 
        void resetAndDeleteVals() {
@@ -163,6 +168,7 @@ public:
                        zero = NULL;
                }
                Size = 0;
+               tail = list = NULL;
        }
 
        void resetAndFreeVals() {
@@ -183,6 +189,7 @@ public:
                        zero = NULL;
                }
                Size = 0;
+               tail = list = NULL;
        }
 
        /**
@@ -196,6 +203,12 @@ public:
                        _Val oldval;
                        if (!zero) {
                                zero = (struct Hashlistnode<_Key, _Val> *)ourmalloc(sizeof(struct Hashlistnode<_Key, _Val>));
+                               zero->next = list;
+                               if (list != NULL)
+                                       list->prev = zero;
+                               else
+                                       tail = zero;
+                               list = zero;
                                Size++;
                                oldval = (_Val) 0;
                        } else
@@ -231,6 +244,11 @@ public:
                search->key = key;
                search->val = val;
                search->hashcode = hashcode;
+               search->next = list;
+               if (list == NULL)
+                       tail = search;
+               else
+                       list->prev = search;
                Size++;
                return (_Val) 0;
        }
@@ -279,12 +297,21 @@ public:
        _Val remove(_Key key) {
                struct Hashlistnode<_Key, _Val> *search;
 
-               /* Hashtable cannot handle 0 as a key */
                if (!key) {
                        if (!zero) {
                                return (_Val)0;
                        } else {
                                _Val v = zero->val;
+                               if (zero -> next != NULL)
+                                       zero -> next -> prev = zero ->prev;
+                               else
+                                       tail = zero -> prev;
+
+                               if (zero -> prev != NULL)
+                                       zero -> prev -> next = zero -> next;
+                               else
+                                       list = zero->next;
+                                       
                                ourfree(zero);
                                zero = NULL;
                                Size--;
@@ -308,6 +335,17 @@ public:
                                        //empty out this bin
                                        search->val = (_Val) 1;
                                        search->key = 0;
+                                       
+                                       if (search -> next != NULL)
+                                               search -> next -> prev = search ->prev;
+                                       else
+                                               tail = search -> prev;
+                                       
+                                       if (search -> prev != NULL)
+                                               search -> prev -> next = search -> next;
+                                       else
+                                               list = search->next;
+                                       
                                        Size--;
                                        return v;
                                }
@@ -368,7 +406,7 @@ public:
                table = newtable;                                                                                       // Update the global hashtable upon resize()
                capacity = newsize;
                capacitymask = newsize - 1;
-
+               list = tail = NULL;
                threshold = (unsigned int)(newsize * loadfactor);
 
                struct Hashlistnode<_Key, _Val> *bin = &oldtable[0];
@@ -388,6 +426,12 @@ public:
                                index++;
                        } while (search->key);
 
+                       if (tail == NULL)
+                               tail = search;
+                       search -> next = list;
+                       if (list != NULL)
+                               list -> prev = search;
+                       list = search;
                        search->hashcode = hashcode;
                        search->key = key;
                        search->val = bin->val;
@@ -399,7 +443,9 @@ public:
        unsigned int getCapacity() {return capacity;}
        struct Hashlistnode<_Key, _Val> *table;
        struct Hashlistnode<_Key, _Val> *zero;
-       unsigned int capacity;
+  struct Hashlistnode<_Key, _Val> * list;
+  struct Hashlistnode<_Key, _Val> * tail;
+  unsigned int capacity;
        unsigned int Size;
 private:
        unsigned int capacitymask;