From 0b9aca2b62c74f68652b170a92271a98d5b96666 Mon Sep 17 00:00:00 2001 From: bdemsky Date: Thu, 18 Jan 2018 21:52:38 -0800 Subject: [PATCH] tabbing --- version2/src/C/Abort.cc | 68 +- version2/src/C/Abort.h | 28 +- version2/src/C/ArbitrationRound.cc | 111 +-- version2/src/C/ArbitrationRound.h | 43 +- version2/src/C/ByteBuffer.h | 4 +- version2/src/C/CloudComm.cc | 976 +++++++++++++-------------- version2/src/C/CloudComm.h | 661 ++---------------- version2/src/C/Commit.cc | 312 +++++---- version2/src/C/Commit.h | 42 +- version2/src/C/CommitPart.cc | 234 +++---- version2/src/C/CommitPart.h | 234 +++---- version2/src/C/Entry.cc | 74 +- version2/src/C/Entry.h | 22 +- version2/src/C/IoTString.h | 36 +- version2/src/C/KeyValue.cc | 83 +-- version2/src/C/KeyValue.h | 82 +-- version2/src/C/LastMessage.cc | 8 +- version2/src/C/LastMessage.h | 22 +- version2/src/C/LocalComm.cc | 34 +- version2/src/C/LocalComm.h | 34 +- version2/src/C/Makefile | 6 +- version2/src/C/NewKey.cc | 8 +- version2/src/C/NewKey.h | 28 +- version2/src/C/Pair.h | 18 +- version2/src/C/PendingTransaction.cc | 76 +-- version2/src/C/PendingTransaction.h | 18 +- version2/src/C/RejectedMessage.cc | 18 +- version2/src/C/RejectedMessage.h | 22 +- version2/src/C/Slot.cc | 192 +++--- version2/src/C/Slot.h | 44 +- version2/src/C/SlotBuffer.cc | 14 +- version2/src/C/SlotBuffer.h | 12 +- version2/src/C/SlotIndexer.cc | 4 +- version2/src/C/SlotIndexer.h | 12 +- version2/src/C/Table.cc | 248 +++---- version2/src/C/Table.h | 434 ++++++------ version2/src/C/TableStatus.cc | 6 +- version2/src/C/TableStatus.h | 18 +- version2/src/C/ThreeTuple.h | 12 +- version2/src/C/TimingSingleton.h | 16 +- version2/src/C/Transaction.cc | 588 ++++++++-------- version2/src/C/Transaction.h | 556 +++++++-------- version2/src/C/TransactionPart.cc | 264 ++++---- version2/src/C/TransactionPart.h | 264 ++++---- version2/src/C/TransactionStatus.cc | 94 +-- version2/src/C/TransactionStatus.h | 94 +-- version2/src/C/array.h | 50 +- version2/src/C/hashset.h | 6 +- version2/src/C/hashtable.h | 2 +- version2/src/C/vector.h | 15 +- 50 files changed, 2807 insertions(+), 3440 deletions(-) diff --git a/version2/src/C/Abort.cc b/version2/src/C/Abort.cc index 32dd99e..bd0c6a2 100644 --- a/version2/src/C/Abort.cc +++ b/version2/src/C/Abort.cc @@ -1,44 +1,44 @@ #include "Abort.h" #include "ByteBuffer.h" -Abort::Abort(Slot * slot, int64_t _transactionClientLocalSequenceNumber, int64_t _transactionSequenceNumber , int64_t _transactionMachineId, int64_t _transactionArbitrator, int64_t _arbitratorLocalSequenceNumber) : - Entry(slot), - transactionClientLocalSequenceNumber(_transactionClientLocalSequenceNumber), - transactionSequenceNumber(_transactionSequenceNumber), - transactionMachineId(_transactionMachineId), - transactionArbitrator(_transactionArbitrator), - arbitratorLocalSequenceNumber(_arbitratorLocalSequenceNumber), - abortId(new Pair(transactionMachineId, transactionClientLocalSequenceNumber)) { +Abort::Abort(Slot *slot, int64_t _transactionClientLocalSequenceNumber, int64_t _transactionSequenceNumber, int64_t _transactionMachineId, int64_t _transactionArbitrator, int64_t _arbitratorLocalSequenceNumber) : + Entry(slot), + transactionClientLocalSequenceNumber(_transactionClientLocalSequenceNumber), + transactionSequenceNumber(_transactionSequenceNumber), + transactionMachineId(_transactionMachineId), + transactionArbitrator(_transactionArbitrator), + arbitratorLocalSequenceNumber(_arbitratorLocalSequenceNumber), + abortId(new Pair(transactionMachineId, transactionClientLocalSequenceNumber)) { } -Abort::Abort(Slot * slot, int64_t _transactionClientLocalSequenceNumber, int64_t _transactionSequenceNumber, int64_t _sequenceNumber , int64_t _transactionMachineId, int64_t _transactionArbitrator, int64_t _arbitratorLocalSequenceNumber) : - Entry(slot), - transactionClientLocalSequenceNumber(_transactionClientLocalSequenceNumber), - transactionSequenceNumber(_transactionSequenceNumber), - sequenceNumber(_sequenceNumber), - transactionMachineId(_transactionMachineId), - transactionArbitrator(_transactionArbitrator), - arbitratorLocalSequenceNumber(_arbitratorLocalSequenceNumber), - abortId(new Pair(transactionMachineId, transactionClientLocalSequenceNumber)) { +Abort::Abort(Slot *slot, int64_t _transactionClientLocalSequenceNumber, int64_t _transactionSequenceNumber, int64_t _sequenceNumber, int64_t _transactionMachineId, int64_t _transactionArbitrator, int64_t _arbitratorLocalSequenceNumber) : + Entry(slot), + transactionClientLocalSequenceNumber(_transactionClientLocalSequenceNumber), + transactionSequenceNumber(_transactionSequenceNumber), + sequenceNumber(_sequenceNumber), + transactionMachineId(_transactionMachineId), + transactionArbitrator(_transactionArbitrator), + arbitratorLocalSequenceNumber(_arbitratorLocalSequenceNumber), + abortId(new Pair(transactionMachineId, transactionClientLocalSequenceNumber)) { } -Entry * Abortdecode(Slot * slot, ByteBuffer * bb) { - int64_t transactionClientLocalSequenceNumber = bb->getLong(); - int64_t transactionSequenceNumber = bb->getLong(); - int64_t sequenceNumber = bb->getLong(); - int64_t transactionMachineId = bb->getLong(); - int64_t transactionArbitrator = bb->getLong(); - int64_t arbitratorLocalSequenceNumber = bb->getLong(); - - return new Abort(slot, transactionClientLocalSequenceNumber, transactionSequenceNumber, sequenceNumber, transactionMachineId, transactionArbitrator, arbitratorLocalSequenceNumber); +Entry *Abortdecode(Slot *slot, ByteBuffer *bb) { + int64_t transactionClientLocalSequenceNumber = bb->getLong(); + int64_t transactionSequenceNumber = bb->getLong(); + int64_t sequenceNumber = bb->getLong(); + int64_t transactionMachineId = bb->getLong(); + int64_t transactionArbitrator = bb->getLong(); + int64_t arbitratorLocalSequenceNumber = bb->getLong(); + + return new Abort(slot, transactionClientLocalSequenceNumber, transactionSequenceNumber, sequenceNumber, transactionMachineId, transactionArbitrator, arbitratorLocalSequenceNumber); } -void Abort::encode(ByteBuffer * bb) { - bb->put(TypeAbort); - bb->putLong(transactionClientLocalSequenceNumber); - bb->putLong(transactionSequenceNumber); - bb->putLong(sequenceNumber); - bb->putLong(transactionMachineId); - bb->putLong(transactionArbitrator); - bb->putLong(arbitratorLocalSequenceNumber); +void Abort::encode(ByteBuffer *bb) { + bb->put(TypeAbort); + bb->putLong(transactionClientLocalSequenceNumber); + bb->putLong(transactionSequenceNumber); + bb->putLong(sequenceNumber); + bb->putLong(transactionMachineId); + bb->putLong(transactionArbitrator); + bb->putLong(arbitratorLocalSequenceNumber); } diff --git a/version2/src/C/Abort.h b/version2/src/C/Abort.h index d03d845..ad54c4f 100644 --- a/version2/src/C/Abort.h +++ b/version2/src/C/Abort.h @@ -5,35 +5,35 @@ #include "Pair.h" class Abort : public Entry { - private: +private: int64_t transactionClientLocalSequenceNumber; - int64_t transactionSequenceNumber; + int64_t transactionSequenceNumber; int64_t sequenceNumber; int64_t transactionMachineId; int64_t transactionArbitrator; int64_t arbitratorLocalSequenceNumber; - Pair * abortId; - - public: - Abort(Slot * slot, int64_t _transactionClientLocalSequenceNumber, int64_t _transactionSequenceNumber , int64_t _transactionMachineId, int64_t _transactionArbitrator, int64_t _arbitratorLocalSequenceNumber); - Abort(Slot * slot, int64_t _transactionClientLocalSequenceNumber, int64_t _transactionSequenceNumber, int64_t _sequenceNumber , int64_t _transactionMachineId, int64_t _transactionArbitrator, int64_t _arbitratorLocalSequenceNumber); + Pair *abortId; + +public: + Abort(Slot *slot, int64_t _transactionClientLocalSequenceNumber, int64_t _transactionSequenceNumber, int64_t _transactionMachineId, int64_t _transactionArbitrator, int64_t _arbitratorLocalSequenceNumber); + Abort(Slot *slot, int64_t _transactionClientLocalSequenceNumber, int64_t _transactionSequenceNumber, int64_t _sequenceNumber, int64_t _transactionMachineId, int64_t _transactionArbitrator, int64_t _arbitratorLocalSequenceNumber); + + Pair *getAbortId() {return abortId;} - Pair * getAbortId() {return abortId;} - int64_t getTransactionMachineId() { return transactionMachineId; } int64_t getTransactionSequenceNumber() { return transactionSequenceNumber; } int64_t getTransactionClientLocalSequenceNumber() { return transactionClientLocalSequenceNumber; } int64_t getArbitratorLocalSequenceNumber() { return arbitratorLocalSequenceNumber; } - void setSlot(Slot * s) { parentslot = s; } - int64_t getSequenceNumber() { return sequenceNumber; } + void setSlot(Slot *s) { parentslot = s; } + int64_t getSequenceNumber() { return sequenceNumber; } void setSequenceNumber(int64_t _sequenceNumber) { sequenceNumber = _sequenceNumber; } int64_t getTransactionArbitrator() { return transactionArbitrator; } - void encode(ByteBuffer * bb); + void encode(ByteBuffer *bb); int getSize() { return (6 * sizeof(uint64_t)) + sizeof(char); } char getType() { return TypeAbort; } - Entry * getCopy(Slot * s) { return new Abort(s, transactionClientLocalSequenceNumber, transactionSequenceNumber, sequenceNumber, transactionMachineId, transactionArbitrator, arbitratorLocalSequenceNumber); } + Entry *getCopy(Slot *s) { return new Abort(s, transactionClientLocalSequenceNumber, transactionSequenceNumber, sequenceNumber, transactionMachineId, transactionArbitrator, arbitratorLocalSequenceNumber); } }; -Entry * Abortdecode(Slot * slot, ByteBuffer * bb); +Entry *Abortdecode(Slot *slot, ByteBuffer *bb); #endif diff --git a/version2/src/C/ArbitrationRound.cc b/version2/src/C/ArbitrationRound.cc index 2e2105d..da21dda 100644 --- a/version2/src/C/ArbitrationRound.cc +++ b/version2/src/C/ArbitrationRound.cc @@ -1,91 +1,94 @@ #include "ArbitrationRound.h" #include "Commit.h" -ArbitrationRound::ArbitrationRound(Commit * _commit, Hashset * _abortsBefore) : +ArbitrationRound::ArbitrationRound(Commit *_commit, Hashset *_abortsBefore) : abortsBefore(_abortsBefore), - parts(new Vector()), - commit(_commit), + parts(new Vector()), + commit(_commit), currentSize(0), didSendPart(false), didGenerateParts(false) { - - if (commit != NULL) { - commit->createCommitParts(); - currentSize += commit->getNumberOfParts(); + + if (commit != NULL) { + commit->createCommitParts(); + currentSize += commit->getNumberOfParts(); } - - currentSize += abortsBefore->size(); + + currentSize += abortsBefore->size(); } -void ArbitrationRound::generateParts() { - if (didGenerateParts) { +/* + void ArbitrationRound::generateParts() { + if (didGenerateParts) { return; - } - parts = new Vector(abortsBefore); - if (commit != NULL) { + } + parts = new Vector((Vector *)abortsBefore); + if (commit != NULL) { parts->addAll(commit->getParts()->values()); - } -} + } + }*/ -Vector * ArbitrationRound::getParts() { - return parts; +Vector *ArbitrationRound::getParts() { + return parts; } -void ArbitrationRound::removeParts(Vector * removeParts) { - parts->removeAll(removeParts); - didSendPart = true; -} +/* + void ArbitrationRound::removeParts(Vector * removeParts) { + parts->removeAll(removeParts); + didSendPart = true; + } + */ bool ArbitrationRound::isDoneSending() { - if ((commit == NULL) && abortsBefore->isEmpty()) { - return true; - } - - return parts->isEmpty(); + if ((commit == NULL) && abortsBefore->isEmpty()) { + return true; + } + + return parts->isEmpty(); } -Commit * ArbitrationRound::getCommit() { - return commit; +Commit *ArbitrationRound::getCommit() { + return commit; } - -void ArbitrationRound::setCommit(Commit * _commit) { - if (commit != NULL) { - currentSize -= commit->getNumberOfParts(); - } - commit = _commit; - - if (commit != NULL) { - currentSize += commit->getNumberOfParts(); - } + +void ArbitrationRound::setCommit(Commit *_commit) { + if (commit != NULL) { + currentSize -= commit->getNumberOfParts(); + } + commit = _commit; + + if (commit != NULL) { + currentSize += commit->getNumberOfParts(); + } } -void ArbitrationRound::addAbort(Abort * abort) { - abortsBefore->add(abort); - currentSize++; +void ArbitrationRound::addAbort(Abort *abort) { + abortsBefore->add(abort); + currentSize++; } - -void ArbitrationRound::addAborts(Hashset * aborts) { - abortsBefore->addAll(aborts); - currentSize += aborts->size(); + +void ArbitrationRound::addAborts(Hashset *aborts) { + abortsBefore->addAll(aborts); + currentSize += aborts->size(); } - -Hashset * ArbitrationRound::getAborts() { - return abortsBefore; + +Hashset *ArbitrationRound::getAborts() { + return abortsBefore; } int ArbitrationRound::getAbortsCount() { - return abortsBefore->size(); + return abortsBefore->size(); } int ArbitrationRound::getCurrentSize() { - return currentSize; + return currentSize; } bool ArbitrationRound::isFull() { - return currentSize >= MAX_PARTS; + return currentSize >= MAX_PARTS; } - + bool ArbitrationRound::getDidSendPart() { - return didSendPart; + return didSendPart; } diff --git a/version2/src/C/ArbitrationRound.h b/version2/src/C/ArbitrationRound.h index da1a2a8..b91469d 100644 --- a/version2/src/C/ArbitrationRound.h +++ b/version2/src/C/ArbitrationRound.h @@ -5,29 +5,30 @@ #include "common.h" class ArbitrationRound { - private: - Hashset * abortsBefore; - Vector * parts; - Commit * commit; - int currentSize; - bool didSendPart; - bool didGenerateParts; +private: + Hashset *abortsBefore; + Vector *parts; + Commit *commit; + int currentSize; + bool didSendPart; + bool didGenerateParts; - public: - ArbitrationRound(Commit * _commit, Hashset * _abortsBefore); +public: + ArbitrationRound(Commit *_commit, Hashset *_abortsBefore); ~ArbitrationRound(); - void generateParts(); - Vector * getParts(); - void removeParts(Vector * removeParts); - bool isDoneSending(); - void setCommit(Commit * _commit); - void addAbort(Abort * abort); - void addAborts(Hashset * aborts); - Hashset * getAborts(); - int getAbortsCount(); - int getCurrentSize(); - bool isFull(); - bool getDidSendPart(); + void generateParts(); + Commit *getCommit(); + Vector *getParts(); + void removeParts(Vector *removeParts); + bool isDoneSending(); + void setCommit(Commit *_commit); + void addAbort(Abort *abort); + void addAborts(Hashset *aborts); + Hashset *getAborts(); + int getAbortsCount(); + int getCurrentSize(); + bool isFull(); + bool getDidSendPart(); }; #endif diff --git a/version2/src/C/ByteBuffer.h b/version2/src/C/ByteBuffer.h index 2c87e44..ec98caf 100644 --- a/version2/src/C/ByteBuffer.h +++ b/version2/src/C/ByteBuffer.h @@ -3,10 +3,10 @@ #include "common.h" class ByteBuffer { - public: +public: void put(char c); void putLong(int64_t l); int64_t getLong(); - private: +private: }; #endif diff --git a/version2/src/C/CloudComm.cc b/version2/src/C/CloudComm.cc index 1a60c7e..c5da91a 100644 --- a/version2/src/C/CloudComm.cc +++ b/version2/src/C/CloudComm.cc @@ -1,655 +1,631 @@ - - - +#include "CloudComm.h" /** - * This class provides a communication API to the webserver. It also - * validates the HMACs on the slots and handles encryption. - * @author Brian Demsky - * @version 1.0 + * Empty Constructor needed for child class. */ +CloudComm::CloudComm() : + baseurl(NULL), + key(NULL), + mac(NULL), + password(NULL), + random(NULL), + salt(NULL), + table(NULL), + listeningPort(-1), + localServerThread(NULL), + doEnd(false) + timer(TimingSingleton.getInstance()) +{ +} - -class CloudComm { - static final int SALT_SIZE = 8; - static final int TIMEOUT_MILLIS = 5000; // 100 - static final int IV_SIZE = 16; - - /** Sets the size for the HMAC. */ - static final int HMAC_SIZE = 32; - - String baseurl; - SecretKeySpec key; - Mac mac; - String password; - SecureRandom random; - char salt[]; - Table table; - int listeningPort = -1; - Thread localServerThread = NULL; - bool doEnd = false; - - TimingSingleton timer = NULL; - - /** - * Empty Constructor needed for child class. - */ - CloudComm() { - timer = TimingSingleton.getInstance(); - } - - /** - * Constructor for actual use. Takes in the url and password. - */ - CloudComm(Table _table, String _baseurl, String _password, int _listeningPort) { - timer = TimingSingleton.getInstance(); - this.table = _table; - this.baseurl = _baseurl; - this.password = _password; - this.random = new SecureRandom(); - this.listeningPort = _listeningPort; - - if (this.listeningPort > 0) { - localServerThread = new Thread(new Runnable() { - void run() { - localServerWorkerFunction(); - } - }); - localServerThread.start(); - } +/** + * Constructor for actual use. Takes in the url and password. + */ +CloudComm::CloudComm(Table _table, String _baseurl, String _password, int _listeningPort) : + baseurl(_baseurl), + key(NULL), + mac(NULL), + password(_password), + random(new SecureRandom()), + salt(NULL), + table(_table), + listeningPort(_listeningPort), + localServerThread(NULL), + doEnd(false) + timer(TimingSingleton.getInstance()) { + if (this.listeningPort > 0) { + localServerThread = new Thread(new Runnable() { + void run() { + localServerWorkerFunction(); + } + }); + localServerThread.start(); } +} - /** - * Generates Key from password. - */ - SecretKeySpec initKey() { - try { - PBEKeySpec keyspec = new PBEKeySpec(password.toCharArray(), - salt, - 65536, - 128); - SecretKey tmpkey = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256").generateSecret(keyspec); - return new SecretKeySpec(tmpkey.getEncoded(), "AES"); - } catch (Exception e) { - e.printStackTrace(); - throw new Error("Failed generating key."); - } +/** + * Generates Key from password. + */ +SecretKeySpec *CloudComm::initKey() { + try { + PBEKeySpec keyspec = new PBEKeySpec(password.toCharArray(), + salt, + 65536, + 128); + SecretKey tmpkey = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256").generateSecret(keyspec); + return new SecretKeySpec(tmpkey.getEncoded(), "AES"); + } catch (Exception e) { + e.printStackTrace(); + throw new Error("Failed generating key."); } +} - /** - * Inits all the security stuff - */ - void initSecurity() throws ServerException { - // try to get the salt and if one does not exist set one - if (!getSalt()) { - //Set the salt - setSalt(); - } +/** + * Inits all the security stuff + */ - initCrypt(); +void CloudComm::initSecurity() { + // try to get the salt and if one does not exist set one + if (!getSalt()) { + //Set the salt + setSalt(); } - /** - * Inits the HMAC generator. - */ - void initCrypt() { + initCrypt(); +} - if (password == NULL) { - return; - } +/** + * Inits the HMAC generator. + */ +void CloudComm::initCrypt() { - try { - key = initKey(); - password = NULL; // drop password - mac = Mac.getInstance("HmacSHA256"); - mac.init(key); - } catch (Exception e) { - e.printStackTrace(); - throw new Error("Failed To Initialize Ciphers"); - } + if (password == NULL) { + return; } - /* - * Builds the URL for the given request. - */ - URL buildRequest(bool isput, int64_t sequencenumber, int64_t maxentries) throws IOException { - String reqstring = isput ? "req=putslot" : "req=getslot"; - String urlstr = baseurl + "?" + reqstring + "&seq=" + sequencenumber; - if (maxentries != 0) - urlstr += "&max=" + maxentries; - return new URL(urlstr); + try { + key = initKey(); + password = NULL;// drop password + mac = Mac.getInstance("HmacSHA256"); + mac.init(key); + } catch (Exception e) { + e.printStackTrace(); + throw new Error("Failed To Initialize Ciphers"); } +} - void setSalt() throws ServerException { - - if (salt != NULL) { - // Salt already sent to server so dont set it again - return; - } - - try { - char[] saltTmp = new char[SALT_SIZE]; - random.nextBytes(saltTmp); - - for (int i = 0; i < SALT_SIZE; i++) { - System.out.println((int)saltTmp[i] & 255); - } +/* + * Builds the URL for the given request. + */ +URL *CloudComm::buildRequest(bool isput, int64_t sequencenumber, int64_t maxentries) { + IoTString *reqstring = isput ? "req=putslot" : "req=getslot"; + IoTString *urlstr = baseurl + "?" + reqstring + "&seq=" + sequencenumber; + if (maxentries != 0) + urlstr += "&max=" + maxentries; + return new URL(urlstr); +} +void CloudComm::setSalt() { - URL url = new URL(baseurl + "?req=setsalt"); + if (salt != NULL) { + // Salt already sent to server so dont set it again + return; + } - timer.startTime(); - URLConnection con = url.openConnection(); - HttpURLConnection http = (HttpURLConnection) con; + try { + char[] saltTmp = new char[SALT_SIZE]; + random.nextBytes(saltTmp); - http.setRequestMethod("POST"); - http.setFixedLengthStreamingMode(saltTmp.length); - http.setDoOutput(true); - http.setConnectTimeout(TIMEOUT_MILLIS); + for (int i = 0; i < SALT_SIZE; i++) { + System.out.println((int)saltTmp[i] & 255); + } - http.connect(); + URL url = new URL(baseurl + "?req=setsalt"); - OutputStream os = http.getOutputStream(); - os.write(saltTmp); - os.flush(); + timer.startTime(); + URLConnection con = url.openConnection(); + HttpURLConnection http = (HttpURLConnection) con; - int responsecode = http.getResponseCode(); - if (responsecode != HttpURLConnection.HTTP_OK) { - // TODO: Remove this print - System.out.println(responsecode); - throw new Error("Invalid response"); - } + http.setRequestMethod("POST"); + http.setFixedLengthStreamingMode(saltTmp.length); + http.setDoOutput(true); + http.setConnectTimeout(TIMEOUT_MILLIS); - timer.endTime(); - salt = saltTmp; - } catch (Exception e) { - // e.printStackTrace(); - timer.endTime(); - throw new ServerException("Failed setting salt", ServerException.TypeConnectTimeout); - } - } + http.connect(); - bool getSalt() throws ServerException { - URL url = NULL; - URLConnection con = NULL; - HttpURLConnection http = NULL; + OutputStream os = http.getOutputStream(); + os.write(saltTmp); + os.flush(); - try { - url = new URL(baseurl + "?req=getsalt"); - } catch (Exception e) { - // e.printStackTrace(); - throw new Error("getSlot failed"); + int responsecode = http.getResponseCode(); + if (responsecode != HttpURLConnection.HTTP_OK) { + // TODO: Remove this print + System.out.println(responsecode); + throw new Error("Invalid response"); } - try { - timer.startTime(); - con = url.openConnection(); - http = (HttpURLConnection) con; - http.setRequestMethod("POST"); - http.setConnectTimeout(TIMEOUT_MILLIS); - http.setReadTimeout(TIMEOUT_MILLIS); + timer.endTime(); + salt = saltTmp; + } catch (Exception e) { + // e.printStackTrace(); + timer.endTime(); + throw new ServerException("Failed setting salt", ServerException.TypeConnectTimeout); + } +} - http.connect(); - timer.endTime(); - } catch (SocketTimeoutException e) { - timer.endTime(); - throw new ServerException("getSalt failed", ServerException.TypeConnectTimeout); - } catch (Exception e) { - // e.printStackTrace(); - throw new Error("getSlot failed"); - } +bool CloudComm::getSalt() { + URL *url = NULL; + URLConnection *con = NULL; + HttpURLConnection *http = NULL; - try { + try { + url = new URL(baseurl + "?req=getsalt"); + } catch (Exception e) { + // e.printStackTrace(); + throw new Error("getSlot failed"); + } + try { + + timer.startTime(); + con = url.openConnection(); + http = (HttpURLConnection) con; + http.setRequestMethod("POST"); + http.setConnectTimeout(TIMEOUT_MILLIS); + http.setReadTimeout(TIMEOUT_MILLIS); + + + http.connect(); + timer.endTime(); + } catch (SocketTimeoutException e) { + timer.endTime(); + throw new ServerException("getSalt failed", ServerException.TypeConnectTimeout); + } catch (Exception e) { + // e.printStackTrace(); + throw new Error("getSlot failed"); + } - timer.startTime(); + try { - int responsecode = http.getResponseCode(); - if (responsecode != HttpURLConnection.HTTP_OK) { - // TODO: Remove this print - // System.out.println(responsecode); - throw new Error("Invalid response"); - } + timer.startTime(); - InputStream is = http.getInputStream(); - if (is.available() > 0) { - DataInputStream dis = new DataInputStream(is); - int salt_length = dis.readInt(); - char [] tmp = new char[salt_length]; - dis.readFully(tmp); - salt = tmp; - timer.endTime(); + int responsecode = http.getResponseCode(); + if (responsecode != HttpURLConnection.HTTP_OK) { + // TODO: Remove this print + // System.out.println(responsecode); + throw new Error("Invalid response"); + } - return true; - } else { - timer.endTime(); + InputStream is = http.getInputStream(); + if (is.available() > 0) { + DataInputStream dis = new DataInputStream(is); + int salt_length = dis.readInt(); + char [] tmp = new char[salt_length]; + dis.readFully(tmp); + salt = tmp; + timer.endTime(); - return false; - } - } catch (SocketTimeoutException e) { + return true; + } else { timer.endTime(); - throw new ServerException("getSalt failed", ServerException.TypeInputTimeout); - } catch (Exception e) { - // e.printStackTrace(); - throw new Error("getSlot failed"); + return false; } - } - - char[] createIV(int64_t machineId, int64_t localSequenceNumber) { - ByteBuffer buffer = ByteBuffer.allocate(IV_SIZE); - buffer.putLong(machineId); - int64_t localSequenceNumberShifted = localSequenceNumber << 16; - buffer.putLong(localSequenceNumberShifted); - return buffer.array(); + } catch (SocketTimeoutException e) { + timer.endTime(); + throw new ServerException("getSalt failed", ServerException.TypeInputTimeout); + } catch (Exception e) { + // e.printStackTrace(); + throw new Error("getSlot failed"); } +} - char[] encryptSlotAndPrependIV(char[] rawData, char[] ivBytes) { - try { - IvParameterSpec ivSpec = new IvParameterSpec(ivBytes); - Cipher cipher = Cipher.getInstance("AES/CTR/NoPadding"); - cipher.init(Cipher.ENCRYPT_MODE, key, ivSpec); +Array *CloudComm::createIV(int64_t machineId, int64_t localSequenceNumber) { + ByteBuffer buffer = ByteBuffer.allocate(IV_SIZE); + buffer.putLong(machineId); + int64_t localSequenceNumberShifted = localSequenceNumber << 16; + buffer.putLong(localSequenceNumberShifted); + return buffer.array(); +} - char[] encryptedBytes = cipher.doFinal(rawData); +Array *CloudComm::encryptSlotAndPrependIV(Array *rawData, Array *ivBytes) { + try { + IvParameterSpec ivSpec = new IvParameterSpec(ivBytes); + Cipher cipher = Cipher.getInstance("AES/CTR/NoPadding"); + cipher.init(Cipher.ENCRYPT_MODE, key, ivSpec); - char[] chars = new char[encryptedBytes.length + IV_SIZE]; - System.arraycopy(ivBytes, 0, chars, 0, ivBytes.length); - System.arraycopy(encryptedBytes, 0, chars, IV_SIZE, encryptedBytes.length); + char[] encryptedBytes = cipher.doFinal(rawData); - return chars; + char[] chars = new char[encryptedBytes.length + IV_SIZE]; + System.arraycopy(ivBytes, 0, chars, 0, ivBytes.length); + System.arraycopy(encryptedBytes, 0, chars, IV_SIZE, encryptedBytes.length); - } catch (Exception e) { - e.printStackTrace(); - throw new Error("Failed To Encrypt"); - } + return chars; + + } catch (Exception e) { + e.printStackTrace(); + throw new Error("Failed To Encrypt"); } +} - char[] stripIVAndDecryptSlot(char[] rawData) { - try { - char[] ivBytes = new char[IV_SIZE]; - char[] encryptedBytes = new char[rawData.length - IV_SIZE]; - System.arraycopy(rawData, 0, ivBytes, 0, IV_SIZE); - System.arraycopy(rawData, IV_SIZE, encryptedBytes, 0 , encryptedBytes.length); +Array *CloudComm::stripIVAndDecryptSlot(Array *rawData) { + try { + Array *ivBytes = new char[IV_SIZE]; + Array *encryptedBytes = new char[rawData.length - IV_SIZE]; + System.arraycopy(rawData, 0, ivBytes, 0, IV_SIZE); + System.arraycopy(rawData, IV_SIZE, encryptedBytes, 0, encryptedBytes.length); - IvParameterSpec ivSpec = new IvParameterSpec(ivBytes); + IvParameterSpec ivSpec = new IvParameterSpec(ivBytes); - Cipher cipher = Cipher.getInstance("AES/CTR/NoPadding"); - cipher.init(Cipher.DECRYPT_MODE, key, ivSpec); - return cipher.doFinal(encryptedBytes); + Cipher cipher = Cipher.getInstance("AES/CTR/NoPadding"); + cipher.init(Cipher.DECRYPT_MODE, key, ivSpec); + return cipher.doFinal(encryptedBytes); - } catch (Exception e) { - e.printStackTrace(); - throw new Error("Failed To Decrypt"); - } + } catch (Exception e) { + e.printStackTrace(); + throw new Error("Failed To Decrypt"); } +} - /* - * API for putting a slot into the queue. Returns NULL on success. - * On failure, the server will send slots with newer sequence - * numbers. - */ - Slot[] putSlot(Slot slot, int max) throws ServerException { - URL url = NULL; - URLConnection con = NULL; - HttpURLConnection http = NULL; +/* + * API for putting a slot into the queue. Returns NULL on success. + * On failure, the server will send slots with newer sequence + * numbers. + */ +Array *CloudComm::putSlot(Slot *slot, int max) { + URL url = NULL; + URLConnection con = NULL; + HttpURLConnection http = NULL; - try { - if (salt == NULL) { - if (!getSalt()) { - throw new ServerException("putSlot failed", ServerException.TypeSalt); - } - initCrypt(); + try { + if (salt == NULL) { + if (!getSalt()) { + throw new ServerException("putSlot failed", ServerException.TypeSalt); } + initCrypt(); + } - int64_t sequencenumber = slot.getSequenceNumber(); - char[] slotBytes = slot.encode(mac); - // slotBytes = encryptCipher.doFinal(slotBytes); + int64_t sequencenumber = slot.getSequenceNumber(); + char[] slotBytes = slot.encode(mac); + // slotBytes = encryptCipher.doFinal(slotBytes); - // char[] iVBytes = slot.getSlotCryptIV(); + // char[] iVBytes = slot.getSlotCryptIV(); - // char[] chars = new char[slotBytes.length + IV_SIZE]; - // System.arraycopy(iVBytes, 0, chars, 0, iVBytes.length); - // System.arraycopy(slotBytes, 0, chars, IV_SIZE, slotBytes.length); + // char[] chars = new char[slotBytes.length + IV_SIZE]; + // System.arraycopy(iVBytes, 0, chars, 0, iVBytes.length); + // System.arraycopy(slotBytes, 0, chars, IV_SIZE, slotBytes.length); - char[] chars = encryptSlotAndPrependIV(slotBytes, slot.getSlotCryptIV()); + char[] chars = encryptSlotAndPrependIV(slotBytes, slot.getSlotCryptIV()); - url = buildRequest(true, sequencenumber, max); + url = buildRequest(true, sequencenumber, max); - timer.startTime(); - con = url.openConnection(); - http = (HttpURLConnection) con; + timer.startTime(); + con = url.openConnection(); + http = (HttpURLConnection) con; - http.setRequestMethod("POST"); - http.setFixedLengthStreamingMode(chars.length); - http.setDoOutput(true); - http.setConnectTimeout(TIMEOUT_MILLIS); - http.setReadTimeout(TIMEOUT_MILLIS); - http.connect(); + http.setRequestMethod("POST"); + http.setFixedLengthStreamingMode(chars.length); + http.setDoOutput(true); + http.setConnectTimeout(TIMEOUT_MILLIS); + http.setReadTimeout(TIMEOUT_MILLIS); + http.connect(); - OutputStream os = http.getOutputStream(); - os.write(chars); - os.flush(); + OutputStream os = http.getOutputStream(); + os.write(chars); + os.flush(); - timer.endTime(); + timer.endTime(); - // System.out.println("Bytes Sent: " + chars.length); - } catch (ServerException e) { - timer.endTime(); + // System.out.println("Bytes Sent: " + chars.length); + } catch (ServerException e) { + timer.endTime(); - throw e; - } catch (SocketTimeoutException e) { - timer.endTime(); - - throw new ServerException("putSlot failed", ServerException.TypeConnectTimeout); - } catch (Exception e) { - // e.printStackTrace(); - throw new Error("putSlot failed"); - } + throw e; + } catch (SocketTimeoutException e) { + timer.endTime(); + throw new ServerException("putSlot failed", ServerException.TypeConnectTimeout); + } catch (Exception e) { + // e.printStackTrace(); + throw new Error("putSlot failed"); + } - try { - timer.startTime(); - InputStream is = http.getInputStream(); - DataInputStream dis = new DataInputStream(is); - char[] resptype = new char[7]; - dis.readFully(resptype); - timer.endTime(); - if (Arrays.equals(resptype, "getslot".getBytes())) { - return processSlots(dis); - } else if (Arrays.equals(resptype, "putslot".getBytes())) { - return NULL; - } else - throw new Error("Bad response to putslot"); + try { + timer.startTime(); + InputStream is = http.getInputStream(); + DataInputStream dis = new DataInputStream(is); + char[] resptype = new char[7]; + dis.readFully(resptype); + timer.endTime(); - } catch (SocketTimeoutException e) { - timer.endTime(); - throw new ServerException("putSlot failed", ServerException.TypeInputTimeout); - } catch (Exception e) { - // e.printStackTrace(); - throw new Error("putSlot failed"); - } + if (Arrays.equals(resptype, "getslot".getBytes())) { + return processSlots(dis); + } else if (Arrays.equals(resptype, "putslot".getBytes())) { + return NULL; + } else + throw new Error("Bad response to putslot"); + + } catch (SocketTimeoutException e) { + timer.endTime(); + throw new ServerException("putSlot failed", ServerException.TypeInputTimeout); + } catch (Exception e) { + // e.printStackTrace(); + throw new Error("putSlot failed"); } +} - /** - * Request the server to send all slots with the given - * sequencenumber or newer. - */ - Slot[] getSlots(int64_t sequencenumber) throws ServerException { - URL url = NULL; - URLConnection con = NULL; - HttpURLConnection http = NULL; +/** + * Request the server to send all slots with the given + * sequencenumber or newer. + */ +Array *CloudComm::getSlots(int64_t sequencenumber) { + URL url = NULL; + URLConnection con = NULL; + HttpURLConnection http = NULL; - try { - if (salt == NULL) { - if (!getSalt()) { - throw new ServerException("getSlots failed", ServerException.TypeSalt); - } - initCrypt(); + try { + if (salt == NULL) { + if (!getSalt()) { + throw new ServerException("getSlots failed", ServerException.TypeSalt); } + initCrypt(); + } - url = buildRequest(false, sequencenumber, 0); - timer.startTime(); - con = url.openConnection(); - http = (HttpURLConnection) con; - http.setRequestMethod("POST"); - http.setConnectTimeout(TIMEOUT_MILLIS); - http.setReadTimeout(TIMEOUT_MILLIS); + url = buildRequest(false, sequencenumber, 0); + timer.startTime(); + con = url.openConnection(); + http = (HttpURLConnection) con; + http.setRequestMethod("POST"); + http.setConnectTimeout(TIMEOUT_MILLIS); + http.setReadTimeout(TIMEOUT_MILLIS); - http.connect(); - timer.endTime(); + http.connect(); + timer.endTime(); - } catch (SocketTimeoutException e) { - timer.endTime(); + } catch (SocketTimeoutException e) { + timer.endTime(); - throw new ServerException("getSlots failed", ServerException.TypeConnectTimeout); - } catch (ServerException e) { - timer.endTime(); + throw new ServerException("getSlots failed", ServerException.TypeConnectTimeout); + } catch (ServerException e) { + timer.endTime(); - throw e; - } catch (Exception e) { - // e.printStackTrace(); - throw new Error("getSlots failed"); - } + throw e; + } catch (Exception e) { + // e.printStackTrace(); + throw new Error("getSlots failed"); + } - try { + try { - timer.startTime(); - InputStream is = http.getInputStream(); - DataInputStream dis = new DataInputStream(is); - char[] resptype = new char[7]; + timer.startTime(); + InputStream is = http.getInputStream(); + DataInputStream dis = new DataInputStream(is); + char[] resptype = new char[7]; - dis.readFully(resptype); - timer.endTime(); + dis.readFully(resptype); + timer.endTime(); - if (!Arrays.equals(resptype, "getslot".getBytes())) - throw new Error("Bad Response: " + new String(resptype)); + if (!Arrays.equals(resptype, "getslot".getBytes())) + throw new Error("Bad Response: " + new String(resptype)); - return processSlots(dis); - } catch (SocketTimeoutException e) { - timer.endTime(); + return processSlots(dis); + } catch (SocketTimeoutException e) { + timer.endTime(); - throw new ServerException("getSlots failed", ServerException.TypeInputTimeout); - } catch (Exception e) { - // e.printStackTrace(); - throw new Error("getSlots failed"); - } + throw new ServerException("getSlots failed", ServerException.TypeInputTimeout); + } catch (Exception e) { + // e.printStackTrace(); + throw new Error("getSlots failed"); } +} - /** - * Method that actually handles building Slot objects from the - * server response. Shared by both putSlot and getSlots. - */ - Slot[] processSlots(DataInputStream dis) throws Exception { - int numberofslots = dis.readInt(); - int[] sizesofslots = new int[numberofslots]; +/** + * Method that actually handles building Slot objects from the + * server response. Shared by both putSlot and getSlots. + */ +Array *CloudComm::processSlots(DataInputStream dis) { + int numberofslots = dis.readInt(); + int[] sizesofslots = new int[numberofslots]; - Slot[] slots = new Slot[numberofslots]; - for (int i = 0; i < numberofslots; i++) - sizesofslots[i] = dis.readInt(); + Slot[] slots = new Slot[numberofslots]; + for (int i = 0; i < numberofslots; i++) + sizesofslots[i] = dis.readInt(); - for (int i = 0; i < numberofslots; i++) { + for (int i = 0; i < numberofslots; i++) { - char[] rawData = new char[sizesofslots[i]]; - dis.readFully(rawData); + char[] rawData = new char[sizesofslots[i]]; + dis.readFully(rawData); - // char[] data = new char[rawData.length - IV_SIZE]; - // System.arraycopy(rawData, IV_SIZE, data, 0, data.length); + // char[] data = new char[rawData.length - IV_SIZE]; + // System.arraycopy(rawData, IV_SIZE, data, 0, data.length); - char[] data = stripIVAndDecryptSlot(rawData); + char[] data = stripIVAndDecryptSlot(rawData); - // data = decryptCipher.doFinal(data); + // data = decryptCipher.doFinal(data); - slots[i] = Slot.decode(table, data, mac); - } - dis.close(); - return slots; + slots[i] = Slot.decode(table, data, mac); } + dis.close(); + return slots; +} - char[] sendLocalData(char[] sendData, int64_t localSequenceNumber, String host, int port) { +Array *sendLocalData(Array *sendData, int64_t localSequenceNumber, String host, int port) { - if (salt == NULL) { - return NULL; - } - try { - System.out.println("Passing Locally"); + if (salt == NULL) { + return NULL; + } + try { + System.out.println("Passing Locally"); - mac.update(sendData); - char[] genmac = mac.doFinal(); - char[] totalData = new char[sendData.length + genmac.length]; - System.arraycopy(sendData, 0, totalData, 0, sendData.length); - System.arraycopy(genmac, 0, totalData, sendData.length, genmac.length); + mac.update(sendData); + char[] genmac = mac.doFinal(); + char[] totalData = new char[sendData.length + genmac.length]; + System.arraycopy(sendData, 0, totalData, 0, sendData.length); + System.arraycopy(genmac, 0, totalData, sendData.length, genmac.length); - // Encrypt the data for sending - // char[] encryptedData = encryptCipher.doFinal(totalData); - // char[] encryptedData = encryptCipher.doFinal(totalData); - char[] iv = createIV(table.getMachineId(), table.getLocalSequenceNumber()); - char[] encryptedData = encryptSlotAndPrependIV(totalData, iv); + // Encrypt the data for sending + // char[] encryptedData = encryptCipher.doFinal(totalData); + // char[] encryptedData = encryptCipher.doFinal(totalData); + char[] iv = createIV(table.getMachineId(), table.getLocalSequenceNumber()); + char[] encryptedData = encryptSlotAndPrependIV(totalData, iv); - // Open a TCP socket connection to a local device - Socket socket = new Socket(host, port); - socket.setReuseAddress(true); - DataOutputStream output = new DataOutputStream(socket.getOutputStream()); - DataInputStream input = new DataInputStream(socket.getInputStream()); + // Open a TCP socket connection to a local device + Socket socket = new Socket(host, port); + socket.setReuseAddress(true); + DataOutputStream output = new DataOutputStream(socket.getOutputStream()); + DataInputStream input = new DataInputStream(socket.getInputStream()); - timer.startTime(); - // Send data to output (length of data, the data) - output.writeInt(encryptedData.length); - output.write(encryptedData, 0, encryptedData.length); - output.flush(); + timer.startTime(); + // Send data to output (length of data, the data) + output.writeInt(encryptedData.length); + output.write(encryptedData, 0, encryptedData.length); + output.flush(); - int lengthOfReturnData = input.readInt(); - char[] returnData = new char[lengthOfReturnData]; - input.readFully(returnData); + int lengthOfReturnData = input.readInt(); + char[] returnData = new char[lengthOfReturnData]; + input.readFully(returnData); - timer.endTime(); + timer.endTime(); - // returnData = decryptCipher.doFinal(returnData); - returnData = stripIVAndDecryptSlot(returnData); - // returnData = decryptCipher.doFinal(returnData); + // returnData = decryptCipher.doFinal(returnData); + returnData = stripIVAndDecryptSlot(returnData); + // returnData = decryptCipher.doFinal(returnData); - // We are done with this socket - socket.close(); + // We are done with this socket + socket.close(); - mac.update(returnData, 0, returnData.length - HMAC_SIZE); - char[] realmac = mac.doFinal(); - char[] recmac = new char[HMAC_SIZE]; - System.arraycopy(returnData, returnData.length - realmac.length, recmac, 0, realmac.length); + mac.update(returnData, 0, returnData.length - HMAC_SIZE); + char[] realmac = mac.doFinal(); + char[] recmac = new char[HMAC_SIZE]; + System.arraycopy(returnData, returnData.length - realmac.length, recmac, 0, realmac.length); - if (!Arrays.equals(recmac, realmac)) - throw new Error("Local Error: Invalid HMAC! Potential Attack!"); + if (!Arrays.equals(recmac, realmac)) + throw new Error("Local Error: Invalid HMAC! Potential Attack!"); - char[] returnData2 = new char[lengthOfReturnData - recmac.length]; - System.arraycopy(returnData, 0, returnData2, 0, returnData2.length); + char[] returnData2 = new char[lengthOfReturnData - recmac.length]; + System.arraycopy(returnData, 0, returnData2, 0, returnData2.length); - return returnData2; - } catch (Exception e) { - e.printStackTrace(); - // throw new Error("Local comms failure..."); - - } + return returnData2; + } catch (Exception e) { + e.printStackTrace(); + // throw new Error("Local comms failure..."); - return NULL; } - void localServerWorkerFunction() { + return NULL; +} - ServerSocket inputSocket = NULL; +void CloudComm::localServerWorkerFunction() { - try { - // Local server socket - inputSocket = new ServerSocket(listeningPort); - inputSocket.setReuseAddress(true); - inputSocket.setSoTimeout(TIMEOUT_MILLIS); - } catch (Exception e) { - e.printStackTrace(); - throw new Error("Local server setup failure..."); - } + ServerSocket inputSocket = NULL; - while (!doEnd) { + try { + // Local server socket + inputSocket = new ServerSocket(listeningPort); + inputSocket.setReuseAddress(true); + inputSocket.setSoTimeout(TIMEOUT_MILLIS); + } catch (Exception e) { + e.printStackTrace(); + throw new Error("Local server setup failure..."); + } - try { - // Accept incoming socket - Socket socket = inputSocket.accept(); + while (!doEnd) { - DataInputStream input = new DataInputStream(socket.getInputStream()); - DataOutputStream output = new DataOutputStream(socket.getOutputStream()); + try { + // Accept incoming socket + Socket socket = inputSocket.accept(); - // Get the encrypted data from the server - int dataSize = input.readInt(); - char[] readData = new char[dataSize]; - input.readFully(readData); + DataInputStream input = new DataInputStream(socket.getInputStream()); + DataOutputStream output = new DataOutputStream(socket.getOutputStream()); - timer.endTime(); + // Get the encrypted data from the server + int dataSize = input.readInt(); + char[] readData = new char[dataSize]; + input.readFully(readData); - // Decrypt the data - // readData = decryptCipher.doFinal(readData); - readData = stripIVAndDecryptSlot(readData); + timer.endTime(); - mac.update(readData, 0, readData.length - HMAC_SIZE); - char[] genmac = mac.doFinal(); - char[] recmac = new char[HMAC_SIZE]; - System.arraycopy(readData, readData.length - recmac.length, recmac, 0, recmac.length); + // Decrypt the data + // readData = decryptCipher.doFinal(readData); + readData = stripIVAndDecryptSlot(readData); - if (!Arrays.equals(recmac, genmac)) - throw new Error("Local Error: Invalid HMAC! Potential Attack!"); + mac.update(readData, 0, readData.length - HMAC_SIZE); + char[] genmac = mac.doFinal(); + char[] recmac = new char[HMAC_SIZE]; + System.arraycopy(readData, readData.length - recmac.length, recmac, 0, recmac.length); - char[] returnData = new char[readData.length - recmac.length]; - System.arraycopy(readData, 0, returnData, 0, returnData.length); + if (!Arrays.equals(recmac, genmac)) + throw new Error("Local Error: Invalid HMAC! Potential Attack!"); - // Process the data - // char[] sendData = table.acceptDataFromLocal(readData); - char[] sendData = table.acceptDataFromLocal(returnData); + char[] returnData = new char[readData.length - recmac.length]; + System.arraycopy(readData, 0, returnData, 0, returnData.length); + // Process the data + // char[] sendData = table.acceptDataFromLocal(readData); + char[] sendData = table.acceptDataFromLocal(returnData); - mac.update(sendData); - char[] realmac = mac.doFinal(); - char[] totalData = new char[sendData.length + realmac.length]; - System.arraycopy(sendData, 0, totalData, 0, sendData.length); - System.arraycopy(realmac, 0, totalData, sendData.length, realmac.length); - // Encrypt the data for sending - // char[] encryptedData = encryptCipher.doFinal(totalData); - char[] iv = createIV(table.getMachineId(), table.getLocalSequenceNumber()); - char[] encryptedData = encryptSlotAndPrependIV(totalData, iv); + mac.update(sendData); + char[] realmac = mac.doFinal(); + char[] totalData = new char[sendData.length + realmac.length]; + System.arraycopy(sendData, 0, totalData, 0, sendData.length); + System.arraycopy(realmac, 0, totalData, sendData.length, realmac.length); + // Encrypt the data for sending + // char[] encryptedData = encryptCipher.doFinal(totalData); + char[] iv = createIV(table.getMachineId(), table.getLocalSequenceNumber()); + char[] encryptedData = encryptSlotAndPrependIV(totalData, iv); - timer.startTime(); - // Send data to output (length of data, the data) - output.writeInt(encryptedData.length); - output.write(encryptedData, 0, encryptedData.length); - output.flush(); - // close the socket - socket.close(); - } catch (Exception e) { + timer.startTime(); + // Send data to output (length of data, the data) + output.writeInt(encryptedData.length); + output.write(encryptedData, 0, encryptedData.length); + output.flush(); - } - } + // close the socket + socket.close(); + } catch (Exception e) { - if (inputSocket != NULL) { - try { - inputSocket.close(); - } catch (Exception e) { - e.printStackTrace(); - throw new Error("Local server close failure..."); - } } } - void close() { - doEnd = true; - - if (localServerThread != NULL) { - try { - localServerThread.join(); - } catch (Exception e) { - e.printStackTrace(); - throw new Error("Local Server thread join issue..."); - } + if (inputSocket != NULL) { + try { + inputSocket.close(); + } catch (Exception e) { + e.printStackTrace(); + throw new Error("Local server close failure..."); } - - // System.out.println("Done Closing Cloud Comm"); } +} + +void CloudComm::close() { + doEnd = true; - protected void finalize() throws Throwable { + if (localServerThread != NULL) { try { - close(); // close open files - } finally { - super.finalize(); + localServerThread.join(); + } catch (Exception e) { + e.printStackTrace(); + throw new Error("Local Server thread join issue..."); } } + + // System.out.println("Done Closing Cloud Comm"); } + diff --git a/version2/src/C/CloudComm.h b/version2/src/C/CloudComm.h index a93d95f..4b958ae 100644 --- a/version2/src/C/CloudComm.h +++ b/version2/src/C/CloudComm.h @@ -1,6 +1,7 @@ +#ifndef CLOUDCOMM_H +#define CLOUDCOMM_H - - +#include "common.h" /** * This class provides a communication API to the webserver. It also @@ -9,647 +10,85 @@ * @version 1.0 */ +#define CloudComm_SALT_SIZE 8 +#define CloudComm_TIMEOUT_MILLIS 5000 +; // 100 +#define CloudComm_IV_SIZE 16 +/** Sets the size for the HMAC. */ +#define CloudComm_HMAC_SIZE 32 class CloudComm { - private static final int SALT_SIZE = 8; - private static final int TIMEOUT_MILLIS = 5000; // 100 - public static final int IV_SIZE = 16; - - /** Sets the size for the HMAC. */ - static final int HMAC_SIZE = 32; - - private String baseurl; - private SecretKeySpec key; - private Mac mac; - private String password; - private SecureRandom random; - private char salt[]; - private Table table; - private int listeningPort = -1; - private Thread localServerThread = NULL; - private bool doEnd = false; - - private TimingSingleton timer = NULL; +private: + IoTString *baseurl; + SecretKeySpec *key; + Mac *mac; + IoTString *password; + SecureRandom *random; + Array *salt; + Table *table; + int32_t listeningPort = -1; + Thread *localServerThread = NULL; + bool doEnd = false; + TimingSingleton *timer = NULL; /** - * Empty Constructor needed for child class. + * Generates Key from password. */ - CloudComm() { - timer = TimingSingleton.getInstance(); - } + SecretKeySpec *initKey(); /** - * Constructor for actual use. Takes in the url and password. + * Inits the HMAC generator. */ - CloudComm(Table _table, String _baseurl, String _password, int _listeningPort) { - timer = TimingSingleton.getInstance(); - this.table = _table; - this.baseurl = _baseurl; - this.password = _password; - this.random = new SecureRandom(); - this.listeningPort = _listeningPort; - - if (this.listeningPort > 0) { - localServerThread = new Thread(new Runnable() { - public void run() { - localServerWorkerFunction(); - } - }); - localServerThread.start(); - } - } + void initCrypt(); - /** - * Generates Key from password. + /* + * Builds the URL for the given request. */ - private SecretKeySpec initKey() { - try { - PBEKeySpec keyspec = new PBEKeySpec(password.toCharArray(), - salt, - 65536, - 128); - SecretKey tmpkey = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256").generateSecret(keyspec); - return new SecretKeySpec(tmpkey.getEncoded(), "AES"); - } catch (Exception e) { - e.printStackTrace(); - throw new Error("Failed generating key."); - } - } - + URL buildRequest(bool isput, int64_t sequencenumber, int64_t maxentries); + void setSalt(); + bool getSalt(); + Array *createIV(int64_t machineId, int64_t localSequenceNumber); + Array *encryptSlotAndPrependIV(Array *rawData, Array *ivBytes); + Array *stripIVAndDecryptSlot(Array *rawData); + Array *processSlots(DataInputStream dis); + void localServerWorkerFunction(); + +public: /** - * Inits all the security stuff + * Empty Constructor needed for child class. */ - public void initSecurity() throws ServerException { - // try to get the salt and if one does not exist set one - if (!getSalt()) { - //Set the salt - setSalt(); - } - - initCrypt(); - } + CloudComm(); /** - * Inits the HMAC generator. + * Constructor for actual use. Takes in the url and password. */ - private void initCrypt() { - - if (password == NULL) { - return; - } - - try { - key = initKey(); - password = NULL; // drop password - mac = Mac.getInstance("HmacSHA256"); - mac.init(key); - } catch (Exception e) { - e.printStackTrace(); - throw new Error("Failed To Initialize Ciphers"); - } - } + CloudComm(Table _table, String _baseurl, String _password, int _listeningPort); - /* - * Builds the URL for the given request. + /** + * Inits all the security stuff */ - private URL buildRequest(bool isput, int64_t sequencenumber, int64_t maxentries) throws IOException { - String reqstring = isput ? "req=putslot" : "req=getslot"; - String urlstr = baseurl + "?" + reqstring + "&seq=" + sequencenumber; - if (maxentries != 0) - urlstr += "&max=" + maxentries; - return new URL(urlstr); - } - - private void setSalt() throws ServerException { - - if (salt != NULL) { - // Salt already sent to server so dont set it again - return; - } - - try { - char[] saltTmp = new char[SALT_SIZE]; - random.nextBytes(saltTmp); - - for (int i = 0; i < SALT_SIZE; i++) { - System.out.println((int)saltTmp[i] & 255); - } - - - URL url = new URL(baseurl + "?req=setsalt"); - - timer.startTime(); - URLConnection con = url.openConnection(); - HttpURLConnection http = (HttpURLConnection) con; - - http.setRequestMethod("POST"); - http.setFixedLengthStreamingMode(saltTmp.length); - http.setDoOutput(true); - http.setConnectTimeout(TIMEOUT_MILLIS); - - - http.connect(); - - OutputStream os = http.getOutputStream(); - os.write(saltTmp); - os.flush(); - - int responsecode = http.getResponseCode(); - if (responsecode != HttpURLConnection.HTTP_OK) { - // TODO: Remove this print - System.out.println(responsecode); - throw new Error("Invalid response"); - } - - timer.endTime(); - - salt = saltTmp; - } catch (Exception e) { - // e.printStackTrace(); - timer.endTime(); - throw new ServerException("Failed setting salt", ServerException.TypeConnectTimeout); - } - } - - private bool getSalt() throws ServerException { - URL url = NULL; - URLConnection con = NULL; - HttpURLConnection http = NULL; - - try { - url = new URL(baseurl + "?req=getsalt"); - } catch (Exception e) { - // e.printStackTrace(); - throw new Error("getSlot failed"); - } - try { - - timer.startTime(); - con = url.openConnection(); - http = (HttpURLConnection) con; - http.setRequestMethod("POST"); - http.setConnectTimeout(TIMEOUT_MILLIS); - http.setReadTimeout(TIMEOUT_MILLIS); - - - http.connect(); - timer.endTime(); - } catch (SocketTimeoutException e) { - timer.endTime(); - throw new ServerException("getSalt failed", ServerException.TypeConnectTimeout); - } catch (Exception e) { - // e.printStackTrace(); - throw new Error("getSlot failed"); - } - - try { - - timer.startTime(); - - int responsecode = http.getResponseCode(); - if (responsecode != HttpURLConnection.HTTP_OK) { - // TODO: Remove this print - // System.out.println(responsecode); - throw new Error("Invalid response"); - } - - InputStream is = http.getInputStream(); - if (is.available() > 0) { - DataInputStream dis = new DataInputStream(is); - int salt_length = dis.readInt(); - char [] tmp = new char[salt_length]; - dis.readFully(tmp); - salt = tmp; - timer.endTime(); - - return true; - } else { - timer.endTime(); - - return false; - } - } catch (SocketTimeoutException e) { - timer.endTime(); - - throw new ServerException("getSalt failed", ServerException.TypeInputTimeout); - } catch (Exception e) { - // e.printStackTrace(); - throw new Error("getSlot failed"); - } - } - - private char[] createIV(int64_t machineId, int64_t localSequenceNumber) { - ByteBuffer buffer = ByteBuffer.allocate(IV_SIZE); - buffer.putLong(machineId); - int64_t localSequenceNumberShifted = localSequenceNumber << 16; - buffer.putLong(localSequenceNumberShifted); - return buffer.array(); - - } - - private char[] encryptSlotAndPrependIV(char[] rawData, char[] ivBytes) { - try { - IvParameterSpec ivSpec = new IvParameterSpec(ivBytes); - Cipher cipher = Cipher.getInstance("AES/CTR/NoPadding"); - cipher.init(Cipher.ENCRYPT_MODE, key, ivSpec); - - char[] encryptedBytes = cipher.doFinal(rawData); - - char[] chars = new char[encryptedBytes.length + IV_SIZE]; - System.arraycopy(ivBytes, 0, chars, 0, ivBytes.length); - System.arraycopy(encryptedBytes, 0, chars, IV_SIZE, encryptedBytes.length); - - return chars; - - } catch (Exception e) { - e.printStackTrace(); - throw new Error("Failed To Encrypt"); - } - } - - - private char[] stripIVAndDecryptSlot(char[] rawData) { - try { - char[] ivBytes = new char[IV_SIZE]; - char[] encryptedBytes = new char[rawData.length - IV_SIZE]; - System.arraycopy(rawData, 0, ivBytes, 0, IV_SIZE); - System.arraycopy(rawData, IV_SIZE, encryptedBytes, 0 , encryptedBytes.length); - - IvParameterSpec ivSpec = new IvParameterSpec(ivBytes); - - Cipher cipher = Cipher.getInstance("AES/CTR/NoPadding"); - cipher.init(Cipher.DECRYPT_MODE, key, ivSpec); - return cipher.doFinal(encryptedBytes); - - } catch (Exception e) { - e.printStackTrace(); - throw new Error("Failed To Decrypt"); - } - } - + void initSecurity(); /* * API for putting a slot into the queue. Returns NULL on success. * On failure, the server will send slots with newer sequence * numbers. */ - public Slot[] putSlot(Slot slot, int max) throws ServerException { - URL url = NULL; - URLConnection con = NULL; - HttpURLConnection http = NULL; - - try { - if (salt == NULL) { - if (!getSalt()) { - throw new ServerException("putSlot failed", ServerException.TypeSalt); - } - initCrypt(); - } - - int64_t sequencenumber = slot.getSequenceNumber(); - char[] slotBytes = slot.encode(mac); - // slotBytes = encryptCipher.doFinal(slotBytes); - - // char[] iVBytes = slot.getSlotCryptIV(); - - // char[] chars = new char[slotBytes.length + IV_SIZE]; - // System.arraycopy(iVBytes, 0, chars, 0, iVBytes.length); - // System.arraycopy(slotBytes, 0, chars, IV_SIZE, slotBytes.length); - - - char[] chars = encryptSlotAndPrependIV(slotBytes, slot.getSlotCryptIV()); - - url = buildRequest(true, sequencenumber, max); - - timer.startTime(); - con = url.openConnection(); - http = (HttpURLConnection) con; - - http.setRequestMethod("POST"); - http.setFixedLengthStreamingMode(chars.length); - http.setDoOutput(true); - http.setConnectTimeout(TIMEOUT_MILLIS); - http.setReadTimeout(TIMEOUT_MILLIS); - http.connect(); - - OutputStream os = http.getOutputStream(); - os.write(chars); - os.flush(); - - timer.endTime(); - - - // System.out.println("Bytes Sent: " + chars.length); - } catch (ServerException e) { - timer.endTime(); - - throw e; - } catch (SocketTimeoutException e) { - timer.endTime(); - - throw new ServerException("putSlot failed", ServerException.TypeConnectTimeout); - } catch (Exception e) { - // e.printStackTrace(); - throw new Error("putSlot failed"); - } - - - - try { - timer.startTime(); - InputStream is = http.getInputStream(); - DataInputStream dis = new DataInputStream(is); - char[] resptype = new char[7]; - dis.readFully(resptype); - timer.endTime(); - - if (Arrays.equals(resptype, "getslot".getBytes())) { - return processSlots(dis); - } else if (Arrays.equals(resptype, "putslot".getBytes())) { - return NULL; - } else - throw new Error("Bad response to putslot"); - - } catch (SocketTimeoutException e) { - timer.endTime(); - throw new ServerException("putSlot failed", ServerException.TypeInputTimeout); - } catch (Exception e) { - // e.printStackTrace(); - throw new Error("putSlot failed"); - } - } + Array *putSlot(Slot slot, int max); /** * Request the server to send all slots with the given * sequencenumber or newer. */ - public Slot[] getSlots(int64_t sequencenumber) throws ServerException { - URL url = NULL; - URLConnection con = NULL; - HttpURLConnection http = NULL; - - try { - if (salt == NULL) { - if (!getSalt()) { - throw new ServerException("getSlots failed", ServerException.TypeSalt); - } - initCrypt(); - } - - url = buildRequest(false, sequencenumber, 0); - timer.startTime(); - con = url.openConnection(); - http = (HttpURLConnection) con; - http.setRequestMethod("POST"); - http.setConnectTimeout(TIMEOUT_MILLIS); - http.setReadTimeout(TIMEOUT_MILLIS); - - - - http.connect(); - timer.endTime(); - - } catch (SocketTimeoutException e) { - timer.endTime(); - - throw new ServerException("getSlots failed", ServerException.TypeConnectTimeout); - } catch (ServerException e) { - timer.endTime(); - - throw e; - } catch (Exception e) { - // e.printStackTrace(); - throw new Error("getSlots failed"); - } + Array *getSlots(int64_t sequencenumber); - try { - - timer.startTime(); - InputStream is = http.getInputStream(); - DataInputStream dis = new DataInputStream(is); - char[] resptype = new char[7]; - - dis.readFully(resptype); - timer.endTime(); - - if (!Arrays.equals(resptype, "getslot".getBytes())) - throw new Error("Bad Response: " + new String(resptype)); - - return processSlots(dis); - } catch (SocketTimeoutException e) { - timer.endTime(); - - throw new ServerException("getSlots failed", ServerException.TypeInputTimeout); - } catch (Exception e) { - // e.printStackTrace(); - throw new Error("getSlots failed"); - } - } /** * Method that actually handles building Slot objects from the * server response. Shared by both putSlot and getSlots. */ - private Slot[] processSlots(DataInputStream dis) throws Exception { - int numberofslots = dis.readInt(); - int[] sizesofslots = new int[numberofslots]; - - Slot[] slots = new Slot[numberofslots]; - for (int i = 0; i < numberofslots; i++) - sizesofslots[i] = dis.readInt(); - - for (int i = 0; i < numberofslots; i++) { - - char[] rawData = new char[sizesofslots[i]]; - dis.readFully(rawData); - - - // char[] data = new char[rawData.length - IV_SIZE]; - // System.arraycopy(rawData, IV_SIZE, data, 0, data.length); - - - char[] data = stripIVAndDecryptSlot(rawData); - - // data = decryptCipher.doFinal(data); - - slots[i] = Slot.decode(table, data, mac); - } - dis.close(); - return slots; - } - - public char[] sendLocalData(char[] sendData, int64_t localSequenceNumber, String host, int port) { - - if (salt == NULL) { - return NULL; - } - try { - System.out.println("Passing Locally"); - - mac.update(sendData); - char[] genmac = mac.doFinal(); - char[] totalData = new char[sendData.length + genmac.length]; - System.arraycopy(sendData, 0, totalData, 0, sendData.length); - System.arraycopy(genmac, 0, totalData, sendData.length, genmac.length); - - // Encrypt the data for sending - // char[] encryptedData = encryptCipher.doFinal(totalData); - // char[] encryptedData = encryptCipher.doFinal(totalData); - char[] iv = createIV(table.getMachineId(), table.getLocalSequenceNumber()); - char[] encryptedData = encryptSlotAndPrependIV(totalData, iv); - - // Open a TCP socket connection to a local device - Socket socket = new Socket(host, port); - socket.setReuseAddress(true); - DataOutputStream output = new DataOutputStream(socket.getOutputStream()); - DataInputStream input = new DataInputStream(socket.getInputStream()); - - - timer.startTime(); - // Send data to output (length of data, the data) - output.writeInt(encryptedData.length); - output.write(encryptedData, 0, encryptedData.length); - output.flush(); - - int lengthOfReturnData = input.readInt(); - char[] returnData = new char[lengthOfReturnData]; - input.readFully(returnData); - - timer.endTime(); - - // returnData = decryptCipher.doFinal(returnData); - returnData = stripIVAndDecryptSlot(returnData); - // returnData = decryptCipher.doFinal(returnData); - - // We are done with this socket - socket.close(); - - mac.update(returnData, 0, returnData.length - HMAC_SIZE); - char[] realmac = mac.doFinal(); - char[] recmac = new char[HMAC_SIZE]; - System.arraycopy(returnData, returnData.length - realmac.length, recmac, 0, realmac.length); - - if (!Arrays.equals(recmac, realmac)) - throw new Error("Local Error: Invalid HMAC! Potential Attack!"); - - char[] returnData2 = new char[lengthOfReturnData - recmac.length]; - System.arraycopy(returnData, 0, returnData2, 0, returnData2.length); - - return returnData2; - } catch (Exception e) { - e.printStackTrace(); - // throw new Error("Local comms failure..."); - - } - - return NULL; - } - - private void localServerWorkerFunction() { - - ServerSocket inputSocket = NULL; - - try { - // Local server socket - inputSocket = new ServerSocket(listeningPort); - inputSocket.setReuseAddress(true); - inputSocket.setSoTimeout(TIMEOUT_MILLIS); - } catch (Exception e) { - e.printStackTrace(); - throw new Error("Local server setup failure..."); - } - - while (!doEnd) { - - try { - // Accept incoming socket - Socket socket = inputSocket.accept(); - - DataInputStream input = new DataInputStream(socket.getInputStream()); - DataOutputStream output = new DataOutputStream(socket.getOutputStream()); - - // Get the encrypted data from the server - int dataSize = input.readInt(); - char[] readData = new char[dataSize]; - input.readFully(readData); - - timer.endTime(); - - // Decrypt the data - // readData = decryptCipher.doFinal(readData); - readData = stripIVAndDecryptSlot(readData); - - mac.update(readData, 0, readData.length - HMAC_SIZE); - char[] genmac = mac.doFinal(); - char[] recmac = new char[HMAC_SIZE]; - System.arraycopy(readData, readData.length - recmac.length, recmac, 0, recmac.length); - - if (!Arrays.equals(recmac, genmac)) - throw new Error("Local Error: Invalid HMAC! Potential Attack!"); - - char[] returnData = new char[readData.length - recmac.length]; - System.arraycopy(readData, 0, returnData, 0, returnData.length); - - // Process the data - // char[] sendData = table.acceptDataFromLocal(readData); - char[] sendData = table.acceptDataFromLocal(returnData); - - - mac.update(sendData); - char[] realmac = mac.doFinal(); - char[] totalData = new char[sendData.length + realmac.length]; - System.arraycopy(sendData, 0, totalData, 0, sendData.length); - System.arraycopy(realmac, 0, totalData, sendData.length, realmac.length); - - // Encrypt the data for sending - // char[] encryptedData = encryptCipher.doFinal(totalData); - char[] iv = createIV(table.getMachineId(), table.getLocalSequenceNumber()); - char[] encryptedData = encryptSlotAndPrependIV(totalData, iv); - - - timer.startTime(); - // Send data to output (length of data, the data) - output.writeInt(encryptedData.length); - output.write(encryptedData, 0, encryptedData.length); - output.flush(); - - // close the socket - socket.close(); - } catch (Exception e) { - - } - } - - if (inputSocket != NULL) { - try { - inputSocket.close(); - } catch (Exception e) { - e.printStackTrace(); - throw new Error("Local server close failure..."); - } - } - } - - public void close() { - doEnd = true; - - if (localServerThread != NULL) { - try { - localServerThread.join(); - } catch (Exception e) { - e.printStackTrace(); - throw new Error("Local Server thread join issue..."); - } - } - - // System.out.println("Done Closing Cloud Comm"); - } - protected void finalize() throws Throwable { - try { - close(); // close open files - } finally { - super.finalize(); - } - } -} + Array *sendLocalData(Array *sendData, int64_t localSequenceNumber, String host, int port); + public void close(); +}; +#endif diff --git a/version2/src/C/Commit.cc b/version2/src/C/Commit.cc index a5b952d..3672e08 100644 --- a/version2/src/C/Commit.cc +++ b/version2/src/C/Commit.cc @@ -31,230 +31,228 @@ void Commit::addPartDecode(CommitPart newPart) { if (isDead) { // If dead then just kill this part and move on - newPart.setDead(); + newPart->setDead(); return; } - CommitPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart); - + CommitPart *previoslySeenPart = parts->put(newPart->getPartNumber(), newPart); + if (previoslySeenPart != NULL) { // Set dead the old one since the new one is a rescued version of this part - previoslySeenPart.setDead(); - } else if (newPart.isLastPart()) { + previoslySeenPart->setDead(); + } else if (newPart->isLastPart()) { missingParts = new HashSet(); hasLastPart = true; - - for (int i = 0; i < newPart.getPartNumber(); i++) { - if (parts.get(i) == NULL) { - missingParts.add(i); + + for (int i = 0; i < newPart->getPartNumber(); i++) { + if (parts->get(i) == NULL) { + missingParts->add(i); } } } - + if (!fldisComplete && hasLastPart) { - + // We have seen this part so remove it from the set of missing parts - missingParts.remove(newPart.getPartNumber()); - + missingParts->remove(newPart->getPartNumber()); + // Check if all the parts have been seen - if (missingParts.size() == 0) { - + if (missingParts->size() == 0) { + // We have all the parts fldisComplete = true; - + // Decode all the parts and create the key value guard and update sets decodeCommitData(); - + // Get the sequence number and arbitrator of this transaction - sequenceNumber = parts.get(0).getSequenceNumber(); - machineId = parts.get(0).getMachineId(); - transactionSequenceNumber = parts.get(0).getTransactionSequenceNumber(); + sequenceNumber = parts->get(0)->getSequenceNumber(); + machineId = parts->get(0)->getMachineId(); + transactionSequenceNumber = parts->get(0)->getTransactionSequenceNumber(); } } } - int64_t getSequenceNumber() { - return sequenceNumber; - } - - int64_t getTransactionSequenceNumber() { - return transactionSequenceNumber; - } +int64_t Commit::getSequenceNumber() { + return sequenceNumber; +} - Hashtable getParts() { - return parts; - } +int64_t Commit::getTransactionSequenceNumber() { + return transactionSequenceNumber; +} - void addKV(KeyValue kv) { - keyValueUpdateSet.add(kv); - liveKeys.add(kv.getKey()); - } +Hashtable *Commit::getParts() { + return parts; +} - void invalidateKey(IoTString key) { - liveKeys.remove(key); +void Commit::addKV(KeyValue *kv) { + keyValueUpdateSet->add(kv); + liveKeys->add(kv->getKey()); +} - if (liveKeys.size() == 0) { - setDead(); - } - } +void Commit::invalidateKey(IoTString *key) { + liveKeys->remove(key); - Set getKeyValueUpdateSet() { - return keyValueUpdateSet; - } + if (liveKeys->size() == 0) { + setDead(); + } +} -int32_t getNumberOfParts() { - return parts.size(); +Hashset *Commit::getKeyValueUpdateSet() { + return keyValueUpdateSet; } - void setDead() { - if (isDead) { - // Already dead - return; - } +int32_t Commit::getNumberOfParts() { + return parts->size(); +} - // Set dead - isDead = true; +void Commit::setDead() { + if (isDead) { + // Already dead + return; + } - // Make all the parts of this transaction dead - for (int32_t partNumber : parts.keySet()) { - CommitPart part = parts.get(partNumber); - part.setDead(); - } - } + // Set dead + isDead = true; - CommitPart getPart(int index) { - return parts.get(index); - } + // Make all the parts of this transaction dead + for (int32_t partNumber : parts->keySet()) { + CommitPart part = parts->get(partNumber); + part->setDead(); + } +} - void createCommitParts() { +CommitPart *Commit::getPart(int index) { + return parts->get(index); +} - parts.clear(); +void Commit::createCommitParts() { + parts->clear(); - // Convert to chars - char[] charData = convertDataToBytes(); + // Convert to chars + Array *charData = convertDataToBytes(); - int commitPartCount = 0; - int currentPosition = 0; - int remaining = charData.length; + int commitPartCount = 0; + int currentPosition = 0; + int remaining = charData->length(); - while (remaining > 0) { + while (remaining > 0) { - Boolean isLastPart = false; - // determine how much to copy - int copySize = CommitPart.MAX_NON_HEADER_SIZE; - if (remaining <= CommitPart.MAX_NON_HEADER_SIZE) { - copySize = remaining; - isLastPart = true; // last bit of data so last part - } + bool isLastPart = false; + // determine how much to copy + int copySize = CommitPart_MAX_NON_HEADER_SIZE; + if (remaining <= CommitPart_MAX_NON_HEADER_SIZE) { + copySize = remaining; + isLastPart = true;// last bit of data so last part + } - // Copy to a smaller version - char[] partData = new char[copySize]; - System.arraycopy(charData, currentPosition, partData, 0, copySize); + // Copy to a smaller version + Array *partData = new Array(copySize); + System->arraycopy(charData, currentPosition, partData, 0, copySize); - CommitPart part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart); - parts.put(part.getPartNumber(), part); + CommitPart part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart); + parts->put(part->getPartNumber(), part); - // Update position, count and remaining - currentPosition += copySize; - commitPartCount++; - remaining -= copySize; - } - } + // Update position, count and remaining + currentPosition += copySize; + commitPartCount++; + remaining -= copySize; + } +} - void decodeCommitData() { +void Commit::decodeCommitData() { - // Calculate the size of the data section - int dataSize = 0; - for (int i = 0; i < parts.keySet().size(); i++) { - CommitPart tp = parts.get(i); - dataSize += tp.getDataSize(); - } + // Calculate the size of the data section + int dataSize = 0; + for (int i = 0; i < parts->keySet()->size(); i++) { + CommitPart *tp = parts->get(i); + dataSize += tp->getDataSize(); + } - char[] combinedData = new char[dataSize]; - int currentPosition = 0; + Array *combinedData = new Array(dataSize); + int currentPosition = 0; - // Stitch all the data sections together - for (int i = 0; i < parts.keySet().size(); i++) { - CommitPart tp = parts.get(i); - System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize()); - currentPosition += tp.getDataSize(); - } + // Stitch all the data sections together + for (int i = 0; i < parts->keySet()->size(); i++) { + CommitPart *tp = parts->get(i); + System->arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize()); + currentPosition += tp->getDataSize(); + } - // Decoder Object - ByteBuffer bbDecode = ByteBuffer.wrap(combinedData); + // Decoder Object + ByteBuffer *bbDecode = ByteBuffer_wrap(combinedData); - // Decode how many key value pairs need to be decoded - int numberOfKVUpdates = bbDecode.getInt(); + // Decode how many key value pairs need to be decoded + int numberOfKVUpdates = bbDecode->getInt(); - // Decode all the updates key values - for (int i = 0; i < numberOfKVUpdates; i++) { - KeyValue kv = (KeyValue)KeyValue.decode(bbDecode); - keyValueUpdateSet.add(kv); - liveKeys.add(kv.getKey()); - } - } + // Decode all the updates key values + for (int i = 0; i < numberOfKVUpdates; i++) { + KeyValue *kv = (KeyValue *)KeyValue->decode(bbDecode); + keyValueUpdateSet->add(kv); + liveKeys->add(kv->getKey()); + } +} - char[] convertDataToBytes() { +Array *convertDataToBytes() { - // Calculate the size of the data - int sizeOfData = sizeof(int32_t); // Number of Update KV's - for (KeyValue kv : keyValueUpdateSet) { - sizeOfData += kv.getSize(); - } + // Calculate the size of the data + int sizeOfData = sizeof(int32_t); // Number of Update KV's + for (KeyValue *kv : keyValueUpdateSet) { + sizeOfData += kv->getSize(); + } - // Data handlers and storage - char[] dataArray = new char[sizeOfData]; - ByteBuffer bbEncode = ByteBuffer.wrap(dataArray); + // Data handlers and storage + Array *dataArray = new Array(sizeOfData); + ByteBuffer *bbEncode = ByteBuffer_wrap(dataArray); - // Encode the size of the updates and guard sets - bbEncode.putInt(keyValueUpdateSet.size()); + // Encode the size of the updates and guard sets + bbEncode->putInt(keyValueUpdateSet->size()); - // Encode all the updates - for (KeyValue kv : keyValueUpdateSet) { - kv.encode(bbEncode); - } + // Encode all the updates + for (KeyValue *kv : keyValueUpdateSet) { + kv->encode(bbEncode); + } - return bbEncode.array(); - } + return bbEncode->array(); +} - void setKVsMap(Hashtable newKVs) { - keyValueUpdateSet.clear(); - liveKeys.clear(); +void Commit::setKVsMap(Hashtable *newKVs) { + keyValueUpdateSet->clear(); + liveKeys->clear(); - keyValueUpdateSet.addAll(newKVs.values()); - liveKeys.addAll(newKVs.keySet()); + keyValueUpdateSet->addAll(newKVs->values()); + liveKeys->addAll(newKVs->keySet()); - } +} - static Commit merge(Commit newer, Commit older, int64_t newSequenceNumber) { +Commit *Commit_merge(Commit *newer, Commit *older, int64_t newSequenceNumber) { - if (older == NULL) { - return newer; - } else if (newer == NULL) { - return older; - } + if (older == NULL) { + return newer; + } else if (newer == NULL) { + return older; + } - Hashtable kvSet = new Hashtable(); - for (KeyValue kv : older.getKeyValueUpdateSet()) { - kvSet.put(kv.getKey(), kv); - } + Hashtable *kvSet = new Hashtable(); + for (KeyValue *kv : older->getKeyValueUpdateSet()) { + kvSet->put(kv->getKey(), kv); + } - for (KeyValue kv : newer.getKeyValueUpdateSet()) { - kvSet.put(kv.getKey(), kv); - } + for (KeyValue *kv : newer->getKeyValueUpdateSet()) { + kvSet->put(kv->getKey(), kv); + } - int64_t transactionSequenceNumber = newer.getTransactionSequenceNumber(); + int64_t transactionSequenceNumber = newer->getTransactionSequenceNumber(); - if (transactionSequenceNumber == -1) { - transactionSequenceNumber = older.getTransactionSequenceNumber(); - } + if (transactionSequenceNumber == -1) { + transactionSequenceNumber = older->getTransactionSequenceNumber(); + } - Commit newCommit = new Commit(newSequenceNumber, newer.getMachineId(), transactionSequenceNumber); + Commit *newCommit = new Commit(newSequenceNumber, newer->getMachineId(), transactionSequenceNumber); - newCommit.setKVsMap(kvSet); + newCommit->setKVsMap(kvSet); - return newCommit; - } + return newCommit; } diff --git a/version2/src/C/Commit.h b/version2/src/C/Commit.h index 4dd5a9d..752293c 100644 --- a/version2/src/C/Commit.h +++ b/version2/src/C/Commit.h @@ -3,40 +3,40 @@ #include "common.h" class Commit { - private: - Hashtable * parts; - Hashset *missingParts; - bool fldisComplete; +private: + Hashtable *parts; + Hashset *missingParts; + bool fldisComplete; bool hasLastPart; - Hashset *keyValueUpdateSet; - bool isDead; - int64_t sequenceNumber; + Hashset *keyValueUpdateSet; + bool isDead; + int64_t sequenceNumber; int64_t machineId; - int64_t transactionSequenceNumber; - Hashset * liveKeys; - Array * convertDataToBytes(); - void setKVsMap(Hashtable * newKVs); - - public: + int64_t transactionSequenceNumber; + Hashset *liveKeys; + Array *convertDataToBytes(); + void setKVsMap(Hashtable *newKVs); + +public: Commit(); Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber); - - void addPartDecode(CommitPart * newPart); + + void addPartDecode(CommitPart *newPart); int64_t getSequenceNumber(); int64_t getTransactionSequenceNumber(); - Hashtable *getParts(); - void addKV(KeyValue * kv); - void invalidateKey(IoTString * key); - Hashset * getKeyValueUpdateSet(); + Hashtable *getParts(); + void addKV(KeyValue *kv); + void invalidateKey(IoTString *key); + Hashset *getKeyValueUpdateSet(); int32_t getNumberOfParts(); int64_t getMachineId() { return machineId; } bool isComplete() { return fldisComplete; } bool isLive() { return !isDead; } void setDead(); - CommitPart * getPart(int32_t index); + CommitPart *getPart(int32_t index); void createCommitParts(); void decodeCommitData(); }; -Commit * Commit_merge(Commit * newer, Commit * older, int64_t newSequenceNumber); +Commit *Commit_merge(Commit *newer, Commit *older, int64_t newSequenceNumber); #endif diff --git a/version2/src/C/CommitPart.cc b/version2/src/C/CommitPart.cc index 4de55c9..1394f32 100644 --- a/version2/src/C/CommitPart.cc +++ b/version2/src/C/CommitPart.cc @@ -2,121 +2,121 @@ -class CommitPart extends Entry{ - - // Max size of the part excluding the fixed size header - static final int MAX_NON_HEADER_SIZE = 512; - - - // Sequence number of the transaction this commit is for, -1 if not a cloud transaction - int64_t machineId = -1; // Machine Id of the device that made the commit - int64_t sequenceNumber = -1; // commit sequence number for this arbitrator - int64_t transactionSequenceNumber = -1; - int partNumber = -1; // Parts position in the - Boolean isLastPart = false; - char[] data = NULL; - - Pair partId = NULL; - Pair commitId = NULL; - - - CommitPart(Slot s, int64_t _machineId, int64_t _sequenceNumber, int64_t _transactionSequenceNumber, int _partNumber, char[] _data, Boolean _isLastPart) { - super(s); - machineId = _machineId; - sequenceNumber = _sequenceNumber; - transactionSequenceNumber = _transactionSequenceNumber; - partNumber = _partNumber; - isLastPart = _isLastPart; - data = _data; - - partId = new Pair(sequenceNumber, partNumber); - commitId = new Pair(machineId, sequenceNumber); - } - - int getSize() { - if (data == NULL) { - return (3 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char)); - } - return (3 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char)) + data.length; - } - - void setSlot(Slot s) { - parentslot = s; - } - - int getPartNumber() { - return partNumber; - } - - int getDataSize() { - return data.length; - } - - char[] getData() { - return data; - } - - Pair getPartId() { - return partId; - } - - Pair getCommitId() { - return commitId; - } - - Boolean isLastPart() { - return isLastPart; - } - - int64_t getMachineId() { - return machineId; - } - - int64_t getTransactionSequenceNumber() { - return transactionSequenceNumber; - } - - int64_t getSequenceNumber() { - return sequenceNumber; - } - - static Entry decode(Slot s, ByteBuffer bb) { - int64_t machineId = bb->getLong(); - int64_t sequenceNumber = bb->getLong(); - int64_t transactionSequenceNumber = bb->getLong(); - int partNumber = bb->getInt(); - int dataSize = bb->getInt(); - Boolean isLastPart = bb->get() == 1; - - // Get the data - char[] data = new char[dataSize]; - bb->get(data); - - return new CommitPart(s, machineId, sequenceNumber, transactionSequenceNumber, partNumber, data, isLastPart); - } - - void encode(ByteBuffer bb) { - bb->put(Entry.TypeCommitPart); - bb->putLong(machineId); - bb->putLong(sequenceNumber); - bb->putLong(transactionSequenceNumber); - bb->putInt(partNumber); - bb->putInt(data.length); - - if (isLastPart) { - bb->put((char)1); - } else { - bb->put((char)0); - } - - bb->put(data); - } - - char getType() { - return Entry.TypeCommitPart; - } - - Entry getCopy(Slot s) { - return new CommitPart(s, machineId, sequenceNumber, transactionSequenceNumber, partNumber, data, isLastPart); - } +class CommitPart extends Entry { + + // Max size of the part excluding the fixed size header + static final int MAX_NON_HEADER_SIZE = 512; + + + // Sequence number of the transaction this commit is for, -1 if not a cloud transaction + int64_t machineId = -1; // Machine Id of the device that made the commit + int64_t sequenceNumber = -1; // commit sequence number for this arbitrator + int64_t transactionSequenceNumber = -1; + int partNumber = -1; // Parts position in the + bool isLastPart = false; + char[] data = NULL; + + Pair partId = NULL; + Pair commitId = NULL; + + + CommitPart(Slot s, int64_t _machineId, int64_t _sequenceNumber, int64_t _transactionSequenceNumber, int _partNumber, char[] _data, bool _isLastPart) { + super(s); + machineId = _machineId; + sequenceNumber = _sequenceNumber; + transactionSequenceNumber = _transactionSequenceNumber; + partNumber = _partNumber; + isLastPart = _isLastPart; + data = _data; + + partId = new Pair(sequenceNumber, partNumber); + commitId = new Pair(machineId, sequenceNumber); + } + + int getSize() { + if (data == NULL) { + return (3 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char)); + } + return (3 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char)) + data.length; + } + + void setSlot(Slot s) { + parentslot = s; + } + + int getPartNumber() { + return partNumber; + } + + int getDataSize() { + return data.length; + } + + char[] getData() { + return data; + } + + Pair getPartId() { + return partId; + } + + Pair getCommitId() { + return commitId; + } + + bool isLastPart() { + return isLastPart; + } + + int64_t getMachineId() { + return machineId; + } + + int64_t getTransactionSequenceNumber() { + return transactionSequenceNumber; + } + + int64_t getSequenceNumber() { + return sequenceNumber; + } + + static Entry decode(Slot s, ByteBuffer bb) { + int64_t machineId = bb->getLong(); + int64_t sequenceNumber = bb->getLong(); + int64_t transactionSequenceNumber = bb->getLong(); + int partNumber = bb->getInt(); + int dataSize = bb->getInt(); + bool isLastPart = bb->get() == 1; + + // Get the data + char[] data = new char[dataSize]; + bb->get(data); + + return new CommitPart(s, machineId, sequenceNumber, transactionSequenceNumber, partNumber, data, isLastPart); + } + + void encode(ByteBuffer bb) { + bb->put(Entry.TypeCommitPart); + bb->putLong(machineId); + bb->putLong(sequenceNumber); + bb->putLong(transactionSequenceNumber); + bb->putInt(partNumber); + bb->putInt(data.length); + + if (isLastPart) { + bb->put((char)1); + } else { + bb->put((char)0); + } + + bb->put(data); + } + + char getType() { + return Entry.TypeCommitPart; + } + + Entry getCopy(Slot s) { + return new CommitPart(s, machineId, sequenceNumber, transactionSequenceNumber, partNumber, data, isLastPart); + } } diff --git a/version2/src/C/CommitPart.h b/version2/src/C/CommitPart.h index e121aad..32a418b 100644 --- a/version2/src/C/CommitPart.h +++ b/version2/src/C/CommitPart.h @@ -2,121 +2,121 @@ -class CommitPart extends Entry{ - - // Max size of the part excluding the fixed size header - public static final int MAX_NON_HEADER_SIZE = 512; - - - // Sequence number of the transaction this commit is for, -1 if not a cloud transaction - private int64_t machineId = -1; // Machine Id of the device that made the commit - private int64_t sequenceNumber = -1; // commit sequence number for this arbitrator - private int64_t transactionSequenceNumber = -1; - private int partNumber = -1; // Parts position in the - private Boolean isLastPart = false; - private char[] data = NULL; - - private Pair partId = NULL; - private Pair commitId = NULL; - - - public CommitPart(Slot s, int64_t _machineId, int64_t _sequenceNumber, int64_t _transactionSequenceNumber, int _partNumber, char[] _data, Boolean _isLastPart) { - super(s); - machineId = _machineId; - sequenceNumber = _sequenceNumber; - transactionSequenceNumber = _transactionSequenceNumber; - partNumber = _partNumber; - isLastPart = _isLastPart; - data = _data; - - partId = new Pair(sequenceNumber, partNumber); - commitId = new Pair(machineId, sequenceNumber); - } - - public int getSize() { - if (data == NULL) { - return (3 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char)); - } - return (3 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char)) + data.length; - } - - public void setSlot(Slot s) { - parentslot = s; - } - - public int getPartNumber() { - return partNumber; - } - - public int getDataSize() { - return data.length; - } - - public char[] getData() { - return data; - } - - public Pair getPartId() { - return partId; - } - - public Pair getCommitId() { - return commitId; - } - - public Boolean isLastPart() { - return isLastPart; - } - - public int64_t getMachineId() { - return machineId; - } - - public int64_t getTransactionSequenceNumber() { - return transactionSequenceNumber; - } - - public int64_t getSequenceNumber() { - return sequenceNumber; - } - - static Entry decode(Slot s, ByteBuffer bb) { - int64_t machineId = bb->getLong(); - int64_t sequenceNumber = bb->getLong(); - int64_t transactionSequenceNumber = bb->getLong(); - int partNumber = bb->getInt(); - int dataSize = bb->getInt(); - Boolean isLastPart = bb->get() == 1; - - // Get the data - char[] data = new char[dataSize]; - bb->get(data); - - return new CommitPart(s, machineId, sequenceNumber, transactionSequenceNumber, partNumber, data, isLastPart); - } - - public void encode(ByteBuffer bb) { - bb->put(Entry.TypeCommitPart); - bb->putLong(machineId); - bb->putLong(sequenceNumber); - bb->putLong(transactionSequenceNumber); - bb->putInt(partNumber); - bb->putInt(data.length); - - if (isLastPart) { - bb->put((char)1); - } else { - bb->put((char)0); - } - - bb->put(data); - } - - public char getType() { - return Entry.TypeCommitPart; - } - - public Entry getCopy(Slot s) { - return new CommitPart(s, machineId, sequenceNumber, transactionSequenceNumber, partNumber, data, isLastPart); - } +class CommitPart extends Entry { + + // Max size of the part excluding the fixed size header + public static final int MAX_NON_HEADER_SIZE = 512; + + + // Sequence number of the transaction this commit is for, -1 if not a cloud transaction + private int64_t machineId = -1; // Machine Id of the device that made the commit + private int64_t sequenceNumber = -1; // commit sequence number for this arbitrator + private int64_t transactionSequenceNumber = -1; + private int partNumber = -1; // Parts position in the + private bool isLastPart = false; + private char[] data = NULL; + + private Pair partId = NULL; + private Pair commitId = NULL; + + + public CommitPart(Slot s, int64_t _machineId, int64_t _sequenceNumber, int64_t _transactionSequenceNumber, int _partNumber, char[] _data, bool _isLastPart) { + super(s); + machineId = _machineId; + sequenceNumber = _sequenceNumber; + transactionSequenceNumber = _transactionSequenceNumber; + partNumber = _partNumber; + isLastPart = _isLastPart; + data = _data; + + partId = new Pair(sequenceNumber, partNumber); + commitId = new Pair(machineId, sequenceNumber); + } + + public int getSize() { + if (data == NULL) { + return (3 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char)); + } + return (3 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char)) + data.length; + } + + public void setSlot(Slot s) { + parentslot = s; + } + + public int getPartNumber() { + return partNumber; + } + + public int getDataSize() { + return data.length; + } + + public char[] getData() { + return data; + } + + public Pair getPartId() { + return partId; + } + + public Pair getCommitId() { + return commitId; + } + + public bool isLastPart() { + return isLastPart; + } + + public int64_t getMachineId() { + return machineId; + } + + public int64_t getTransactionSequenceNumber() { + return transactionSequenceNumber; + } + + public int64_t getSequenceNumber() { + return sequenceNumber; + } + + static Entry decode(Slot s, ByteBuffer bb) { + int64_t machineId = bb->getLong(); + int64_t sequenceNumber = bb->getLong(); + int64_t transactionSequenceNumber = bb->getLong(); + int partNumber = bb->getInt(); + int dataSize = bb->getInt(); + bool isLastPart = bb->get() == 1; + + // Get the data + char[] data = new char[dataSize]; + bb->get(data); + + return new CommitPart(s, machineId, sequenceNumber, transactionSequenceNumber, partNumber, data, isLastPart); + } + + public void encode(ByteBuffer bb) { + bb->put(Entry.TypeCommitPart); + bb->putLong(machineId); + bb->putLong(sequenceNumber); + bb->putLong(transactionSequenceNumber); + bb->putInt(partNumber); + bb->putInt(data.length); + + if (isLastPart) { + bb->put((char)1); + } else { + bb->put((char)0); + } + + bb->put(data); + } + + public char getType() { + return Entry.TypeCommitPart; + } + + public Entry getCopy(Slot s) { + return new CommitPart(s, machineId, sequenceNumber, transactionSequenceNumber, partNumber, data, isLastPart); + } } diff --git a/version2/src/C/Entry.cc b/version2/src/C/Entry.cc index 2bde6eb..7d0a039 100644 --- a/version2/src/C/Entry.cc +++ b/version2/src/C/Entry.cc @@ -7,45 +7,45 @@ * @version 1.0 */ -Entry * Entry_decode(Slot * slot, ByteBuffer * bb) { - char type = bb->get(); - switch (type) { - - case TypeCommitPart: - return CommitPart_decode(slot, bb); - - case TypeAbort: - return Abort_decode(slot, bb); - - case TypeTransactionPart: - return TransactionPart_decode(slot, bb); - - case TypeNewKey: - return NewKey_decode(slot, bb); - - case TypeLastMessage: - return LastMessage_decode(slot, bb); - - case TypeRejectedMessage: - return RejectedMessage_decode(slot, bb); - - case TypeTableStatus: - return TableStatus_decode(slot, bb); - - default: - ASSERT(0); - } +Entry *Entry_decode(Slot *slot, ByteBuffer *bb) { + char type = bb->get(); + switch (type) { + + case TypeCommitPart: + return CommitPart_decode(slot, bb); + + case TypeAbort: + return Abort_decode(slot, bb); + + case TypeTransactionPart: + return TransactionPart_decode(slot, bb); + + case TypeNewKey: + return NewKey_decode(slot, bb); + + case TypeLastMessage: + return LastMessage_decode(slot, bb); + + case TypeRejectedMessage: + return RejectedMessage_decode(slot, bb); + + case TypeTableStatus: + return TableStatus_decode(slot, bb); + + default: + ASSERT(0); + } } void Entry::setDead() { - if (!islive ) { - return; // already dead - } - - islive = false; - - if (parentslot != NULL) { - parentslot->decrementLiveCount(); - } + if (!islive ) { + return; // already dead + } + + islive = false; + + if (parentslot != NULL) { + parentslot->decrementLiveCount(); + } } diff --git a/version2/src/C/Entry.h b/version2/src/C/Entry.h index c3cc8a2..1ec48b1 100644 --- a/version2/src/C/Entry.h +++ b/version2/src/C/Entry.h @@ -19,15 +19,15 @@ class Entry : public Liveness { /* Records whether the information is still live or has been - superceded by a newer update. */ - private: + superceded by a newer update. */ +private: bool islive; - protected: - Slot * parentslot; +protected: + Slot *parentslot; - public: - Entry(Slot * _parentslot) : islive(true), parentslot(_parentslot) {} +public: + Entry(Slot *_parentslot) : islive(true), parentslot(_parentslot) {} /** * Returns true if the Entry object is still live. @@ -43,32 +43,32 @@ class Entry : public Liveness { /** * Serializes the Entry object into the char buffer. */ - virtual void encode(ByteBuffer * bb) = 0; + virtual void encode(ByteBuffer *bb) = 0; /** * Returns the size in chars the entry object will take in the char * array. */ - virtual int getSize() = 0; + virtual int getSize() = 0; /** * Returns a char encoding the type of the entry object. */ - virtual char getType() = 0; + virtual char getType() = 0; /** * Returns a copy of the Entry that can be added to a different slot. */ - virtual Entry * getCopy(Slot * s) = 0; + virtual Entry *getCopy(Slot *s) = 0; }; /** * Static method for decoding char array into Entry objects. First * char tells the type of entry. */ -Entry * Entry_decode(Slot * slot, ByteBuffer * bb); +Entry *Entry_decode(Slot *slot, ByteBuffer *bb); #endif diff --git a/version2/src/C/IoTString.h b/version2/src/C/IoTString.h index 00dd8e6..bbd0c77 100644 --- a/version2/src/C/IoTString.h +++ b/version2/src/C/IoTString.h @@ -10,37 +10,43 @@ */ public class IoTString { - private: - Array array; - - IoTString() {} +private: + Array *array; + IoTString() {} /** * Builds an IoTString object around the char array. This * constructor makes a copy, so the caller is free to modify the char array. */ - - public: - IoTString(Array * _array) { array.init(_array); } - ~IoTString() {} - + +public: + IoTString(Array *_array) : array(new Array(_array)) {} + ~IoTString() {} + /** * Internal method to grab a reference to our char array. Caller * must not modify it. */ - - Array * internalBytes() { return &array; } - + + Array *internalBytes() { return array; } + /** * Returns a copy of the underlying char string. */ - - Array * getBytes() { return new Array(&array); } + + Array *getBytes() { return new Array(&array); } /** * Returns the length in chars of the IoTString. */ - + int length() { return array->length(); } + friend IoTString *IoTString_shallow(Array *_array); +}; + +IoTString *IoTString_shallow(Array *_array) { + IoTString *str = new IoTString(); + str->array = _array; + return str; } #endif diff --git a/version2/src/C/KeyValue.cc b/version2/src/C/KeyValue.cc index 0be46c5..561ae80 100644 --- a/version2/src/C/KeyValue.cc +++ b/version2/src/C/KeyValue.cc @@ -1,74 +1,49 @@ - +#include "KeyValue.h" /** * KeyValue entry for Slot. * @author Brian Demsky * @version 1.0 */ -class KeyValue { /*extends Entry */ - IoTString key; - IoTString value; - - KeyValue(IoTString _key, IoTString _value) { - key = _key; - value = _value; - } +KeyValue *KeyValue_decode(ByteBuffer *bb) { + int keylength = bb->getInt(); + int valuelength = bb->getInt(); + Array *key = new Array *(keylength); + bb->get(key); - IoTString getKey() { - return key; + if (valuelength != 0) { + Array *value = new Array(valuelength); + bb->get(value); + return new KeyValue(IoTString_shallow(key), IoTString_shallow(value)); } - IoTString getValue() { - return value; - } - - static KeyValue decode(ByteBuffer bb) { - int keylength = bb->getInt(); - int valuelength = bb->getInt(); - char[] key = new char[keylength]; - bb->get(key); + return new KeyValue(IoTString_shallow(key), NULL); +} - if (valuelength != 0) { - char[] value = new char[valuelength]; - bb->get(value); - return new KeyValue(IoTString.shallow(key), IoTString.shallow(value)); - } +void KeyValue::encode(ByteBuffer *bb) { + bb->putInt(key->length()); - return new KeyValue(IoTString.shallow(key), NULL); + if (value != NULL) { + bb->putInt(value->length()); + } else { + bb->putInt(0); } - void encode(ByteBuffer bb) { - bb->putInt(key.length()); - - if (value != NULL) { - bb->putInt(value.length()); - } else { - bb->putInt(0); - } + bb->put(key->internalBytes()); - bb->put(key.internalBytes()); - - if (value != NULL) { - bb->put(value.internalBytes()); - } + if (value != NULL) { + bb->put(value->internalBytes()); } +} - int getSize() { - if (value != NULL) { - return 2 * sizeof(int32_t) + key.length() + value.length(); - } - - return 2 * sizeof(int32_t) + key.length(); +int KeyValue::getSize() { + if (value != NULL) { + return 2 * sizeof(int32_t) + key->length() + value->length(); } - String toString() { - if (value == NULL) { - return "NULL"; - } - return value.toString(); - } + return 2 * sizeof(int32_t) + key.length(); +} - KeyValue getCopy() { - return new KeyValue(key, value); - } +KeyValue *KeyValue::getCopy() { + return new KeyValue(key, value); } diff --git a/version2/src/C/KeyValue.h b/version2/src/C/KeyValue.h index dadda7b..bd384be 100644 --- a/version2/src/C/KeyValue.h +++ b/version2/src/C/KeyValue.h @@ -1,3 +1,6 @@ +#ifndef KEYVALUE_H +#define KEYVALUE_H +#include "common.h" /** * KeyValue entry for Slot. @@ -5,70 +8,23 @@ * @version 1.0 */ -class KeyValue { /*extends Entry */ - private IoTString key; - private IoTString value; +class KeyValue {/*extends Entry */ +private: + IoTString *key; + IoTString *value; - public KeyValue(IoTString _key, IoTString _value) { - key = _key; - value = _value; +public: + KeyValue(IoTString *_key, IoTString *_value) : + key(_key), + value(_value) { } - public IoTString getKey() { - return key; - } - - public IoTString getValue() { - return value; - } - - static KeyValue decode(ByteBuffer bb) { - int keylength = bb->getInt(); - int valuelength = bb->getInt(); - char[] key = new char[keylength]; - bb->get(key); - - if (valuelength != 0) { - char[] value = new char[valuelength]; - bb->get(value); - return new KeyValue(IoTString.shallow(key), IoTString.shallow(value)); - } - - return new KeyValue(IoTString.shallow(key), NULL); - } - - public void encode(ByteBuffer bb) { - bb->putInt(key.length()); - - if (value != NULL) { - bb->putInt(value.length()); - } else { - bb->putInt(0); - } + IoTString *getKey() { return key; } + IoTString *getValue() { return value; } + void encode(ByteBuffer *bb); + int32_t getSize(); + KeyValue *getCopy(); +}; - bb->put(key.internalBytes()); - - if (value != NULL) { - bb->put(value.internalBytes()); - } - } - - public int getSize() { - if (value != NULL) { - return 2 * sizeof(int32_t) + key.length() + value.length(); - } - - return 2 * sizeof(int32_t) + key.length(); - } - - public String toString() { - if (value == NULL) { - return "NULL"; - } - return value.toString(); - } - - public KeyValue getCopy() { - return new KeyValue(key, value); - } -} +KeyValue *KeyValue_decode(ByteBuffer *bb); +#endif diff --git a/version2/src/C/LastMessage.cc b/version2/src/C/LastMessage.cc index 55eec50..92cf608 100644 --- a/version2/src/C/LastMessage.cc +++ b/version2/src/C/LastMessage.cc @@ -8,13 +8,13 @@ * @version 1.0 */ -Entry * LastMessage_decode(Slot * slot, ByteBuffer * bb) { - int64_t machineid=bb->getLong(); - int64_t seqnum=bb->getLong(); +Entry *LastMessage_decode(Slot *slot, ByteBuffer *bb) { + int64_t machineid = bb->getLong(); + int64_t seqnum = bb->getLong(); return new LastMessage(slot, machineid, seqnum); } -void LastMessage::encode(ByteBuffer * bb) { +void LastMessage::encode(ByteBuffer *bb) { bb->put(TypeLastMessage); bb->putLong(machineid); bb->putLong(seqnum); diff --git a/version2/src/C/LastMessage.h b/version2/src/C/LastMessage.h index d4cddd0..22d7d99 100644 --- a/version2/src/C/LastMessage.h +++ b/version2/src/C/LastMessage.h @@ -1,8 +1,8 @@ #ifndef LASTMESSAGE_H #define LASTMESSAGE_H -#include"common.h" -#include"Entry.h" +#include "common.h" +#include "Entry.h" /** * This Entry records the last message sent by a given machine. @@ -12,24 +12,24 @@ class LastMessage : public Entry { - private: +private: int64_t machineid; int64_t seqnum; - public: - LastMessage(Slot * slot, int64_t _machineid, int64_t _seqnum) : - Entry(slot), +public: + LastMessage(Slot *slot, int64_t _machineid, int64_t _seqnum) : + Entry(slot), machineid(_machineid), seqnum(_seqnum) { - } + } int64_t getMachineID() { return machineid; } int64_t getSequenceNumber() { return seqnum; } - void encode(ByteBuffer * bb); - int getSize() { return 2*sizeof(int64_t)+sizeof(char); } + void encode(ByteBuffer *bb); + int getSize() { return 2 * sizeof(int64_t) + sizeof(char); } char getType() { return TypeLastMessage; } - Entry * getCopy(Slot * s) { return new LastMessage(s, machineid, seqnum); } + Entry *getCopy(Slot *s) { return new LastMessage(s, machineid, seqnum); } }; -Entry * LastMessage_decode(Slot * slot, ByteBuffer * bb); +Entry *LastMessage_decode(Slot *slot, ByteBuffer *bb); #endif diff --git a/version2/src/C/LocalComm.cc b/version2/src/C/LocalComm.cc index 71ea32a..213fb3e 100644 --- a/version2/src/C/LocalComm.cc +++ b/version2/src/C/LocalComm.cc @@ -1,24 +1,24 @@ class LocalComm { - Table t1; - Table t2; + Table t1; + Table t2; - LocalComm(Table _t1, Table _t2) { - t1 = _t1; - t2 = _t2; - } + LocalComm(Table _t1, Table _t2) { + t1 = _t1; + t2 = _t2; + } - char[] sendDataToLocalDevice(Long deviceId, char[] data) throws InterruptedException{ - System.out.println("Passing Locally"); + char[] sendDataToLocalDevice(Long deviceId, char[] data) throws InterruptedException { + System.out.println("Passing Locally"); - if (deviceId == t1.getMachineId()) { - // return t1.localCommInput(data); - } else if (deviceId == t2.getMachineId()) { - // return t2.localCommInput(data); - } else { - throw new Error("Cannot send to " + deviceId + " using this local comm"); - } + if (deviceId == t1.getMachineId()) { + // return t1.localCommInput(data); + } else if (deviceId == t2.getMachineId()) { + // return t2.localCommInput(data); + } else { + throw new Error("Cannot send to " + deviceId + " using this local comm"); + } - return new char[0]; - } + return new char[0]; + } } diff --git a/version2/src/C/LocalComm.h b/version2/src/C/LocalComm.h index ceb8689..10c90b0 100644 --- a/version2/src/C/LocalComm.h +++ b/version2/src/C/LocalComm.h @@ -1,24 +1,24 @@ class LocalComm { - private Table t1; - private Table t2; + private Table t1; + private Table t2; - public LocalComm(Table _t1, Table _t2) { - t1 = _t1; - t2 = _t2; - } + public LocalComm(Table _t1, Table _t2) { + t1 = _t1; + t2 = _t2; + } - public char[] sendDataToLocalDevice(Long deviceId, char[] data) throws InterruptedException{ - System.out.println("Passing Locally"); + public char[] sendDataToLocalDevice(Long deviceId, char[] data) throws InterruptedException { + System.out.println("Passing Locally"); - if (deviceId == t1.getMachineId()) { - // return t1.localCommInput(data); - } else if (deviceId == t2.getMachineId()) { - // return t2.localCommInput(data); - } else { - throw new Error("Cannot send to " + deviceId + " using this local comm"); - } + if (deviceId == t1.getMachineId()) { + // return t1.localCommInput(data); + } else if (deviceId == t2.getMachineId()) { + // return t2.localCommInput(data); + } else { + throw new Error("Cannot send to " + deviceId + " using this local comm"); + } - return new char[0]; - } + return new char[0]; + } } diff --git a/version2/src/C/Makefile b/version2/src/C/Makefile index caccbdb..2f8a91c 100644 --- a/version2/src/C/Makefile +++ b/version2/src/C/Makefile @@ -79,10 +79,10 @@ tags: ctags -R tabbing: - uncrustify -c C.cfg --no-backup *.cc */*.cc */*/*.cc - uncrustify -c C.cfg --no-backup *.h */*.h */*/*.h + uncrustify -c C.cfg --no-backup *.cc + uncrustify -c C.cfg --no-backup *.h wc: - wc */*.cc */*.h *.cc *.h */*/*.cc */*/*.h + wc *.cc *.h .PHONY: $(PHONY) diff --git a/version2/src/C/NewKey.cc b/version2/src/C/NewKey.cc index 85a5234..c6afe55 100644 --- a/version2/src/C/NewKey.cc +++ b/version2/src/C/NewKey.cc @@ -1,16 +1,16 @@ #include "NewKey.h" #include "ByteBuffer.h" -Entry * decode(Slot * slot, ByteBuffer * bb) { +Entry *decode(Slot *slot, ByteBuffer *bb) { int keylength = bb->getInt(); - Array * key = new Array(keylength); + Array *key = new Array(keylength); bb->get(key); int64_t machineid = bb->getLong(); - + return new NewKey(slot, IoTString.shallow(key), machineid); } -void NewKey::encode(ByteBuffer * bb) { +void NewKey::encode(ByteBuffer *bb) { bb->put(TypeNewKey); bb->putInt(key->length()); bb->put(key->internalBytes()); diff --git a/version2/src/C/NewKey.h b/version2/src/C/NewKey.h index 53a8f8c..019d66c 100644 --- a/version2/src/C/NewKey.h +++ b/version2/src/C/NewKey.h @@ -9,29 +9,29 @@ class NewKey : public Entry { - private: - IoTString * key; +private: + IoTString *key; int64_t machineid; - public: - NewKey(Slot * slot, IoTString * _key, int64_t _machineid) : - Entry(slot), +public: + NewKey(Slot *slot, IoTString *_key, int64_t _machineid) : + Entry(slot), key(_key), machineid(_machineid) { - } - + } + int64_t getMachineID() { return machineid; } - IoTString * getKey() { return key; } - void setSlot(Slot * s) { parentslot = s; } - + IoTString *getKey() { return key; } + void setSlot(Slot *s) { parentslot = s; } + - void encode(ByteBuffer * bb); + void encode(ByteBuffer *bb); int getSize(); char getType() { return TypeNewKey; } - - Entry * getCopy(Slot * s) { return new NewKey(s, key, machineid); } + + Entry *getCopy(Slot *s) { return new NewKey(s, key, machineid); } }; -Entry * NewKey_decode(Slot *slot, ByteBuffer *bb); +Entry *NewKey_decode(Slot *slot, ByteBuffer *bb); diff --git a/version2/src/C/Pair.h b/version2/src/C/Pair.h index 127f9f8..192e93c 100644 --- a/version2/src/C/Pair.h +++ b/version2/src/C/Pair.h @@ -3,20 +3,20 @@ template class Pair { - private: - A a; - B b; - - public: +private: + A a; + B b; + +public: Pair(A _a, B _b) : - a(_a), - b(_b) { + a(_a), + b(_b) { } - + A getFirst() { return a; } - + B getSecond() { return b; } diff --git a/version2/src/C/PendingTransaction.cc b/version2/src/C/PendingTransaction.cc index 0d45dc2..5386815 100644 --- a/version2/src/C/PendingTransaction.cc +++ b/version2/src/C/PendingTransaction.cc @@ -1,8 +1,8 @@ #include "PendingTransaction.h" PendingTransaction::PendingTransaction(int64_t _machineId) : - keyValueUpdateSet(new Hashset()), - keyValueGuardSet(new HashSet()), + keyValueUpdateSet(new Hashset()), + keyValueGuardSet(new HashSet()), arbitrator(-1), clientLocalSequenceNumber(-1), machineId(_machineId), @@ -13,26 +13,26 @@ PendingTransaction::PendingTransaction(int64_t _machineId) : * Add a new key value to the updates * */ -void PendingTransaction::addKV(KeyValue * newKV) { +void PendingTransaction::addKV(KeyValue *newKV) { KeyValue rmKV = NULL; // Make sure there are no duplicates for (KeyValue kv : keyValueUpdateSet) { if (kv.getKey().equals(newKV.getKey())) { - + // Remove key if we are adding a newer version of the same key rmKV = kv; break; } } - + // Remove key if we are adding a newer version of the same key if (rmKV != NULL) { keyValueUpdateSet.remove(rmKV); currentDataSize -= rmKV.getSize(); } - + // Add the key to the hash set keyValueUpdateSet.add(newKV); currentDataSize += newKV.getSize(); @@ -42,7 +42,7 @@ void PendingTransaction::addKV(KeyValue * newKV) { * Add a new key value to the guard set * */ -void PendingTransaction::addKVGuard(KeyValue * newKV) { +void PendingTransaction::addKVGuard(KeyValue *newKV) { // Add the key to the hash set keyValueGuardSet.add(newKV); currentDataSize += newKV.getSize(); @@ -56,30 +56,30 @@ bool PendingTransaction::checkArbitrator(int64_t arb) { arbitrator = arb; return true; } - + return arb == arbitrator; } bool PendingTransaction::evaluateGuard(Hashtable keyValTableCommitted, Hashtable keyValTableSpeculative, Hashtable keyValTablePendingTransSpeculative) { for (KeyValue kvGuard : keyValueGuardSet) { - + // First check if the key is in the speculative table, this is the value of the latest assumption KeyValue kv = keyValTablePendingTransSpeculative.get(kvGuard.getKey()); - - + + if (kv == NULL) { // if it is not in the pending trans table then check the speculative table and use that // value as our latest assumption kv = keyValTableSpeculative.get(kvGuard.getKey()); } - - + + if (kv == NULL) { // if it is not in the speculative table then check the committed table and use that // value as our latest assumption kv = keyValTableCommitted.get(kvGuard.getKey()); } - + if (kvGuard.getValue() != NULL) { if ((kv == NULL) || (!kvGuard.getValue().equals(kv.getValue()))) { return false; @@ -89,69 +89,69 @@ bool PendingTransaction::evaluateGuard(Hashtable keyVal return false; } } - } + } return true; } -Transaction * PendingTransaction::createTransaction() { - Transaction * newTransaction = new Transaction(); +Transaction *PendingTransaction::createTransaction() { + Transaction *newTransaction = new Transaction(); int transactionPartCount = 0; - + // Convert all the data into a char array so we can start partitioning - Array * charData = convertDataToBytes(); - + Array *charData = convertDataToBytes(); + int currentPosition = 0; int remaining = charData.length; - + while (remaining > 0) { - - Boolean isLastPart = false; + + bool isLastPart = false; // determine how much to copy int copySize = TransactionPart.MAX_NON_HEADER_SIZE; if (remaining <= TransactionPart.MAX_NON_HEADER_SIZE) { copySize = remaining; - isLastPart = true; // last bit of data so last part + isLastPart = true;// last bit of data so last part } - + // Copy to a smaller version char[] partData = new char[copySize]; System.arraycopy(charData, currentPosition, partData, 0, copySize); - + TransactionPart part = new TransactionPart(NULL, machineId, arbitrator, clientLocalSequenceNumber, transactionPartCount, partData, isLastPart); newTransaction.addPartEncode(part); - - // Update position, count and remaining + + // Update position, count and remaining currentPosition += copySize; transactionPartCount++; remaining -= copySize; } - + // Add the Guard Conditions for (KeyValue kv : keyValueGuardSet) { newTransaction.addGuardKV(kv); } - + // Add the updates for (KeyValue kv : keyValueUpdateSet) { newTransaction.addUpdateKV(kv); } - + return newTransaction; } -Arrar * PendingTransaction::convertDataToBytes() { +Arrar *PendingTransaction::convertDataToBytes() { // Calculate the size of the data - int sizeOfData = 2 * sizeof(int32_t); // Number of Update KV's and Guard KV's + int sizeOfData = 2 * sizeof(int32_t); // Number of Update KV's and Guard KV's sizeOfData += currentDataSize; - + // Data handlers and storage - Array * dataArray = new Array(sizeOfData); - ByteBuffer * bbEncode = ByteBuffer_wrap(dataArray); - + Array *dataArray = new Array(sizeOfData); + ByteBuffer *bbEncode = ByteBuffer_wrap(dataArray); + // Encode the size of the updates and guard sets bbEncode->putInt(keyValueGuardSet.size()); bbEncode->putInt(keyValueUpdateSet.size()); - + // Encode all the guard conditions for (KeyValue kv : keyValueGuardSet) { kv->encode(bbEncode); diff --git a/version2/src/C/PendingTransaction.h b/version2/src/C/PendingTransaction.h index 7e60ce8..e919ca8 100644 --- a/version2/src/C/PendingTransaction.h +++ b/version2/src/C/PendingTransaction.h @@ -4,15 +4,15 @@ #include "common.h" class PendingTransaction { - private: - Hashset * keyValueUpdateSet = NULL; - Hashset * keyValueGuardSet = NULL; +private: + Hashset *keyValueUpdateSet = NULL; + Hashset *keyValueGuardSet = NULL; int64_t arbitrator = -1; int64_t clientLocalSequenceNumber = -1; - int64_t machineId = -1; + int64_t machineId = -1; int32_T currentDataSize = 0; - public: +public: PendingTransaction(int64_t _machineId); /** * Add a new key value to the updates @@ -35,12 +35,12 @@ class PendingTransaction { /** * Get the key value update set */ - Hashset * getKVUpdates() { return keyValueUpdateSet; } + Hashset *getKVUpdates() { return keyValueUpdateSet; } /** * Get the key value update set */ - public Hashset * getKVGuard() { return keyValueGuardSet; } + public Hashset *getKVGuard() { return keyValueGuardSet; } void setClientLocalSequenceNumber(int64_t _clientLocalSequenceNumber) { clientLocalSequenceNumber = _clientLocalSequenceNumber; } @@ -50,8 +50,8 @@ class PendingTransaction { bool evaluateGuard(Hashtable keyValTableCommitted, Hashtable keyValTableSpeculative, Hashtable keyValTablePendingTransSpeculative); - Transaction * createTransaction(); + Transaction *createTransaction(); - Array * convertDataToBytes(); + Array *convertDataToBytes(); }; #endif diff --git a/version2/src/C/RejectedMessage.cc b/version2/src/C/RejectedMessage.cc index b1f441b..05d37f6 100644 --- a/version2/src/C/RejectedMessage.cc +++ b/version2/src/C/RejectedMessage.cc @@ -9,13 +9,13 @@ * @version 1.0 */ -Entry * RejectedMessage_decode(Slot * slot, ByteBuffer * bb) { - int64_t sequencenum=bb->getLong(); - int64_t machineid=bb->getLong(); - int64_t oldseqnum=bb->getLong(); - int64_t newseqnum=bb->getLong(); - char equalto=bb->get(); - return new RejectedMessage(slot,sequencenum, machineid, oldseqnum, newseqnum, equalto==1); +Entry *RejectedMessage_decode(Slot *slot, ByteBuffer *bb) { + int64_t sequencenum = bb->getLong(); + int64_t machineid = bb->getLong(); + int64_t oldseqnum = bb->getLong(); + int64_t newseqnum = bb->getLong(); + char equalto = bb->get(); + return new RejectedMessage(slot,sequencenum, machineid, oldseqnum, newseqnum, equalto == 1); } void RejectedMessage::removeWatcher(int64_t machineid) { @@ -24,11 +24,11 @@ void RejectedMessage::removeWatcher(int64_t machineid) { setDead(); } -void RejectedMessage::encode(ByteBuffer * bb) { +void RejectedMessage::encode(ByteBuffer *bb) { bb->put(TypeRejectedMessage); bb->putLong(sequencenum); bb->putLong(machineid); bb->putLong(oldseqnum); bb->putLong(newseqnum); - bb->put(equalto?(char)1:(char)0); + bb->put(equalto ? (char)1 : (char)0); } diff --git a/version2/src/C/RejectedMessage.h b/version2/src/C/RejectedMessage.h index f33e40f..5259f68 100644 --- a/version2/src/C/RejectedMessage.h +++ b/version2/src/C/RejectedMessage.h @@ -11,7 +11,7 @@ #include "Entry.h" class RejectedMessage : public Entry { - private: +private: /* Sequence number */ int64_t sequencenum; /* Machine identifier */ @@ -24,30 +24,30 @@ class RejectedMessage : public Entry { * equal to) the specified machine identifier. */ bool equalto; /* Set of machines that have not received notification. */ - Hashset * watchset; - - RejectedMessage(Slot * slot, int64_t _sequencenum, int64_t _machineid, int64_t _oldseqnum, int64_t _newseqnum, bool _equalto) : Entry(slot), + Hashset *watchset; + + RejectedMessage(Slot *slot, int64_t _sequencenum, int64_t _machineid, int64_t _oldseqnum, int64_t _newseqnum, bool _equalto) : Entry(slot), sequencenum(_sequencenum), machineid(_machineid), oldseqnum(_oldseqnum), newseqnum(_newseqnum), equalto(_equalto) { - } + } int64_t getOldSeqNum() { return oldseqnum; } int64_t getNewSeqNum() { return newseqnum; } bool getEqual() { return equalto; } int64_t getMachineID() { return machineid; } - int64_t getSequenceNumber() { return sequencenum; } - void setWatchSet(HashSet * _watchset) { watchset=_watchset; } + int64_t getSequenceNumber() { return sequencenum; } + void setWatchSet(HashSet *_watchset) { watchset = _watchset; } void removeWatcher(int64_t machineid); - void encode(ByteBuffer * bb); - int getSize() { return 4*sizeof(int64_t) + 2*sizeof(char); } + void encode(ByteBuffer *bb); + int getSize() { return 4 * sizeof(int64_t) + 2 * sizeof(char); } char getType() { return TypeRejectedMessage; } - Entry * getCopy(Slot * s) { + Entry *getCopy(Slot *s) { return new RejectedMessage(s,sequencenum, machineid, oldseqnum, newseqnum, equalto); } }; -Entry * RejectedMessage_decode(Slot * slot, ByteBuffer * bb); +Entry *RejectedMessage_decode(Slot *slot, ByteBuffer *bb); #ifndef diff --git a/version2/src/C/Slot.cc b/version2/src/C/Slot.cc index 502c6ca..982b87b 100644 --- a/version2/src/C/Slot.cc +++ b/version2/src/C/Slot.cc @@ -1,44 +1,44 @@ #include "Slot.h" -Slot::Slot(Table *_table, int64_t _seqnum, int64_t _machineid, char* _prevhmac, char* _hmac, int64_t _localSequenceNumber) { - seqnum = _seqnum; - machineid = _machineid; - prevhmac = _prevhmac; - hmac = _hmac; - entries = new Vector(); - livecount = 1; - seqnumlive = true; - freespace = SLOT_SIZE - getBaseSize(); - table = _table; - localSequenceNumber = _localSequenceNumber; +Slot::Slot(Table *_table, int64_t _seqnum, int64_t _machineid, char *_prevhmac, char *_hmac, int64_t _localSequenceNumber) { + seqnum = _seqnum; + machineid = _machineid; + prevhmac = _prevhmac; + hmac = _hmac; + entries = new Vector(); + livecount = 1; + seqnumlive = true; + freespace = SLOT_SIZE - getBaseSize(); + table = _table; + localSequenceNumber = _localSequenceNumber; } -Slot::Slot(Table *_table, int64_t _seqnum, int64_t _machineid, char* _prevhmac, int64_t _localSequenceNumber) { - this(_table, _seqnum, _machineid, _prevhmac, NULL, _localSequenceNumber); +Slot::Slot(Table *_table, int64_t _seqnum, int64_t _machineid, char *_prevhmac, int64_t _localSequenceNumber) { + this(_table, _seqnum, _machineid, _prevhmac, NULL, _localSequenceNumber); } Slot::Slot(Table *_table, int64_t _seqnum, int64_t _machineid, int64_t _localSequenceNumber) { - this(_table, _seqnum, _machineid, new char[HMAC_SIZE], NULL, _localSequenceNumber); + this(_table, _seqnum, _machineid, new char[HMAC_SIZE], NULL, _localSequenceNumber); } -Entry * Slot::addEntry(Entry * e) { - e = e->getCopy(this); - entries->add(e); - livecount++; - freespace -= e->getSize(); - return e; +Entry *Slot::addEntry(Entry *e) { + e = e->getCopy(this); + entries->add(e); + livecount++; + freespace -= e->getSize(); + return e; } void Slot::removeEntry(Entry *e) { - entries->remove(e); - livecount--; - freespace += e->getSize(); + entries->remove(e); + livecount--; + freespace += e->getSize(); } void Slot::addShallowEntry(Entry *e) { - entries->add(e); - livecount++; - freespace -= e->getSize(); + entries->add(e); + livecount++; + freespace -= e->getSize(); } /** @@ -46,57 +46,57 @@ void Slot::addShallowEntry(Entry *e) { * using its reserved space. */ bool Slot::hasSpace(Entry *e) { - int newfreespace = freespace - e->getSize(); - return newfreespace >= 0; + int newfreespace = freespace - e->getSize(); + return newfreespace >= 0; } -Vector * Slot::getEntries() { - return entries; +Vector *Slot::getEntries() { + return entries; } -Slot * Slotdecode(Table * table, char* array, Mac * mac) { - mac->update(array, HMAC_SIZE, array.length - HMAC_SIZE); - char* realmac = mac->doFinal(); - - ByteBuffer * bb = ByteBuffer_wrap(array); - char* hmac = new char[HMAC_SIZE]; - char* prevhmac = new char[HMAC_SIZE]; - bb->get(hmac); - bb->get(prevhmac); - if (!Arrays.equals(realmac, hmac)) - throw new Error("Server Error: Invalid HMAC! Potential Attack!"); - - int64_t seqnum = bb->getLong(); - int64_t machineid = bb->getLong(); - int numentries = bb->getInt(); - Slot slot = new Slot(table, seqnum, machineid, prevhmac, hmac, -1); - - for (int i = 0; i < numentries; i++) { - slot->addShallowEntry(Entry->decode(slot, bb)); - } - - return slot; +Slot *Slotdecode(Table *table, char *array, Mac *mac) { + mac->update(array, HMAC_SIZE, array.length - HMAC_SIZE); + char *realmac = mac->doFinal(); + + ByteBuffer *bb = ByteBuffer_wrap(array); + char *hmac = new char[HMAC_SIZE]; + char *prevhmac = new char[HMAC_SIZE]; + bb->get(hmac); + bb->get(prevhmac); + if (!Arrays.equals(realmac, hmac)) + throw new Error("Server Error: Invalid HMAC! Potential Attack!"); + + int64_t seqnum = bb->getLong(); + int64_t machineid = bb->getLong(); + int numentries = bb->getInt(); + Slot slot = new Slot(table, seqnum, machineid, prevhmac, hmac, -1); + + for (int i = 0; i < numentries; i++) { + slot->addShallowEntry(Entry->decode(slot, bb)); + } + + return slot; } -char* Slot::encode(Mac mac) { - char* array = new char[SLOT_SIZE]; - ByteBuffer * bb = ByteBuffer_wrap(array); - /* Leave space for the slot HMAC. */ - bb->position(HMAC_SIZE); - bb->put(prevhmac); - bb->putLong(seqnum); - bb->putLong(machineid); - bb->putInt(entries->size()); - for (Entry entry : entries) { - entry->encode(bb); - } - /* Compute our HMAC */ - mac->update(array, HMAC_SIZE, array.length - HMAC_SIZE); - char* realmac = mac->doFinal(); - hmac = realmac; - bb->position(0); - bb->put(realmac); - return array; +char *Slot::encode(Mac mac) { + char *array = new char[SLOT_SIZE]; + ByteBuffer *bb = ByteBuffer_wrap(array); + /* Leave space for the slot HMAC. */ + bb->position(HMAC_SIZE); + bb->put(prevhmac); + bb->putLong(seqnum); + bb->putLong(machineid); + bb->putInt(entries->size()); + for (Entry entry : entries) { + entry->encode(bb); + } + /* Compute our HMAC */ + mac->update(array, HMAC_SIZE, array.length - HMAC_SIZE); + char *realmac = mac->doFinal(); + hmac = realmac; + bb->position(0); + bb->put(realmac); + return array; } @@ -106,19 +106,19 @@ char* Slot::encode(Mac mac) { * itself. */ -Vector *Slot::getLiveEntries(bool resize) { - Vector *liveEntries = new Vector(); - for (Entry *entry : entries) { - if (entry->isLive()) { - if (!resize || entry->getType() != Entry->TypeTableStatus) - liveEntries->add(entry); - } - } - - if (seqnumlive && !resize) - liveEntries->add(new LastMessage(this, machineid, seqnum)); - - return liveEntries; +Vector *Slot::getLiveEntries(bool resize) { + Vector *liveEntries = new Vector(); + for (Entry *entry : entries) { + if (entry->isLive()) { + if (!resize || entry->getType() != Entry->TypeTableStatus) + liveEntries->add(entry); + } + } + + if (seqnumlive && !resize) + liveEntries->add(new LastMessage(this, machineid, seqnum)); + + return liveEntries; } @@ -128,8 +128,8 @@ Vector *Slot::getLiveEntries(bool resize) { */ void Slot::setDead() { - seqnumlive = false; - decrementLiveCount(); + seqnumlive = false; + decrementLiveCount(); } /** @@ -137,16 +137,16 @@ void Slot::setDead() { */ void Slot::decrementLiveCount() { - livecount--; - if (livecount == 0) { - table->decrementLiveCount(); - } + livecount--; + if (livecount == 0) { + table->decrementLiveCount(); + } } -char* Slot::getSlotCryptIV() { - ByteBuffer * buffer = ByteBuffer_allocate(CloudComm.IV_SIZE); - buffer->putLong(machineid); - int64_t localSequenceNumberShift = localSequenceNumber << 16; - buffer->putLong(localSequenceNumberShift); - return buffer->array(); +char *Slot::getSlotCryptIV() { + ByteBuffer *buffer = ByteBuffer_allocate(CloudComm.IV_SIZE); + buffer->putLong(machineid); + int64_t localSequenceNumberShift = localSequenceNumber << 16; + buffer->putLong(localSequenceNumberShift); + return buffer->array(); } diff --git a/version2/src/C/Slot.h b/version2/src/C/Slot.h index f42aa68..1138fbf 100644 --- a/version2/src/C/Slot.h +++ b/version2/src/C/Slot.h @@ -5,51 +5,51 @@ #define HMAC_SIZE 32 class Slot : public Liveness { - private: +private: /** Sequence number of the slot. */ int64_t seqnum; /** HMAC of previous slot. */ - char* prevhmac; + char *prevhmac; /** HMAC of this slot. */ - char* hmac; + char *hmac; /** Machine that sent this slot. */ - int64_t machineid; + int64_t machineid; /** Vector of entries in this slot. */ - Vector * entries; + Vector *entries; /** Pieces of information that are live. */ int livecount; /** Flag that indicates whether this slot is still live for * recording the machine that sent it. */ - bool seqnumlive; + bool seqnumlive; /** Number of chars of free space. */ int freespace; /** Reference to Table */ - Table * table; + Table *table; int64_t localSequenceNumber; - void addShallowEntry(Entry * e); - - public: - Slot(Table * _table, int64_t _seqnum, int64_t _machineid, char* _prevhmac, char* _hmac, int64_t _localSequenceNumber); - Slot(Table _table, int64_t _seqnum, int64_t _machineid, char* _prevhmac, int64_t _localSequenceNumber); + void addShallowEntry(Entry *e); + +public: + Slot(Table *_table, int64_t _seqnum, int64_t _machineid, char *_prevhmac, char *_hmac, int64_t _localSequenceNumber); + Slot(Table _table, int64_t _seqnum, int64_t _machineid, char *_prevhmac, int64_t _localSequenceNumber); Slot(Table _table, int64_t _seqnum, int64_t _machineid, int64_t _localSequenceNumber); - char* getHMAC() { return hmac; } - char* getPrevHMAC() { return prevhmac; } - Entry * addEntry(Entry * e); - void removeEntry(Entry * e); - bool hasSpace(Entry * e); - Vector * getEntries(); - char* encode(Mac * mac); + char *getHMAC() { return hmac; } + char *getPrevHMAC() { return prevhmac; } + Entry *addEntry(Entry *e); + void removeEntry(Entry *e); + bool hasSpace(Entry *e); + Vector *getEntries(); + char *encode(Mac *mac); int getBaseSize() { return 2 * HMAC_SIZE + 2 * sizeof(int64_t) + sizeof(int); } - Vector * getLiveEntries(bool resize); + Vector *getLiveEntries(bool resize); int64_t getSequenceNumber() { return seqnum; } int64_t getMachineID() { return machineid; } void setDead(); void decrementLiveCount(); bool isLive() { return livecount > 0; } - char* getSlotCryptIV(); + char *getSlotCryptIV(); }; -Slot * Slotdecode(Table * table, char* array, Mac *mac); +Slot *Slotdecode(Table *table, char *array, Mac *mac); #endif diff --git a/version2/src/C/SlotBuffer.cc b/version2/src/C/SlotBuffer.cc index 91ac22e..4ddf4ba 100644 --- a/version2/src/C/SlotBuffer.cc +++ b/version2/src/C/SlotBuffer.cc @@ -1,4 +1,4 @@ -#include"SlotBuffer.h" +#include "SlotBuffer.h" /** * Circular buffer that holds the live set of slots. * @author Brian Demsky @@ -25,8 +25,8 @@ int SlotBuffer::capacity() { void SlotBuffer::resize(int newsize) { if (newsize == (array->length() - 1)) return; - - Array * newarray = new Array(newsize + 1); + + Array *newarray = new Array(newsize + 1); int currsize = size(); int index = tail; for (int i = 0; i < currsize; i++) { @@ -51,9 +51,9 @@ void SlotBuffer::incrementTail() { tail = 0; } -void SlotBuffer::putSlot(Slot * s) { +void SlotBuffer::putSlot(Slot *s) { int64_t checkNum = (getNewestSeqNum() + 1); - + if (checkNum != s->getSequenceNumber()) { // We have a gap so expunge all our slots oldestseqn = s->getSequenceNumber(); @@ -62,10 +62,10 @@ void SlotBuffer::putSlot(Slot * s) { array->set(0, s); return; } - + array->set(head, s); incrementHead(); - + if (oldestseqn == 0) { oldestseqn = s->getSequenceNumber(); } diff --git a/version2/src/C/SlotBuffer.h b/version2/src/C/SlotBuffer.h index ecd2b97..7027de6 100644 --- a/version2/src/C/SlotBuffer.h +++ b/version2/src/C/SlotBuffer.h @@ -1,7 +1,7 @@ #ifndef SLOTBUFFER_H #define SLOTBUFFER_H -#include"common.h" +#include "common.h" /** * Circular buffer that holds the live set of slots. @@ -12,14 +12,14 @@ #define SlotBuffer_DEFAULT_SIZE 16 class SlotBuffer { - private: - Array * array; +private: + Array *array; int32_t head; int32_t tail; void incrementHead(); void incrementTail(); - - public: + +public: int64_t oldestseqn; SlotBuffer(); @@ -27,7 +27,7 @@ class SlotBuffer { int32_t capacity(); void resize(int newsize); void putSlot(Slot *s); - Slot * getSlot(int64_t seqnum); + Slot *getSlot(int64_t seqnum); int64_t getOldestSeqNum() { return oldestseqn; } int64_t getNewestSeqNum() { return oldestseqn + size() - 1;} }; diff --git a/version2/src/C/SlotIndexer.cc b/version2/src/C/SlotIndexer.cc index 0fa6449..2b4cdf4 100644 --- a/version2/src/C/SlotIndexer.cc +++ b/version2/src/C/SlotIndexer.cc @@ -7,13 +7,13 @@ * @version 1.0 */ -SlotIndexer::SlotIndexer(Array * _updates, SlotBuffer * _buffer) : +SlotIndexer::SlotIndexer(Array *_updates, SlotBuffer *_buffer) : buffer(_buffer), updates(_updates), firstslotseqnum(updates->get(0)->getSequenceNumber()) { } -Slot * SlotIndexer::getSlot(int64_t seqnum) { +Slot *SlotIndexer::getSlot(int64_t seqnum) { if (seqnum >= firstslotseqnum) { int32_t offset = (int32_t) (seqnum - firstslotseqnum); if (offset >= updates->length()) diff --git a/version2/src/C/SlotIndexer.h b/version2/src/C/SlotIndexer.h index c4f54eb..d1004d3 100644 --- a/version2/src/C/SlotIndexer.h +++ b/version2/src/C/SlotIndexer.h @@ -9,13 +9,13 @@ */ class SlotIndexer { - private: - Array * updates; - SlotBuffer * buffer; +private: + Array *updates; + SlotBuffer *buffer; int64_t firstslotseqnum; - public: - SlotIndexer(Array * _updates, SlotBuffer * _buffer); - Slot* getSlot(int64_t seqnum); +public: + SlotIndexer(Array *_updates, SlotBuffer *_buffer); + Slot *getSlot(int64_t seqnum); }; #endif; diff --git a/version2/src/C/Table.cc b/version2/src/C/Table.cc index 508609e..b01cc0e 100644 --- a/version2/src/C/Table.cc +++ b/version2/src/C/Table.cc @@ -9,7 +9,7 @@ final class Table { /* Constants */ - static final int FREE_SLOTS = 2; // Number of slots that should be kept free // 10 + static final int FREE_SLOTS = 2;// Number of slots that should be kept free // 10 static final int SKIP_THRESHOLD = 10; static final double RESIZE_MULTIPLE = 1.2; static final double RESIZE_THRESHOLD = 0.75; @@ -20,24 +20,24 @@ final class Table { CloudComm cloud = NULL; Random random = NULL; TableStatus liveTableStatus = NULL; - PendingTransaction pendingTransactionBuilder = NULL; // Pending Transaction used in building a Pending Transaction - Transaction lastPendingTransactionSpeculatedOn = NULL; // Last transaction that was speculated on from the pending transaction - Transaction firstPendingTransaction = NULL; // first transaction in the pending transaction list + PendingTransaction pendingTransactionBuilder = NULL;// Pending Transaction used in building a Pending Transaction + Transaction lastPendingTransactionSpeculatedOn = NULL;// Last transaction that was speculated on from the pending transaction + Transaction firstPendingTransaction = NULL; // first transaction in the pending transaction list /* Variables */ - int numberOfSlots = 0; // Number of slots stored in buffer - int bufferResizeThreshold = 0; // Threshold on the number of live slots before a resize is needed - int64_t liveSlotCount = 0; // Number of currently live slots + int numberOfSlots = 0; // Number of slots stored in buffer + int bufferResizeThreshold = 0;// Threshold on the number of live slots before a resize is needed + int64_t liveSlotCount = 0;// Number of currently live slots int64_t oldestLiveSlotSequenceNumver = 0; // Smallest sequence number of the slot with a live entry - int64_t localMachineId = 0; // Machine ID of this client device - int64_t sequenceNumber = 0; // Largest sequence number a client has received + int64_t localMachineId = 0; // Machine ID of this client device + int64_t sequenceNumber = 0; // Largest sequence number a client has received int64_t localSequenceNumber = 0; // int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server // int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server - int64_t localTransactionSequenceNumber = 0; // Local sequence number counter for transactions - int64_t lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on - int64_t oldestTransactionSequenceNumberSpeculatedOn = -1; // the oldest transaction that was speculated on + int64_t localTransactionSequenceNumber = 0; // Local sequence number counter for transactions + int64_t lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on + int64_t oldestTransactionSequenceNumberSpeculatedOn = -1; // the oldest transaction that was speculated on int64_t localArbitrationSequenceNumber = 0; bool hadPartialSendToServer = false; bool attemptedToSendToServer = false; @@ -48,37 +48,37 @@ final class Table { Slot lastSlotAttemptedToSend = NULL; bool lastIsNewKey = false; int lastNewSize = 0; - Hashtable> lastTransactionPartsSent = NULL; + Hashtable > lastTransactionPartsSent = NULL; Vector lastPendingSendArbitrationEntriesToDelete = NULL; NewKey lastNewKey = NULL; /* Data Structures */ - Hashtable committedKeyValueTable = NULL; // Table of committed key value pairs - Hashtable speculatedKeyValueTable = NULL; // Table of speculated key value pairs, if there is a speculative value - Hashtable pendingTransactionSpeculatedKeyValueTable = NULL; // Table of speculated key value pairs, if there is a speculative value from the pending transactions - Hashtable liveNewKeyTable = NULL; // Table of live new keys - Hashtable> lastMessageTable = NULL; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage); - Hashtable> rejectedMessageWatchVectorTable = NULL; // Table of machine Ids and the set of rejected messages they have not seen yet - Hashtable arbitratorTable = NULL; // Table of keys and their arbitrators - Hashtable, Abort> liveAbortTable = NULL; // Table live abort messages - Hashtable, TransactionPart>> newTransactionParts = NULL; // transaction parts that are seen in this latest round of slots from the server - Hashtable, CommitPart>> newCommitParts = NULL; // commit parts that are seen in this latest round of slots from the server - Hashtable lastArbitratedTransactionNumberByArbitratorTable = NULL; // Last transaction sequence number that an arbitrator arbitrated on - Hashtable liveTransactionBySequenceNumberTable = NULL; // live transaction grouped by the sequence number - Hashtable, Transaction> liveTransactionByTransactionIdTable = NULL; // live transaction grouped by the transaction ID - Hashtable> liveCommitsTable = NULL; + Hashtable committedKeyValueTable = NULL; // Table of committed key value pairs + Hashtable speculatedKeyValueTable = NULL;// Table of speculated key value pairs, if there is a speculative value + Hashtable pendingTransactionSpeculatedKeyValueTable = NULL;// Table of speculated key value pairs, if there is a speculative value from the pending transactions + Hashtable liveNewKeyTable = NULL;// Table of live new keys + Hashtable > lastMessageTable = NULL; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage); + Hashtable > rejectedMessageWatchVectorTable = NULL;// Table of machine Ids and the set of rejected messages they have not seen yet + Hashtable arbitratorTable = NULL;// Table of keys and their arbitrators + Hashtable, Abort> liveAbortTable = NULL; // Table live abort messages + Hashtable, TransactionPart> > newTransactionParts = NULL; // transaction parts that are seen in this latest round of slots from the server + Hashtable, CommitPart> > newCommitParts = NULL; // commit parts that are seen in this latest round of slots from the server + Hashtable lastArbitratedTransactionNumberByArbitratorTable = NULL;// Last transaction sequence number that an arbitrator arbitrated on + Hashtable liveTransactionBySequenceNumberTable = NULL; // live transaction grouped by the sequence number + Hashtable, Transaction> liveTransactionByTransactionIdTable = NULL;// live transaction grouped by the transaction ID + Hashtable > liveCommitsTable = NULL; Hashtable liveCommitsByKeyTable = NULL; Hashtable lastCommitSeenSequenceNumberByArbitratorTable = NULL; - Vector rejectedSlotVector = NULL; // Vector of rejected slots that have yet to be sent to the server + Vector rejectedSlotVector = NULL; // Vector of rejected slots that have yet to be sent to the server Vector pendingTransactionQueue = NULL; Vector pendingSendArbitrationRounds = NULL; Vector pendingSendArbitrationEntriesToDelete = NULL; - Hashtable> transactionPartsSent = NULL; + Hashtable > transactionPartsSent = NULL; Hashtable outstandingTransactionStatus = NULL; Hashtable liveAbortsGeneratedByLocal = NULL; - Set> offlineTransactionsCommittedAndAtServer = NULL; - Hashtable> localCommunicationTable = NULL; + Set > offlineTransactionsCommittedAndAtServer = NULL; + Hashtable > localCommunicationTable = NULL; Hashtable lastTransactionSeenFromMachineFromServer = NULL; Hashtable lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = NULL; @@ -114,26 +114,26 @@ final class Table { speculatedKeyValueTable = new Hashtable(); pendingTransactionSpeculatedKeyValueTable = new Hashtable(); liveNewKeyTable = new Hashtable(); - lastMessageTable = new Hashtable>(); - rejectedMessageWatchVectorTable = new Hashtable>(); + lastMessageTable = new Hashtable >(); + rejectedMessageWatchVectorTable = new Hashtable >(); arbitratorTable = new Hashtable(); liveAbortTable = new Hashtable, Abort>(); - newTransactionParts = new Hashtable, TransactionPart>>(); - newCommitParts = new Hashtable, CommitPart>>(); + newTransactionParts = new Hashtable, TransactionPart> >(); + newCommitParts = new Hashtable, CommitPart> >(); lastArbitratedTransactionNumberByArbitratorTable = new Hashtable(); liveTransactionBySequenceNumberTable = new Hashtable(); liveTransactionByTransactionIdTable = new Hashtable, Transaction>(); - liveCommitsTable = new Hashtable>(); + liveCommitsTable = new Hashtable >(); liveCommitsByKeyTable = new Hashtable(); lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable(); rejectedSlotVector = new Vector(); pendingTransactionQueue = new Vector(); pendingSendArbitrationEntriesToDelete = new Vector(); - transactionPartsSent = new Hashtable>(); + transactionPartsSent = new Hashtable >(); outstandingTransactionStatus = new Hashtable(); liveAbortsGeneratedByLocal = new Hashtable(); - offlineTransactionsCommittedAndAtServer = new HashSet>(); - localCommunicationTable = new Hashtable>(); + offlineTransactionsCommittedAndAtServer = new HashSet >(); + localCommunicationTable = new Hashtable >(); lastTransactionSeenFromMachineFromServer = new Hashtable(); pendingSendArbitrationRounds = new Vector(); lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable(); @@ -262,17 +262,17 @@ final class Table { } // String toString() { - // String retString = " Committed Table: \n"; - // retString += "---------------------------\n"; - // retString += commitedTable.toString(); + // String retString = " Committed Table: \n"; + // retString += "---------------------------\n"; + // retString += commitedTable.toString(); - // retString += "\n\n"; + // retString += "\n\n"; - // retString += " Speculative Table: \n"; - // retString += "---------------------------\n"; - // retString += speculativeTable.toString(); + // retString += " Speculative Table: \n"; + // retString += "---------------------------\n"; + // retString += speculativeTable.toString(); - // return retString; + // return retString; // } synchronized void addLocalCommunication(int64_t arbitrator, String hostName, int portNumber) { @@ -469,7 +469,7 @@ final class Table { continue; } - Pair sendReturn = sendTransactionToLocal(transaction); + Pair sendReturn = sendTransactionToLocal(transaction); if (sendReturn.getFirst()) { // Failed to contact over local @@ -528,7 +528,7 @@ final class Table { Slot[] newSlots = cloud.getSlots(sequenceNumber + 1); if (newSlots.length == 0) { fromRetry = true; - ThreeTuple sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey); + ThreeTuple sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey); if (sendSlotsReturn.getFirst()) { if (newKey != NULL) { @@ -732,10 +732,10 @@ final class Table { localSequenceNumber++; // Try to fill the slot with data - ThreeTuple fillSlotsReturn = fillSlot(slot, false, newKey); + ThreeTuple fillSlotsReturn = fillSlot(slot, false, newKey); bool needsResize = fillSlotsReturn.getFirst(); int newSize = fillSlotsReturn.getSecond(); - Boolean insertedNewKey = fillSlotsReturn.getThird(); + bool insertedNewKey = fillSlotsReturn.getThird(); if (needsResize) { // Reset which transaction to send @@ -761,11 +761,11 @@ final class Table { lastInsertedNewKey = insertedNewKey; lastNewSize = newSize; lastNewKey = newKey; - lastTransactionPartsSent = new Hashtable>(transactionPartsSent); + lastTransactionPartsSent = new Hashtable >(transactionPartsSent); lastPendingSendArbitrationEntriesToDelete = new Vector(pendingSendArbitrationEntriesToDelete); - ThreeTuple sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL); + ThreeTuple sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL); if (sendSlotsReturn.getFirst()) { @@ -810,32 +810,32 @@ final class Table { } else { // if (!sendSlotsReturn.getSecond()) { - // for (Transaction transaction : lastTransactionPartsSent.keySet()) { - // transaction.resetServerFailure(); - // } + // for (Transaction transaction : lastTransactionPartsSent.keySet()) { + // transaction.resetServerFailure(); + // } // } else { - // for (Transaction transaction : lastTransactionPartsSent.keySet()) { - // transaction.resetServerFailure(); + // for (Transaction transaction : lastTransactionPartsSent.keySet()) { + // transaction.resetServerFailure(); - // // Update which transactions parts still need to be sent - // transaction.removeSentParts(transactionPartsSent.get(transaction)); + // // Update which transactions parts still need to be sent + // transaction.removeSentParts(transactionPartsSent.get(transaction)); - // // Add the transaction status to the outstanding list - // outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus()); + // // Add the transaction status to the outstanding list + // outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus()); - // // Update the transaction status - // transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial); + // // Update the transaction status + // transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial); - // // Check if all the transaction parts were successfully sent and if so then remove it from pending - // if (transaction.didSendAllParts()) { - // transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully); - // pendingTransactionQueue.remove(transaction); + // // Check if all the transaction parts were successfully sent and if so then remove it from pending + // if (transaction.didSendAllParts()) { + // transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully); + // pendingTransactionQueue.remove(transaction); - // for (KeyValue kv : transaction.getKeyValueUpdateSet()) { - // System.out.println("Sent: " + kv + " from: " + localMachineId + " Slot:" + lastSlotAttemptedToSend.getSequenceNumber() + " Claimed:" + transaction.getSequenceNumber()); - // } - // } - // } + // for (KeyValue kv : transaction.getKeyValueUpdateSet()) { + // System.out.println("Sent: " + kv + " from: " + localMachineId + " Slot:" + lastSlotAttemptedToSend.getSequenceNumber() + " Claimed:" + transaction.getSequenceNumber()); + // } + // } + // } // } // Reset which transaction to send @@ -880,8 +880,8 @@ final class Table { // if (!fromRetry) { - // lastTransactionPartsSent = new Hashtable>(transactionPartsSent); - // lastPendingSendArbitrationEntriesToDelete = new Vector(pendingSendArbitrationEntriesToDelete); + // lastTransactionPartsSent = new Hashtable>(transactionPartsSent); + // lastPendingSendArbitrationEntriesToDelete = new Vector(pendingSendArbitrationEntriesToDelete); // } // Nothing was able to be sent to the server so just clear these data structures @@ -910,7 +910,7 @@ final class Table { // Get the size of the send data int sendDataSize = sizeof(int32_t) + sizeof(int64_t); - Long lastArbitrationDataLocalSequenceNumber = (int64_t) - 1; + Long lastArbitrationDataLocalSequenceNumber = (int64_t) -1; if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId) != NULL) { lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId); } @@ -951,14 +951,14 @@ final class Table { return true; } - Pair sendTransactionToLocal(Transaction transaction) { + Pair sendTransactionToLocal(Transaction transaction) { // Get the devices local communications Pair localCommunicationInformation = localCommunicationTable.get(transaction.getArbitrator()); if (localCommunicationInformation == NULL) { // Cant talk to that device locally so do nothing - return new Pair(true, false); + return new Pair(true, false); } // Get the size of the send data @@ -967,7 +967,7 @@ final class Table { sendDataSize += part.getSize(); } - Long lastArbitrationDataLocalSequenceNumber = (int64_t) - 1; + Long lastArbitrationDataLocalSequenceNumber = (int64_t) -1; if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator()) != NULL) { lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator()); } @@ -990,7 +990,7 @@ final class Table { if (returnData == NULL) { // Could not contact server - return new Pair(true, false); + return new Pair(true, false); } // Decode the data @@ -1034,7 +1034,7 @@ final class Table { } } - return new Pair(false, true); + return new Pair(false, true); } synchronized char[] acceptDataFromLocal(char[] data) { @@ -1059,7 +1059,7 @@ final class Table { } // Arbitrate on transaction and pull relevant return data - Pair localArbitrateReturn = arbitrateOnLocalTransaction(transaction); + Pair localArbitrateReturn = arbitrateOnLocalTransaction(transaction); couldArbitrate = localArbitrateReturn.getFirst(); didCommit = localArbitrateReturn.getSecond(); @@ -1112,7 +1112,7 @@ final class Table { // Number of arbitration entries to decode returnDataSize += 2 * sizeof(int32_t); - // Boolean of did commit or not + // bool of did commit or not if (numberOfParts != 0) { returnDataSize += sizeof(char); } @@ -1144,7 +1144,7 @@ final class Table { return returnData; } - ThreeTuple sendSlotsToServer(Slot slot, int newSize, bool isNewKey) throws ServerException { + ThreeTuple sendSlotsToServer(Slot slot, int newSize, bool isNewKey) throws ServerException { bool attemptedToSendToServerTmp = attemptedToSendToServer; attemptedToSendToServer = true; @@ -1157,7 +1157,7 @@ final class Table { array = new Slot[] {slot}; rejectedSlotVector.clear(); inserted = true; - } else { + } else { if (array.length == 0) { throw new Error("Server Error: Did not send any slots"); } @@ -1204,18 +1204,18 @@ final class Table { } } - return new ThreeTuple(inserted, lastTryInserted, array); + return new ThreeTuple(inserted, lastTryInserted, array); } /** * Returns false if a resize was needed */ - ThreeTuple fillSlot(Slot slot, bool resize, NewKey newKeyEntry) { + ThreeTuple fillSlot(Slot slot, bool resize, NewKey newKeyEntry) { int newSize = 0; if (liveSlotCount > bufferResizeThreshold) { - resize = true; //Resize is forced + resize = true;//Resize is forced } @@ -1229,7 +1229,7 @@ final class Table { doRejectedMessages(slot); // Do mandatory rescue of entries - ThreeTuple mandatoryRescueReturn = doMandatoryResuce(slot, resize); + ThreeTuple mandatoryRescueReturn = doMandatoryResuce(slot, resize); // Extract working variables bool needsResize = mandatoryRescueReturn.getFirst(); @@ -1238,7 +1238,7 @@ final class Table { if (needsResize && !resize) { // We need to resize but we are not resizing so return false - return new ThreeTuple(true, NULL, NULL); + return new ThreeTuple(true, NULL, NULL); } bool inserted = false; @@ -1290,7 +1290,7 @@ final class Table { // Set the transaction sequence number if it has yet to be inserted into the block chain // if ((!transaction.didSendAPartToServer() && !transaction.getServerFailure()) || (transaction.getSequenceNumber() == -1)) { - // transaction.setSequenceNumber(slot.getSequenceNumber()); + // transaction.setSequenceNumber(slot.getSequenceNumber()); // } if ((!transaction.didSendAPartToServer()) || (transaction.getSequenceNumber() == -1)) { @@ -1324,11 +1324,11 @@ final class Table { // Fill the remainder of the slot with rescue data doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize); - return new ThreeTuple(false, newSize, inserted); + return new ThreeTuple(false, newSize, inserted); } void doRejectedMessages(Slot s) { - if (! rejectedSlotVector.isEmpty()) { + if (!rejectedSlotVector.isEmpty()) { /* TODO: We should avoid generating a rejected message entry if * there is already a sufficient entry in the queue (e.g., * equalsto value of true and same sequence number). */ @@ -1366,7 +1366,7 @@ final class Table { } } - ThreeTuple doMandatoryResuce(Slot slot, bool resize) { + ThreeTuple doMandatoryResuce(Slot slot, bool resize) { int64_t newestSequenceNumber = buffer.getNewestSeqNum(); int64_t oldestSequenceNumber = buffer.getOldestSeqNum(); if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) { @@ -1383,7 +1383,7 @@ final class Table { for (; currentSequenceNumber < threshold; currentSequenceNumber++) { Slot previousSlot = buffer.getSlot(currentSequenceNumber); // Push slot number forward - if (! seenLiveSlot) { + if (!seenLiveSlot) { oldestLiveSlotSequenceNumver = currentSequenceNumber; } @@ -1405,15 +1405,15 @@ final class Table { slot.addEntry(liveEntry); } else if (currentSequenceNumber == firstIfFull) { //if there's no space but the entry is about to fall off the queue - System.out.println("B"); //? - return new ThreeTuple(true, seenLiveSlot, currentSequenceNumber); + System.out.println("B");//? + return new ThreeTuple(true, seenLiveSlot, currentSequenceNumber); } } } // Did not resize - return new ThreeTuple(false, seenLiveSlot, currentSequenceNumber); + return new ThreeTuple(false, seenLiveSlot, currentSequenceNumber); } void doOptionalRescue(Slot s, bool seenliveslot, int64_t seqn, bool resize) { @@ -1422,7 +1422,7 @@ final class Table { * for SKIP_THRESHOLD consecutive entries*/ int skipcount = 0; int64_t newestseqnum = buffer.getNewestSeqNum(); - search: +search: for (; seqn <= newestseqnum; seqn++) { Slot prevslot = buffer.getSlot(seqn); //Push slot number forward @@ -1733,11 +1733,11 @@ final class Table { // Create the abort Abort newAbort = new Abort(NULL, - transaction.getClientLocalSequenceNumber(), - transaction.getSequenceNumber(), - transaction.getMachineId(), - transaction.getArbitrator(), - localArbitrationSequenceNumber); + transaction.getClientLocalSequenceNumber(), + transaction.getSequenceNumber(), + transaction.getMachineId(), + transaction.getArbitrator(), + localArbitrationSequenceNumber); localArbitrationSequenceNumber++; generatedAborts.add(newAbort); @@ -1791,17 +1791,17 @@ final class Table { } } - Pair arbitrateOnLocalTransaction(Transaction transaction) { + Pair arbitrateOnLocalTransaction(Transaction transaction) { // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction if (transaction.getArbitrator() != localMachineId) { - return new Pair(false, false); + return new Pair(false, false); } if (!transaction.isComplete()) { // Will arbitrate in incorrect order if we continue so just break // Most likely this - return new Pair(false, false); + return new Pair(false, false); } if (transaction.getMachineId() != localMachineId) { @@ -1809,7 +1809,7 @@ final class Table { if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) != NULL) { if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) > transaction.getClientLocalSequenceNumber()) { // We've have already seen this from the server - return new Pair(false, false); + return new Pair(false, false); } } } @@ -1853,7 +1853,7 @@ final class Table { } updateLiveStateFromLocal(); - return new Pair(true, true); + return new Pair(true, true); } else { if (transaction.getMachineId() == localMachineId) { @@ -1870,11 +1870,11 @@ final class Table { // Create the abort Abort newAbort = new Abort(NULL, - transaction.getClientLocalSequenceNumber(), - -1, - transaction.getMachineId(), - transaction.getArbitrator(), - localArbitrationSequenceNumber); + transaction.getClientLocalSequenceNumber(), + -1, + transaction.getMachineId(), + transaction.getArbitrator(), + localArbitrationSequenceNumber); localArbitrationSequenceNumber++; addAbortSet.add(newAbort); @@ -1893,7 +1893,7 @@ final class Table { } updateLiveStateFromLocal(); - return new Pair(true, false); + return new Pair(true, false); } } @@ -1985,7 +1985,7 @@ final class Table { return false; } // bool compactArbitrationData() { - // return false; + // return false; // } /** @@ -2117,7 +2117,7 @@ final class Table { for (KeyValue kv : commit.getKeyValueUpdateSet()) { commitsToEdit.add(liveCommitsByKeyTable.get(kv.getKey())); } - commitsToEdit.remove(NULL); // remove NULL since it could be in this set + commitsToEdit.remove(NULL); // remove NULL since it could be in this set // Update each previous commit that needs to be updated for (Commit previousCommit : commitsToEdit) { @@ -2195,7 +2195,7 @@ final class Table { if (startIndex >= transactionSequenceNumbersSorted.size()) { // Make sure we are not out of bounds - return false; // did not speculate + return false; // did not speculate } Set incompleteTransactionArbitrator = new HashSet(); @@ -2282,7 +2282,7 @@ final class Table { void updateLiveTransactionsAndStatus() { // Go through each of the transactions - for (Iterator> iter = liveTransactionBySequenceNumberTable.entrySet().iterator(); iter.hasNext();) { + for (Iterator > iter = liveTransactionBySequenceNumberTable.entrySet().iterator(); iter.hasNext();) { Transaction transaction = iter.next().getValue(); // Check if the transaction is dead @@ -2299,7 +2299,7 @@ final class Table { } // Go through each of the transactions - for (Iterator> iter = outstandingTransactionStatus.entrySet().iterator(); iter.hasNext();) { + for (Iterator > iter = outstandingTransactionStatus.entrySet().iterator(); iter.hasNext();) { TransactionStatus status = iter.next().getValue(); // Check if the transaction is dead @@ -2435,7 +2435,7 @@ final class Table { // Create a list of clients to watch until they see this rejected message entry. HashSet deviceWatchSet = new HashSet(); - for (Map.Entry> lastMessageEntry : lastMessageTable.entrySet()) { + for (Map.Entry > lastMessageEntry : lastMessageTable.entrySet()) { // Machine ID for the last message entry int64_t lastMessageEntryMachineId = lastMessageEntry.getKey(); @@ -2486,7 +2486,7 @@ final class Table { // Abort has not been seen by the client it is for yet so we need to keep track of it Abort previouslySeenAbort = liveAbortTable.put(entry.getAbortId(), entry); if (previouslySeenAbort != NULL) { - previouslySeenAbort.setDead(); // Delete old version of the abort since we got a rescued newer version + previouslySeenAbort.setDead();// Delete old version of the abort since we got a rescued newer version } if (entry.getTransactionArbitrator() == localMachineId) { @@ -2637,7 +2637,7 @@ final class Table { } // Set dead the abort - for (Iterator, Abort>> i = liveAbortTable.entrySet().iterator(); i.hasNext();) { + for (Iterator, Abort> > i = liveAbortTable.entrySet().iterator(); i.hasNext();) { Abort abort = i.next().getValue(); if ((abort.getTransactionMachineId() == machineId) && (abort.getSequenceNumber() <= seqNum)) { @@ -2728,7 +2728,7 @@ final class Table { Slot currSlot = newSlots[i]; Slot prevSlot = indexer.getSlot(currSlot.getSequenceNumber() - 1); if (prevSlot != NULL && - !Arrays.equals(prevSlot.getHMAC(), currSlot.getPrevHMAC())) + !Arrays.equals(prevSlot.getHMAC(), currSlot.getPrevHMAC())) throw new Error("Server Error: Invalid HMAC Chain" + currSlot + " " + prevSlot); } } diff --git a/version2/src/C/Table.h b/version2/src/C/Table.h index b353fdf..3bc9cb9 100644 --- a/version2/src/C/Table.h +++ b/version2/src/C/Table.h @@ -7,7 +7,7 @@ * @version 1.0 */ - /* Constants */ +/* Constants */ #define Table_FREE_SLOTS 2 // Number of slots that should be kept free // 10 #define Table_SKIP_THRESHOLD 10 @@ -16,254 +16,254 @@ #define Table_REJECTED_THRESHOLD 5 class Table { - private: +private: /* Helper Objects */ - SlotBuffer * buffer; - CloudComm * cloud = NULL; - Random * random = NULL; - TableStatus * liveTableStatus = NULL; - PendingTransaction * pendingTransactionBuilder = NULL; // Pending Transaction used in building a Pending Transaction - Transaction * lastPendingTransactionSpeculatedOn = NULL; // Last transaction that was speculated on from the pending transaction - Transaction * firstPendingTransaction = NULL; // first transaction in the pending transaction list - + SlotBuffer *buffer; + CloudComm *cloud = NULL; + Random *random = NULL; + TableStatus *liveTableStatus = NULL; + PendingTransaction *pendingTransactionBuilder = NULL; // Pending Transaction used in building a Pending Transaction + Transaction *lastPendingTransactionSpeculatedOn = NULL; // Last transaction that was speculated on from the pending transaction + Transaction *firstPendingTransaction = NULL; // first transaction in the pending transaction list + /* Variables */ - int numberOfSlots = 0; // Number of slots stored in buffer - int bufferResizeThreshold = 0; // Threshold on the number of live slots before a resize is needed - int64_t liveSlotCount = 0; // Number of currently live slots + int numberOfSlots = 0; // Number of slots stored in buffer + int bufferResizeThreshold = 0;// Threshold on the number of live slots before a resize is needed + int64_t liveSlotCount = 0;// Number of currently live slots int64_t oldestLiveSlotSequenceNumver = 0; // Smallest sequence number of the slot with a live entry - int64_t localMachineId = 0; // Machine ID of this client device - int64_t sequenceNumber = 0; // Largest sequence number a client has received + int64_t localMachineId = 0; // Machine ID of this client device + int64_t sequenceNumber = 0; // Largest sequence number a client has received int64_t localSequenceNumber = 0; - + // int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server // int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server - int64_t localTransactionSequenceNumber = 0; // Local sequence number counter for transactions - int64_t lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on - int64_t oldestTransactionSequenceNumberSpeculatedOn = -1; // the oldest transaction that was speculated on + int64_t localTransactionSequenceNumber = 0; // Local sequence number counter for transactions + int64_t lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on + int64_t oldestTransactionSequenceNumberSpeculatedOn = -1; // the oldest transaction that was speculated on int64_t localArbitrationSequenceNumber = 0; bool hadPartialSendToServer = false; bool attemptedToSendToServer = false; int64_t expectedsize; bool didFindTableStatus = false; int64_t currMaxSize = 0; - - Slot * lastSlotAttemptedToSend = NULL; + + Slot *lastSlotAttemptedToSend = NULL; bool lastIsNewKey = false; int lastNewSize = 0; Hashtable *> lastTransactionPartsSent = NULL; - Vector *lastPendingSendArbitrationEntriesToDelete = NULL; - NewKey * lastNewKey = NULL; - - + Vector *lastPendingSendArbitrationEntriesToDelete = NULL; + NewKey *lastNewKey = NULL; + + /* Data Structures */ - Hashtable *committedKeyValueTable = NULL; // Table of committed key value pairs - Hashtable * speculatedKeyValueTable = NULL; // Table of speculated key value pairs, if there is a speculative value - Hashtable * pendingTransactionSpeculatedKeyValueTable = NULL; // Table of speculated key value pairs, if there is a speculative value from the pending transactions - Hashtable * liveNewKeyTable = NULL; // Table of live new keys - Hashtable*> *lastMessageTable = NULL; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage); - Hashtable*> *rejectedMessageWatchVectorTable = NULL; // Table of machine Ids and the set of rejected messages they have not seen yet - Hashtable *arbitratorTable = NULL; // Table of keys and their arbitrators - Hashtable*, Abort*> *liveAbortTable = NULL; // Table live abort messages - Hashtable*, TransactionPart*>*> *newTransactionParts = NULL; // transaction parts that are seen in this latest round of slots from the server - Hashtable*, CommitPart*>*> * newCommitParts = NULL; // commit parts that are seen in this latest round of slots from the server - Hashtable *lastArbitratedTransactionNumberByArbitratorTable = NULL; // Last transaction sequence number that an arbitrator arbitrated on - Hashtable *liveTransactionBySequenceNumberTable = NULL; // live transaction grouped by the sequence number - Hashtable*, Transaction*> *liveTransactionByTransactionIdTable = NULL; // live transaction grouped by the transaction ID - Hashtable> *liveCommitsTable = NULL; - Hashtable *liveCommitsByKeyTable = NULL; + Hashtable *committedKeyValueTable = NULL;// Table of committed key value pairs + Hashtable *speculatedKeyValueTable = NULL; // Table of speculated key value pairs, if there is a speculative value + Hashtable *pendingTransactionSpeculatedKeyValueTable = NULL; // Table of speculated key value pairs, if there is a speculative value from the pending transactions + Hashtable *liveNewKeyTable = NULL; // Table of live new keys + Hashtable *> *lastMessageTable = NULL; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage); + Hashtable *> *rejectedMessageWatchVectorTable = NULL; // Table of machine Ids and the set of rejected messages they have not seen yet + Hashtable *arbitratorTable = NULL;// Table of keys and their arbitrators + Hashtable *, Abort *> *liveAbortTable = NULL;// Table live abort messages + Hashtable *, TransactionPart *> *> *newTransactionParts = NULL; // transaction parts that are seen in this latest round of slots from the server + Hashtable *, CommitPart *> *> *newCommitParts = NULL; // commit parts that are seen in this latest round of slots from the server + Hashtable *lastArbitratedTransactionNumberByArbitratorTable = NULL; // Last transaction sequence number that an arbitrator arbitrated on + Hashtable *liveTransactionBySequenceNumberTable = NULL; // live transaction grouped by the sequence number + Hashtable *, Transaction *> *liveTransactionByTransactionIdTable = NULL; // live transaction grouped by the transaction ID + Hashtable > *liveCommitsTable = NULL; + Hashtable *liveCommitsByKeyTable = NULL; Hashtable *lastCommitSeenSequenceNumberByArbitratorTable = NULL; - Vector * rejectedSlotVector = NULL; // Vector of rejected slots that have yet to be sent to the server - Vector *pendingTransactionQueue = NULL; - Vector *pendingSendArbitrationRounds = NULL; - Vector *pendingSendArbitrationEntriesToDelete = NULL; - Hashtable*> *transactionPartsSent = NULL; - Hashtable *outstandingTransactionStatus = NULL; - Hashtable *liveAbortsGeneratedByLocal = NULL; - Hashset*> *offlineTransactionsCommittedAndAtServer = NULL; - Hashtable> * localCommunicationTable = NULL; - Hashtable * lastTransactionSeenFromMachineFromServer = NULL; - Hashtable * lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = NULL; + Vector *rejectedSlotVector = NULL; // Vector of rejected slots that have yet to be sent to the server + Vector *pendingTransactionQueue = NULL; + Vector *pendingSendArbitrationRounds = NULL; + Vector *pendingSendArbitrationEntriesToDelete = NULL; + Hashtable *> *transactionPartsSent = NULL; + Hashtable *outstandingTransactionStatus = NULL; + Hashtable *liveAbortsGeneratedByLocal = NULL; + Hashset *> *offlineTransactionsCommittedAndAtServer = NULL; + Hashtable > *localCommunicationTable = NULL; + Hashtable *lastTransactionSeenFromMachineFromServer = NULL; + Hashtable *lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = NULL; void init(); - /** - * Recalculate the new resize threshold - */ + /** + * Recalculate the new resize threshold + */ void setResizeThreshold(); bool sendToServer(NewKey *newKey); synchronized bool updateFromLocal(int64_t machineId); - Pair sendTransactionToLocal(Transaction *transaction); - ThreeTuple *> * sendSlotsToServer(Slot *slot, int newSize, bool isNewKey); + Pair sendTransactionToLocal(Transaction *transaction); + ThreeTuple *> *sendSlotsToServer(Slot *slot, int newSize, bool isNewKey); /** * Returns false if a resize was needed */ - ThreeTuple * fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry); + ThreeTuple *fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry); void doRejectedMessages(Slot s); - - ThreeTuple doMandatoryResuce(Slot slot, bool resize); - - void doOptionalRescue(Slot s, bool seenliveslot, int64_t seqn, bool resize); - /** + + ThreeTuple doMandatoryResuce(Slot slot, bool resize); + + void doOptionalRescue(Slot s, bool seenliveslot, int64_t seqn, bool resize); + /** * Checks for malicious activity and updates the local copy of the block chain. */ - void validateAndUpdate(Slot[] newSlots, bool acceptUpdatesToLocal); - - void updateLiveStateFromServer(); - - void updateLiveStateFromLocal(); - - void initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots); - - void updateExpectedSize(); - - - /** - * Check the size of the block chain to make sure there are enough slots sent back by the server. - * This is only called when we have a gap between the slots that we have locally and the slots - * sent by the server therefore in the slots sent by the server there will be at least 1 Table - * status message - */ - void checkNumSlots(int numberOfSlots); - - void updateCurrMaxSize(int newmaxsize) { currMaxSize = newmaxsize; } - - - /** + void validateAndUpdate(Slot[] newSlots, bool acceptUpdatesToLocal); + + void updateLiveStateFromServer(); + + void updateLiveStateFromLocal(); + + void initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots); + + void updateExpectedSize(); + + + /** + * Check the size of the block chain to make sure there are enough slots sent back by the server. + * This is only called when we have a gap between the slots that we have locally and the slots + * sent by the server therefore in the slots sent by the server there will be at least 1 Table + * status message + */ + void checkNumSlots(int numberOfSlots); + + void updateCurrMaxSize(int newmaxsize) { currMaxSize = newmaxsize; } + + + /** * Update the size of of the local buffer if it is needed. */ - void commitNewMaxSize(); - - /** - * Process the new transaction parts from this latest round of slots received from the server - */ - void processNewTransactionParts(); - - int64_t lastSeqNumArbOn = 0; - - void arbitrateFromServer(); - - Pair arbitrateOnLocalTransaction(Transaction transaction); - - /** - * Compacts the arbitration data my merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates - */ - bool compactArbitrationData(); - - /** - * Update all the commits and the committed tables, sets dead the dead transactions - */ - bool updateCommittedTable(); - - /** - * Create the speculative table from transactions that are still live and have come from the cloud - */ - bool updateSpeculativeTable(bool didProcessNewCommits); - - /** - * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer - */ - void updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate); - - /** - * Set dead and remove from the live transaction tables the transactions that are dead - */ - void updateLiveTransactionsAndStatus(); - - /** - * Process this slot, entry by entry. Also update the latest message sent by slot - */ - void processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLocal, HashSet machineSet); - - /** - * Update the last message that was sent for a machine Id - */ - void processEntry(LastMessage entry, HashSet machineSet); - - /** - * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message) - */ - void processEntry(NewKey entry); - - /** - * Process new table status entries and set dead the old ones as new ones come in. - * keeps track of the largest and smallest table status seen in this current round - * of updating the local copy of the block chain - */ - void processEntry(TableStatus entry, int64_t seq); - - /** - * Check old messages to see if there is a block chain violation. Also - */ - void processEntry(RejectedMessage entry, SlotIndexer indexer); - - /** - * Check if this abort is live, if not then save it so we can kill it later. - * update the last transaction number that was arbitrated on. - */ - void processEntry(Abort entry); - - /** - * Set dead the transaction part if that transaction is dead and keep track of all new parts - */ - void processEntry(TransactionPart entry); - - /** - * Process new commit entries and save them for future use. Delete duplicates - */ - void processEntry(CommitPart entry); - - /** - * Update the last message seen table. Update and set dead the appropriate RejectedMessages as clients see them. - * Updates the live aborts, removes those that are dead and sets them dead. - * Check that the last message seen is correct and that there is no mismatch of our own last message or that - * other clients have not had a rollback on the last message. - */ - void updateLastMessage(int64_t machineId, int64_t seqNum, Liveness liveness, bool acceptUpdatesToLocal, HashSet machineSet); - - /** - * Add a rejected message entry to the watch set to keep track of which clients have seen that - * rejected message entry and which have not. - */ - void addWatchVector(int64_t machineId, RejectedMessage entry); - - /** - * Check if the HMAC chain is not violated - */ - void checkHMACChain(SlotIndexer indexer, Slot[] newSlots); - bool lastInsertedNewKey = false; - - public: - Table(String baseurl, String password, int64_t _localMachineId, int listeningPort); - Table(CloudComm _cloud, int64_t _localMachineId); - - /** - * Initialize the table by inserting a table status as the first entry into the table status - * also initialize the crypto stuff. - */ - void initTable(); - - /** - * Rebuild the table from scratch by pulling the latest block chain from the server. - */ - void rebuild(); - void addLocalCommunication(int64_t arbitrator, String hostName, int portNumber); - uint64_t getArbitrator(IoTString * key); - void close(); - IoTString * getCommitted(IoTString * key); - IoTString * getSpeculative(IoTString * key); - IoTString * getCommittedAtomic(IoTString * key); - bool createNewKey(IoTString * keyName, int64_t machineId); - TransactionStatus * commitTransaction(); - + void commitNewMaxSize(); + + /** + * Process the new transaction parts from this latest round of slots received from the server + */ + void processNewTransactionParts(); + + int64_t lastSeqNumArbOn = 0; + + void arbitrateFromServer(); + + Pair arbitrateOnLocalTransaction(Transaction transaction); + + /** + * Compacts the arbitration data my merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates + */ + bool compactArbitrationData(); + + /** + * Update all the commits and the committed tables, sets dead the dead transactions + */ + bool updateCommittedTable(); + + /** + * Create the speculative table from transactions that are still live and have come from the cloud + */ + bool updateSpeculativeTable(bool didProcessNewCommits); + + /** + * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer + */ + void updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate); + + /** + * Set dead and remove from the live transaction tables the transactions that are dead + */ + void updateLiveTransactionsAndStatus(); + + /** + * Process this slot, entry by entry. Also update the latest message sent by slot + */ + void processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLocal, HashSet machineSet); + + /** + * Update the last message that was sent for a machine Id + */ + void processEntry(LastMessage entry, HashSet machineSet); + + /** + * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message) + */ + void processEntry(NewKey entry); + + /** + * Process new table status entries and set dead the old ones as new ones come in. + * keeps track of the largest and smallest table status seen in this current round + * of updating the local copy of the block chain + */ + void processEntry(TableStatus entry, int64_t seq); + + /** + * Check old messages to see if there is a block chain violation. Also + */ + void processEntry(RejectedMessage entry, SlotIndexer indexer); + + /** + * Check if this abort is live, if not then save it so we can kill it later. + * update the last transaction number that was arbitrated on. + */ + void processEntry(Abort entry); + + /** + * Set dead the transaction part if that transaction is dead and keep track of all new parts + */ + void processEntry(TransactionPart entry); + + /** + * Process new commit entries and save them for future use. Delete duplicates + */ + void processEntry(CommitPart entry); + + /** + * Update the last message seen table. Update and set dead the appropriate RejectedMessages as clients see them. + * Updates the live aborts, removes those that are dead and sets them dead. + * Check that the last message seen is correct and that there is no mismatch of our own last message or that + * other clients have not had a rollback on the last message. + */ + void updateLastMessage(int64_t machineId, int64_t seqNum, Liveness liveness, bool acceptUpdatesToLocal, HashSet machineSet); + + /** + * Add a rejected message entry to the watch set to keep track of which clients have seen that + * rejected message entry and which have not. + */ + void addWatchVector(int64_t machineId, RejectedMessage entry); + + /** + * Check if the HMAC chain is not violated + */ + void checkHMACChain(SlotIndexer indexer, Slot[] newSlots); + bool lastInsertedNewKey = false; + +public: + Table(String baseurl, String password, int64_t _localMachineId, int listeningPort); + Table(CloudComm _cloud, int64_t _localMachineId); + + /** + * Initialize the table by inserting a table status as the first entry into the table status + * also initialize the crypto stuff. + */ + void initTable(); + + /** + * Rebuild the table from scratch by pulling the latest block chain from the server. + */ + void rebuild(); + void addLocalCommunication(int64_t arbitrator, String hostName, int portNumber); + uint64_t getArbitrator(IoTString *key); + void close(); + IoTString *getCommitted(IoTString *key); + IoTString *getSpeculative(IoTString *key); + IoTString *getCommittedAtomic(IoTString *key); + bool createNewKey(IoTString *keyName, int64_t machineId); + TransactionStatus *commitTransaction(); + /** * Get the machine ID for this client */ - int64_t getMachineId() { return localMachineId; } - - /** - * Decrement the number of live slots that we currently have - */ - void decrementLiveCount() { liveSlotCount--; } - int64_t getLocalSequenceNumber(); - Array * acceptDataFromLocal(Array * data); + int64_t getMachineId() { return localMachineId; } + + /** + * Decrement the number of live slots that we currently have + */ + void decrementLiveCount() { liveSlotCount--; } + int64_t getLocalSequenceNumber(); + Array *acceptDataFromLocal(Array *data); }; #endif diff --git a/version2/src/C/TableStatus.cc b/version2/src/C/TableStatus.cc index b875350..f61a6ae 100644 --- a/version2/src/C/TableStatus.cc +++ b/version2/src/C/TableStatus.cc @@ -1,12 +1,12 @@ #include "TableStatus.h" #include "ByteBuffer.h" -Entry * TableStatus_decode(Slot * slot, ByteBuffer * bb) { - int maxslots=bb->getInt(); +Entry *TableStatus_decode(Slot *slot, ByteBuffer *bb) { + int maxslots = bb->getInt(); return new TableStatus(slot, maxslots); } -void TableStatus::encode(ByteBuffer * bb) { +void TableStatus::encode(ByteBuffer *bb) { bb->put(TypeTableStatus); bb->putInt(maxslots); } diff --git a/version2/src/C/TableStatus.h b/version2/src/C/TableStatus.h index ce2576e..e7978c0 100644 --- a/version2/src/C/TableStatus.h +++ b/version2/src/C/TableStatus.h @@ -11,21 +11,21 @@ */ class TableStatus : public Entry { - private: +private: int maxslots; - public: - TableStatus(Slot * slot, int _maxslots) : Entry(slot), +public: + TableStatus(Slot *slot, int _maxslots) : Entry(slot), maxslots(_maxslots) { - } + } int getMaxSlots() { return maxslots; } void encode(ByteBuffer *bb); - int getSize() { return sizeof(int32_t)+sizeof(char); } - + int getSize() { return sizeof(int32_t) + sizeof(char); } + char getType() { return TypeTableStatus; } - - Entry * getCopy(Slot * s) { return new TableStatus(s, maxslots); } + + Entry *getCopy(Slot *s) { return new TableStatus(s, maxslots); } }; -Entry * TableStatus_decode(Slot * slot, ByteBuffer * bb); +Entry *TableStatus_decode(Slot *slot, ByteBuffer *bb); #endif diff --git a/version2/src/C/ThreeTuple.h b/version2/src/C/ThreeTuple.h index ef5bcb5..5ab9794 100644 --- a/version2/src/C/ThreeTuple.h +++ b/version2/src/C/ThreeTuple.h @@ -2,18 +2,18 @@ #define THREETUPLE_H template - class ThreeTuple { - private: +class ThreeTuple { +private: A a; B b; C c; - public: - ThreeTuple(A _a, B _b, C _c) : - a(_a), +public: + ThreeTuple(A _a, B _b, C _c) : + a(_a), b(_b), c(_c) { - } + } A getFirst() { return a; } diff --git a/version2/src/C/TimingSingleton.h b/version2/src/C/TimingSingleton.h index 4983f81..25f45e5 100644 --- a/version2/src/C/TimingSingleton.h +++ b/version2/src/C/TimingSingleton.h @@ -3,12 +3,12 @@ #include class TimingSingleton { - private: +private: static TimingSingleton singleton = new TimingSingleton( ); int64_t startTime = 0; int64_t totalTime = 0; - - TimingSingleton() : startTime(0), + + TimingSingleton() : startTime(0), totalTime(0) { } @@ -16,14 +16,14 @@ class TimingSingleton { int64_t time; struct timeval tv; gettimeofday(&tv, NULL); - return tv.tv_sec*1000000000+tv.tv_usec*1000; + return tv.tv_sec * 1000000000 + tv.tv_usec * 1000; } - - public: + +public: void startTime() { startTime = nanoTime(); } - + void endTime() { totalTime += nanoTime() - startTime; } @@ -34,7 +34,7 @@ class TimingSingleton { }; TimingSingleton t_singleton; -TimingSingleton * TimingSingleton_getInstance() { +TimingSingleton *TimingSingleton_getInstance() { return &t_singleton; } #endif diff --git a/version2/src/C/Transaction.cc b/version2/src/C/Transaction.cc index d5abcc1..0cee839 100644 --- a/version2/src/C/Transaction.cc +++ b/version2/src/C/Transaction.cc @@ -2,299 +2,299 @@ class Transaction { - Hashtable parts = NULL; - Set missingParts = NULL; - Vector partsPendingSend = NULL; - bool isComplete = false; - bool hasLastPart = false; - Set keyValueGuardSet = NULL; - Set keyValueUpdateSet = NULL; - bool isDead = false; - int64_t sequenceNumber = -1; - int64_t clientLocalSequenceNumber = -1; - int64_t arbitratorId = -1; - int64_t machineId = -1; - Pair transactionId = NULL; - - int nextPartToSend = 0; - bool didSendAPartToServer = false; - - TransactionStatus transactionStatus = NULL; - - bool hadServerFailure = false; - - Transaction() { - parts = new Hashtable(); - keyValueGuardSet = new HashSet(); - keyValueUpdateSet = new HashSet(); - partsPendingSend = new Vector(); - } - - void addPartEncode(TransactionPart newPart) { - parts.put(newPart.getPartNumber(), newPart); - partsPendingSend.add(newPart.getPartNumber()); - - sequenceNumber = newPart.getSequenceNumber(); - arbitratorId = newPart.getArbitratorId(); - transactionId = newPart.getTransactionId(); - clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber(); - machineId = newPart.getMachineId(); - - isComplete = true; - } - - void addPartDecode(TransactionPart newPart) { - - if (isDead) { - // If dead then just kill this part and move on - newPart.setDead(); - return; - } - - sequenceNumber = newPart.getSequenceNumber(); - arbitratorId = newPart.getArbitratorId(); - transactionId = newPart.getTransactionId(); - clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber(); - machineId = newPart.getMachineId(); - - TransactionPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart); - - if (previoslySeenPart != NULL) { - // Set dead the old one since the new one is a rescued version of this part - previoslySeenPart.setDead(); - } else if (newPart.isLastPart()) { - missingParts = new HashSet(); - hasLastPart = true; - - for (int i = 0; i < newPart.getPartNumber(); i++) { - if (parts.get(i) == NULL) { - missingParts.add(i); - } - } - } - - if (!isComplete && hasLastPart) { - - // We have seen this part so remove it from the set of missing parts - missingParts.remove(newPart.getPartNumber()); - - // Check if all the parts have been seen - if (missingParts.size() == 0) { - - // We have all the parts - isComplete = true; - - // Decode all the parts and create the key value guard and update sets - decodeTransactionData(); - } - } - } - - void addUpdateKV(KeyValue kv) { - keyValueUpdateSet.add(kv); - } - - void addGuardKV(KeyValue kv) { - keyValueGuardSet.add(kv); - } - - - int64_t getSequenceNumber() { - return sequenceNumber; - } - - void setSequenceNumber(int64_t _sequenceNumber) { - sequenceNumber = _sequenceNumber; - - for (int32_t i : parts.keySet()) { - parts.get(i).setSequenceNumber(sequenceNumber); - } - } - - int64_t getClientLocalSequenceNumber() { - return clientLocalSequenceNumber; - } - - Hashtable getParts() { - return parts; - } + Hashtable parts = NULL; + Set missingParts = NULL; + Vector partsPendingSend = NULL; + bool isComplete = false; + bool hasLastPart = false; + Set keyValueGuardSet = NULL; + Set keyValueUpdateSet = NULL; + bool isDead = false; + int64_t sequenceNumber = -1; + int64_t clientLocalSequenceNumber = -1; + int64_t arbitratorId = -1; + int64_t machineId = -1; + Pair transactionId = NULL; + + int nextPartToSend = 0; + bool didSendAPartToServer = false; + + TransactionStatus transactionStatus = NULL; + + bool hadServerFailure = false; + + Transaction() { + parts = new Hashtable(); + keyValueGuardSet = new HashSet(); + keyValueUpdateSet = new HashSet(); + partsPendingSend = new Vector(); + } + + void addPartEncode(TransactionPart newPart) { + parts.put(newPart.getPartNumber(), newPart); + partsPendingSend.add(newPart.getPartNumber()); + + sequenceNumber = newPart.getSequenceNumber(); + arbitratorId = newPart.getArbitratorId(); + transactionId = newPart.getTransactionId(); + clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber(); + machineId = newPart.getMachineId(); + + isComplete = true; + } + + void addPartDecode(TransactionPart newPart) { + + if (isDead) { + // If dead then just kill this part and move on + newPart.setDead(); + return; + } + + sequenceNumber = newPart.getSequenceNumber(); + arbitratorId = newPart.getArbitratorId(); + transactionId = newPart.getTransactionId(); + clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber(); + machineId = newPart.getMachineId(); + + TransactionPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart); + + if (previoslySeenPart != NULL) { + // Set dead the old one since the new one is a rescued version of this part + previoslySeenPart.setDead(); + } else if (newPart.isLastPart()) { + missingParts = new HashSet(); + hasLastPart = true; + + for (int i = 0; i < newPart.getPartNumber(); i++) { + if (parts.get(i) == NULL) { + missingParts.add(i); + } + } + } + + if (!isComplete && hasLastPart) { + + // We have seen this part so remove it from the set of missing parts + missingParts.remove(newPart.getPartNumber()); + + // Check if all the parts have been seen + if (missingParts.size() == 0) { + + // We have all the parts + isComplete = true; + + // Decode all the parts and create the key value guard and update sets + decodeTransactionData(); + } + } + } + + void addUpdateKV(KeyValue kv) { + keyValueUpdateSet.add(kv); + } + + void addGuardKV(KeyValue kv) { + keyValueGuardSet.add(kv); + } + + + int64_t getSequenceNumber() { + return sequenceNumber; + } + + void setSequenceNumber(int64_t _sequenceNumber) { + sequenceNumber = _sequenceNumber; + + for (int32_t i : parts.keySet()) { + parts.get(i).setSequenceNumber(sequenceNumber); + } + } + + int64_t getClientLocalSequenceNumber() { + return clientLocalSequenceNumber; + } + + Hashtable getParts() { + return parts; + } - bool didSendAPartToServer() { - return didSendAPartToServer; - } - - void resetNextPartToSend() { - nextPartToSend = 0; - } - - TransactionPart getNextPartToSend() { - if ((partsPendingSend.size() == 0) || (partsPendingSend.size() == nextPartToSend)) { - return NULL; - } - TransactionPart part = parts.get(partsPendingSend.get(nextPartToSend)); - nextPartToSend++; - return part; - } - - - void setServerFailure() { - hadServerFailure = true; - } - - bool getServerFailure() { - return hadServerFailure; - } - - - void resetServerFailure() { - hadServerFailure = false; - } - - - void setTransactionStatus(TransactionStatus _transactionStatus) { - transactionStatus = _transactionStatus; - } - - TransactionStatus getTransactionStatus() { - return transactionStatus; - } - - void removeSentParts(Vector sentParts) { - nextPartToSend = 0; - if(partsPendingSend.removeAll(sentParts)) - { - didSendAPartToServer = true; - transactionStatus.setTransactionSequenceNumber(sequenceNumber); - } - } - - bool didSendAllParts() { - return partsPendingSend.isEmpty(); - } - - Set getKeyValueUpdateSet() { - return keyValueUpdateSet; - } - - int getNumberOfParts() { - return parts.size(); - } - - int64_t getMachineId() { - return machineId; - } - - int64_t getArbitrator() { - return arbitratorId; - } - - bool isComplete() { - return isComplete; - } - - Pair getId() { - return transactionId; - } - - void setDead() { - if (isDead) { - // Already dead - return; - } - - // Set dead - isDead = true; - - // Make all the parts of this transaction dead - for (int32_t partNumber : parts.keySet()) { - TransactionPart part = parts.get(partNumber); - part.setDead(); - } - } - - TransactionPart getPart(int index) { - return parts.get(index); - } - - void decodeTransactionData() { - - // Calculate the size of the data section - int dataSize = 0; - for (int i = 0; i < parts.keySet().size(); i++) { - TransactionPart tp = parts.get(i); - dataSize += tp.getDataSize(); - } - - char[] combinedData = new char[dataSize]; - int currentPosition = 0; - - // Stitch all the data sections together - for (int i = 0; i < parts.keySet().size(); i++) { - TransactionPart tp = parts.get(i); - System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize()); - currentPosition += tp.getDataSize(); - } - - // Decoder Object - ByteBuffer bbDecode = ByteBuffer.wrap(combinedData); - - // Decode how many key value pairs need to be decoded - int numberOfKVGuards = bbDecode.getInt(); - int numberOfKVUpdates = bbDecode.getInt(); - - // Decode all the guard key values - for (int i = 0; i < numberOfKVGuards; i++) { - KeyValue kv = (KeyValue)KeyValue.decode(bbDecode); - keyValueGuardSet.add(kv); - } - - // Decode all the updates key values - for (int i = 0; i < numberOfKVUpdates; i++) { - KeyValue kv = (KeyValue)KeyValue.decode(bbDecode); - keyValueUpdateSet.add(kv); - } - } - - bool evaluateGuard(Hashtable committedKeyValueTable, Hashtable speculatedKeyValueTable, Hashtable pendingTransactionSpeculatedKeyValueTable) { - for (KeyValue kvGuard : keyValueGuardSet) { - - // First check if the key is in the speculative table, this is the value of the latest assumption - KeyValue kv = NULL; - - // If we have a speculation table then use it first - if (pendingTransactionSpeculatedKeyValueTable != NULL) { - kv = pendingTransactionSpeculatedKeyValueTable.get(kvGuard.getKey()); - } - - // If we have a speculation table then use it first - if ((kv == NULL) && (speculatedKeyValueTable != NULL)) { - kv = speculatedKeyValueTable.get(kvGuard.getKey()); - } - - if (kv == NULL) { - // if it is not in the speculative table then check the committed table and use that - // value as our latest assumption - kv = committedKeyValueTable.get(kvGuard.getKey()); - } - - if (kvGuard.getValue() != NULL) { - if ((kv == NULL) || (!kvGuard.getValue().equals(kv.getValue()))) { - - - if (kv != NULL) { - System.out.println(kvGuard.getValue() + " " + kv.getValue()); - } else { - System.out.println(kvGuard.getValue() + " " + kv); - } - - return false; - } - } else { - if (kv != NULL) { - return false; - } - } - } - return true; - } + bool didSendAPartToServer() { + return didSendAPartToServer; + } + + void resetNextPartToSend() { + nextPartToSend = 0; + } + + TransactionPart getNextPartToSend() { + if ((partsPendingSend.size() == 0) || (partsPendingSend.size() == nextPartToSend)) { + return NULL; + } + TransactionPart part = parts.get(partsPendingSend.get(nextPartToSend)); + nextPartToSend++; + return part; + } + + + void setServerFailure() { + hadServerFailure = true; + } + + bool getServerFailure() { + return hadServerFailure; + } + + + void resetServerFailure() { + hadServerFailure = false; + } + + + void setTransactionStatus(TransactionStatus _transactionStatus) { + transactionStatus = _transactionStatus; + } + + TransactionStatus getTransactionStatus() { + return transactionStatus; + } + + void removeSentParts(Vector sentParts) { + nextPartToSend = 0; + if (partsPendingSend.removeAll(sentParts)) + { + didSendAPartToServer = true; + transactionStatus.setTransactionSequenceNumber(sequenceNumber); + } + } + + bool didSendAllParts() { + return partsPendingSend.isEmpty(); + } + + Set getKeyValueUpdateSet() { + return keyValueUpdateSet; + } + + int getNumberOfParts() { + return parts.size(); + } + + int64_t getMachineId() { + return machineId; + } + + int64_t getArbitrator() { + return arbitratorId; + } + + bool isComplete() { + return isComplete; + } + + Pair getId() { + return transactionId; + } + + void setDead() { + if (isDead) { + // Already dead + return; + } + + // Set dead + isDead = true; + + // Make all the parts of this transaction dead + for (int32_t partNumber : parts.keySet()) { + TransactionPart part = parts.get(partNumber); + part.setDead(); + } + } + + TransactionPart getPart(int index) { + return parts.get(index); + } + + void decodeTransactionData() { + + // Calculate the size of the data section + int dataSize = 0; + for (int i = 0; i < parts.keySet().size(); i++) { + TransactionPart tp = parts.get(i); + dataSize += tp.getDataSize(); + } + + char[] combinedData = new char[dataSize]; + int currentPosition = 0; + + // Stitch all the data sections together + for (int i = 0; i < parts.keySet().size(); i++) { + TransactionPart tp = parts.get(i); + System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize()); + currentPosition += tp.getDataSize(); + } + + // Decoder Object + ByteBuffer bbDecode = ByteBuffer.wrap(combinedData); + + // Decode how many key value pairs need to be decoded + int numberOfKVGuards = bbDecode.getInt(); + int numberOfKVUpdates = bbDecode.getInt(); + + // Decode all the guard key values + for (int i = 0; i < numberOfKVGuards; i++) { + KeyValue kv = (KeyValue)KeyValue.decode(bbDecode); + keyValueGuardSet.add(kv); + } + + // Decode all the updates key values + for (int i = 0; i < numberOfKVUpdates; i++) { + KeyValue kv = (KeyValue)KeyValue.decode(bbDecode); + keyValueUpdateSet.add(kv); + } + } + + bool evaluateGuard(Hashtable committedKeyValueTable, Hashtable speculatedKeyValueTable, Hashtable pendingTransactionSpeculatedKeyValueTable) { + for (KeyValue kvGuard : keyValueGuardSet) { + + // First check if the key is in the speculative table, this is the value of the latest assumption + KeyValue kv = NULL; + + // If we have a speculation table then use it first + if (pendingTransactionSpeculatedKeyValueTable != NULL) { + kv = pendingTransactionSpeculatedKeyValueTable.get(kvGuard.getKey()); + } + + // If we have a speculation table then use it first + if ((kv == NULL) && (speculatedKeyValueTable != NULL)) { + kv = speculatedKeyValueTable.get(kvGuard.getKey()); + } + + if (kv == NULL) { + // if it is not in the speculative table then check the committed table and use that + // value as our latest assumption + kv = committedKeyValueTable.get(kvGuard.getKey()); + } + + if (kvGuard.getValue() != NULL) { + if ((kv == NULL) || (!kvGuard.getValue().equals(kv.getValue()))) { + + + if (kv != NULL) { + System.out.println(kvGuard.getValue() + " " + kv.getValue()); + } else { + System.out.println(kvGuard.getValue() + " " + kv); + } + + return false; + } + } else { + if (kv != NULL) { + return false; + } + } + } + return true; + } } diff --git a/version2/src/C/Transaction.h b/version2/src/C/Transaction.h index dab4949..04fbb9a 100644 --- a/version2/src/C/Transaction.h +++ b/version2/src/C/Transaction.h @@ -3,301 +3,301 @@ #include "common.h" class Transaction { - private: +private: Hashtable parts = NULL; Set missingParts = NULL; Vector partsPendingSend = NULL; - bool isComplete = false; - bool hasLastPart = false; - Hashset * keyValueGuardSet = NULL; - Hashset * keyValueUpdateSet = NULL; + bool isComplete = false; + bool hasLastPart = false; + Hashset *keyValueGuardSet = NULL; + Hashset *keyValueUpdateSet = NULL; bool isDead = false; - int64_t sequenceNumber = -1; + int64_t sequenceNumber = -1; int64_t clientLocalSequenceNumber = -1; int64_t arbitratorId = -1; int64_t machineId = -1; Pair transactionId = NULL; - - int nextPartToSend = 0; + + int nextPartToSend = 0; bool didSendAPartToServer = false; - - TransactionStatus transactionStatus = NULL; - bool hadServerFailure = false; + TransactionStatus transactionStatus = NULL; + + bool hadServerFailure = false; + + public Transaction() { + parts = new Hashtable(); + keyValueGuardSet = new HashSet(); + keyValueUpdateSet = new HashSet(); + partsPendingSend = new Vector(); + } + + public void addPartEncode(TransactionPart newPart) { + parts.put(newPart.getPartNumber(), newPart); + partsPendingSend.add(newPart.getPartNumber()); + + sequenceNumber = newPart.getSequenceNumber(); + arbitratorId = newPart.getArbitratorId(); + transactionId = newPart.getTransactionId(); + clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber(); + machineId = newPart.getMachineId(); + + isComplete = true; + } + + public void addPartDecode(TransactionPart newPart) { + + if (isDead) { + // If dead then just kill this part and move on + newPart.setDead(); + return; + } + + sequenceNumber = newPart.getSequenceNumber(); + arbitratorId = newPart.getArbitratorId(); + transactionId = newPart.getTransactionId(); + clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber(); + machineId = newPart.getMachineId(); + + TransactionPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart); + + if (previoslySeenPart != NULL) { + // Set dead the old one since the new one is a rescued version of this part + previoslySeenPart.setDead(); + } else if (newPart.isLastPart()) { + missingParts = new HashSet(); + hasLastPart = true; + + for (int i = 0; i < newPart.getPartNumber(); i++) { + if (parts.get(i) == NULL) { + missingParts.add(i); + } + } + } + + if (!isComplete && hasLastPart) { + + // We have seen this part so remove it from the set of missing parts + missingParts.remove(newPart.getPartNumber()); + + // Check if all the parts have been seen + if (missingParts.size() == 0) { + + // We have all the parts + isComplete = true; - public Transaction() { - parts = new Hashtable(); - keyValueGuardSet = new HashSet(); - keyValueUpdateSet = new HashSet(); - partsPendingSend = new Vector(); - } - - public void addPartEncode(TransactionPart newPart) { - parts.put(newPart.getPartNumber(), newPart); - partsPendingSend.add(newPart.getPartNumber()); - - sequenceNumber = newPart.getSequenceNumber(); - arbitratorId = newPart.getArbitratorId(); - transactionId = newPart.getTransactionId(); - clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber(); - machineId = newPart.getMachineId(); - - isComplete = true; - } - - public void addPartDecode(TransactionPart newPart) { + // Decode all the parts and create the key value guard and update sets + decodeTransactionData(); + } + } + } - if (isDead) { - // If dead then just kill this part and move on - newPart.setDead(); - return; - } + public void addUpdateKV(KeyValue kv) { + keyValueUpdateSet.add(kv); + } - sequenceNumber = newPart.getSequenceNumber(); - arbitratorId = newPart.getArbitratorId(); - transactionId = newPart.getTransactionId(); - clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber(); - machineId = newPart.getMachineId(); + public void addGuardKV(KeyValue kv) { + keyValueGuardSet.add(kv); + } - TransactionPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart); - if (previoslySeenPart != NULL) { - // Set dead the old one since the new one is a rescued version of this part - previoslySeenPart.setDead(); - } else if (newPart.isLastPart()) { - missingParts = new HashSet(); - hasLastPart = true; - - for (int i = 0; i < newPart.getPartNumber(); i++) { - if (parts.get(i) == NULL) { - missingParts.add(i); - } - } - } + public int64_t getSequenceNumber() { + return sequenceNumber; + } - if (!isComplete && hasLastPart) { - - // We have seen this part so remove it from the set of missing parts - missingParts.remove(newPart.getPartNumber()); - - // Check if all the parts have been seen - if (missingParts.size() == 0) { + public void setSequenceNumber(int64_t _sequenceNumber) { + sequenceNumber = _sequenceNumber; - // We have all the parts - isComplete = true; + for (int32_t i : parts.keySet()) { + parts.get(i).setSequenceNumber(sequenceNumber); + } + } + + public int64_t getClientLocalSequenceNumber() { + return clientLocalSequenceNumber; + } + + public Hashtable getParts() { + return parts; + } - // Decode all the parts and create the key value guard and update sets - decodeTransactionData(); - } - } - } - - public void addUpdateKV(KeyValue kv) { - keyValueUpdateSet.add(kv); - } - - public void addGuardKV(KeyValue kv) { - keyValueGuardSet.add(kv); - } - - - public int64_t getSequenceNumber() { - return sequenceNumber; - } - - public void setSequenceNumber(int64_t _sequenceNumber) { - sequenceNumber = _sequenceNumber; - - for (int32_t i : parts.keySet()) { - parts.get(i).setSequenceNumber(sequenceNumber); - } - } - - public int64_t getClientLocalSequenceNumber() { - return clientLocalSequenceNumber; - } - - public Hashtable getParts() { - return parts; - } - - public bool didSendAPartToServer() { - return didSendAPartToServer; - } - - public void resetNextPartToSend() { - nextPartToSend = 0; - } - - public TransactionPart getNextPartToSend() { - if ((partsPendingSend.size() == 0) || (partsPendingSend.size() == nextPartToSend)) { - return NULL; - } - TransactionPart part = parts.get(partsPendingSend.get(nextPartToSend)); - nextPartToSend++; - return part; - } - - - public void setServerFailure() { - hadServerFailure = true; - } - - public bool getServerFailure() { - return hadServerFailure; - } - - - public void resetServerFailure() { - hadServerFailure = false; - } - - - public void setTransactionStatus(TransactionStatus _transactionStatus) { - transactionStatus = _transactionStatus; - } - - public TransactionStatus getTransactionStatus() { - return transactionStatus; - } - - public void removeSentParts(Vector sentParts) { - nextPartToSend = 0; - if(partsPendingSend.removeAll(sentParts)) - { - didSendAPartToServer = true; - transactionStatus.setTransactionSequenceNumber(sequenceNumber); - } - } - - public bool didSendAllParts() { - return partsPendingSend.isEmpty(); - } - - public Set getKeyValueUpdateSet() { - return keyValueUpdateSet; - } - - public int getNumberOfParts() { - return parts.size(); - } - - public int64_t getMachineId() { - return machineId; - } - - public int64_t getArbitrator() { - return arbitratorId; - } - - public bool isComplete() { - return isComplete; - } - - public Pair getId() { - return transactionId; - } - - public void setDead() { - if (isDead) { - // Already dead - return; - } - - // Set dead - isDead = true; - - // Make all the parts of this transaction dead - for (int32_t partNumber : parts.keySet()) { - TransactionPart part = parts.get(partNumber); - part.setDead(); - } - } - - public TransactionPart getPart(int index) { - return parts.get(index); - } - - private void decodeTransactionData() { - - // Calculate the size of the data section - int dataSize = 0; - for (int i = 0; i < parts.keySet().size(); i++) { - TransactionPart tp = parts.get(i); - dataSize += tp.getDataSize(); - } - - char[] combinedData = new char[dataSize]; - int currentPosition = 0; - - // Stitch all the data sections together - for (int i = 0; i < parts.keySet().size(); i++) { - TransactionPart tp = parts.get(i); - System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize()); - currentPosition += tp.getDataSize(); - } - - // Decoder Object - ByteBuffer bbDecode = ByteBuffer.wrap(combinedData); - - // Decode how many key value pairs need to be decoded - int numberOfKVGuards = bbDecode.getInt(); - int numberOfKVUpdates = bbDecode.getInt(); - - // Decode all the guard key values - for (int i = 0; i < numberOfKVGuards; i++) { - KeyValue kv = (KeyValue)KeyValue.decode(bbDecode); - keyValueGuardSet.add(kv); - } - - // Decode all the updates key values - for (int i = 0; i < numberOfKVUpdates; i++) { - KeyValue kv = (KeyValue)KeyValue.decode(bbDecode); - keyValueUpdateSet.add(kv); - } - } - - public bool evaluateGuard(Hashtable committedKeyValueTable, Hashtable speculatedKeyValueTable, Hashtable pendingTransactionSpeculatedKeyValueTable) { - for (KeyValue kvGuard : keyValueGuardSet) { - - // First check if the key is in the speculative table, this is the value of the latest assumption - KeyValue kv = NULL; - - // If we have a speculation table then use it first - if (pendingTransactionSpeculatedKeyValueTable != NULL) { - kv = pendingTransactionSpeculatedKeyValueTable.get(kvGuard.getKey()); - } - - // If we have a speculation table then use it first - if ((kv == NULL) && (speculatedKeyValueTable != NULL)) { - kv = speculatedKeyValueTable.get(kvGuard.getKey()); - } - - if (kv == NULL) { - // if it is not in the speculative table then check the committed table and use that - // value as our latest assumption - kv = committedKeyValueTable.get(kvGuard.getKey()); - } - - if (kvGuard.getValue() != NULL) { - if ((kv == NULL) || (!kvGuard.getValue().equals(kv.getValue()))) { - - - if (kv != NULL) { - System.out.println(kvGuard.getValue() + " " + kv.getValue()); - } else { - System.out.println(kvGuard.getValue() + " " + kv); - } - - return false; - } - } else { - if (kv != NULL) { - return false; - } - } - } - return true; - } + public bool didSendAPartToServer() { + return didSendAPartToServer; + } + + public void resetNextPartToSend() { + nextPartToSend = 0; + } + + public TransactionPart getNextPartToSend() { + if ((partsPendingSend.size() == 0) || (partsPendingSend.size() == nextPartToSend)) { + return NULL; + } + TransactionPart part = parts.get(partsPendingSend.get(nextPartToSend)); + nextPartToSend++; + return part; + } + + + public void setServerFailure() { + hadServerFailure = true; + } + + public bool getServerFailure() { + return hadServerFailure; + } + + + public void resetServerFailure() { + hadServerFailure = false; + } + + + public void setTransactionStatus(TransactionStatus _transactionStatus) { + transactionStatus = _transactionStatus; + } + + public TransactionStatus getTransactionStatus() { + return transactionStatus; + } + + public void removeSentParts(Vector sentParts) { + nextPartToSend = 0; + if (partsPendingSend.removeAll(sentParts)) + { + didSendAPartToServer = true; + transactionStatus.setTransactionSequenceNumber(sequenceNumber); + } + } + + public bool didSendAllParts() { + return partsPendingSend.isEmpty(); + } + + public Set getKeyValueUpdateSet() { + return keyValueUpdateSet; + } + + public int getNumberOfParts() { + return parts.size(); + } + + public int64_t getMachineId() { + return machineId; + } + + public int64_t getArbitrator() { + return arbitratorId; + } + + public bool isComplete() { + return isComplete; + } + + public Pair getId() { + return transactionId; + } + + public void setDead() { + if (isDead) { + // Already dead + return; + } + + // Set dead + isDead = true; + + // Make all the parts of this transaction dead + for (int32_t partNumber : parts.keySet()) { + TransactionPart part = parts.get(partNumber); + part.setDead(); + } + } + + public TransactionPart getPart(int index) { + return parts.get(index); + } + + private void decodeTransactionData() { + + // Calculate the size of the data section + int dataSize = 0; + for (int i = 0; i < parts.keySet().size(); i++) { + TransactionPart tp = parts.get(i); + dataSize += tp.getDataSize(); + } + + char[] combinedData = new char[dataSize]; + int currentPosition = 0; + + // Stitch all the data sections together + for (int i = 0; i < parts.keySet().size(); i++) { + TransactionPart tp = parts.get(i); + System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize()); + currentPosition += tp.getDataSize(); + } + + // Decoder Object + ByteBuffer bbDecode = ByteBuffer.wrap(combinedData); + + // Decode how many key value pairs need to be decoded + int numberOfKVGuards = bbDecode.getInt(); + int numberOfKVUpdates = bbDecode.getInt(); + + // Decode all the guard key values + for (int i = 0; i < numberOfKVGuards; i++) { + KeyValue kv = (KeyValue)KeyValue.decode(bbDecode); + keyValueGuardSet.add(kv); + } + + // Decode all the updates key values + for (int i = 0; i < numberOfKVUpdates; i++) { + KeyValue kv = (KeyValue)KeyValue.decode(bbDecode); + keyValueUpdateSet.add(kv); + } + } + + public bool evaluateGuard(Hashtable committedKeyValueTable, Hashtable speculatedKeyValueTable, Hashtable pendingTransactionSpeculatedKeyValueTable) { + for (KeyValue kvGuard : keyValueGuardSet) { + + // First check if the key is in the speculative table, this is the value of the latest assumption + KeyValue kv = NULL; + + // If we have a speculation table then use it first + if (pendingTransactionSpeculatedKeyValueTable != NULL) { + kv = pendingTransactionSpeculatedKeyValueTable.get(kvGuard.getKey()); + } + + // If we have a speculation table then use it first + if ((kv == NULL) && (speculatedKeyValueTable != NULL)) { + kv = speculatedKeyValueTable.get(kvGuard.getKey()); + } + + if (kv == NULL) { + // if it is not in the speculative table then check the committed table and use that + // value as our latest assumption + kv = committedKeyValueTable.get(kvGuard.getKey()); + } + + if (kvGuard.getValue() != NULL) { + if ((kv == NULL) || (!kvGuard.getValue().equals(kv.getValue()))) { + + + if (kv != NULL) { + System.out.println(kvGuard.getValue() + " " + kv.getValue()); + } else { + System.out.println(kvGuard.getValue() + " " + kv); + } + + return false; + } + } else { + if (kv != NULL) { + return false; + } + } + } + return true; + } }; #endif diff --git a/version2/src/C/TransactionPart.cc b/version2/src/C/TransactionPart.cc index 6e0528f..1acda0b 100644 --- a/version2/src/C/TransactionPart.cc +++ b/version2/src/C/TransactionPart.cc @@ -2,136 +2,136 @@ class TransactionPart extends Entry { - // Max size of the part excluding the fixed size header - static final int MAX_NON_HEADER_SIZE = 512; - - int64_t sequenceNumber = -1; - int64_t machineId = -1; - int64_t arbitratorId = -1; - int64_t clientLocalSequenceNumber = -1; // Sequence number of the transaction that this is a part of - int partNumber = -1; // Parts position in the - Boolean isLastPart = false; - - Pair transactionId = NULL; - Pair partId = NULL; - - char[] data = NULL; - - TransactionPart(Slot s, int64_t _machineId, int64_t _arbitratorId, int64_t _clientLocalSequenceNumber, int _partNumber, char[] _data, Boolean _isLastPart) { - super(s); - machineId = _machineId; - arbitratorId = _arbitratorId; - clientLocalSequenceNumber = _clientLocalSequenceNumber; - partNumber = _partNumber; - data = _data; - isLastPart = _isLastPart; - - transactionId = new Pair(machineId, clientLocalSequenceNumber); - partId = new Pair(clientLocalSequenceNumber, partNumber); - - } - - int getSize() { - if (data == NULL) { - return (4 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char)); - } - return (4 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char)) + data.length; - } - - void setSlot(Slot s) { - parentslot = s; - } - - Pair getTransactionId() { - return transactionId; - } - - int64_t getArbitratorId() { - return arbitratorId; - } - - Pair getPartId() { - return partId; - } - - int getPartNumber() { - return partNumber; - } - - int getDataSize() { - return data.length; - } - - char[] getData() { - return data; - } - - Boolean isLastPart() { - return isLastPart; - } - - int64_t getMachineId() { - return machineId; - } - - int64_t getClientLocalSequenceNumber() { - return clientLocalSequenceNumber; - } - - - int64_t getSequenceNumber() { - return sequenceNumber; - } - - void setSequenceNumber(int64_t _sequenceNumber) { - sequenceNumber = _sequenceNumber; - } - - static Entry decode(Slot s, ByteBuffer bb) { - int64_t sequenceNumber = bb->getLong(); - int64_t machineId = bb->getLong(); - int64_t arbitratorId = bb->getLong(); - int64_t clientLocalSequenceNumber = bb->getLong(); - int partNumber = bb->getInt(); - int dataSize = bb->getInt(); - Boolean isLastPart = (bb->get() == 1); - // Get the data - char[] data = new char[dataSize]; - bb->get(data); - - TransactionPart returnTransactionPart = new TransactionPart(s, machineId, arbitratorId, clientLocalSequenceNumber, partNumber, data, isLastPart); - returnTransactionPart.setSequenceNumber(sequenceNumber); - - return returnTransactionPart; - } - - void encode(ByteBuffer bb) { - bb->put(Entry.TypeTransactionPart); - bb->putLong(sequenceNumber); - bb->putLong(machineId); - bb->putLong(arbitratorId); - bb->putLong(clientLocalSequenceNumber); - bb->putInt(partNumber); - bb->putInt(data.length); - - if (isLastPart) { - bb->put((char)1); - } else { - bb->put((char)0); - } - - bb->put(data); - } - - char getType() { - return Entry.TypeTransactionPart; - } - - Entry getCopy(Slot s) { - - TransactionPart copyTransaction = new TransactionPart(s, machineId, arbitratorId, clientLocalSequenceNumber, partNumber, data, isLastPart); - copyTransaction.setSequenceNumber(sequenceNumber); - - return copyTransaction; - } + // Max size of the part excluding the fixed size header + static final int MAX_NON_HEADER_SIZE = 512; + + int64_t sequenceNumber = -1; + int64_t machineId = -1; + int64_t arbitratorId = -1; + int64_t clientLocalSequenceNumber = -1; // Sequence number of the transaction that this is a part of + int partNumber = -1; // Parts position in the + bool isLastPart = false; + + Pair transactionId = NULL; + Pair partId = NULL; + + char[] data = NULL; + + TransactionPart(Slot s, int64_t _machineId, int64_t _arbitratorId, int64_t _clientLocalSequenceNumber, int _partNumber, char[] _data, bool _isLastPart) { + super(s); + machineId = _machineId; + arbitratorId = _arbitratorId; + clientLocalSequenceNumber = _clientLocalSequenceNumber; + partNumber = _partNumber; + data = _data; + isLastPart = _isLastPart; + + transactionId = new Pair(machineId, clientLocalSequenceNumber); + partId = new Pair(clientLocalSequenceNumber, partNumber); + + } + + int getSize() { + if (data == NULL) { + return (4 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char)); + } + return (4 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char)) + data.length; + } + + void setSlot(Slot s) { + parentslot = s; + } + + Pair getTransactionId() { + return transactionId; + } + + int64_t getArbitratorId() { + return arbitratorId; + } + + Pair getPartId() { + return partId; + } + + int getPartNumber() { + return partNumber; + } + + int getDataSize() { + return data.length; + } + + char[] getData() { + return data; + } + + bool isLastPart() { + return isLastPart; + } + + int64_t getMachineId() { + return machineId; + } + + int64_t getClientLocalSequenceNumber() { + return clientLocalSequenceNumber; + } + + + int64_t getSequenceNumber() { + return sequenceNumber; + } + + void setSequenceNumber(int64_t _sequenceNumber) { + sequenceNumber = _sequenceNumber; + } + + static Entry decode(Slot s, ByteBuffer bb) { + int64_t sequenceNumber = bb->getLong(); + int64_t machineId = bb->getLong(); + int64_t arbitratorId = bb->getLong(); + int64_t clientLocalSequenceNumber = bb->getLong(); + int partNumber = bb->getInt(); + int dataSize = bb->getInt(); + bool isLastPart = (bb->get() == 1); + // Get the data + char[] data = new char[dataSize]; + bb->get(data); + + TransactionPart returnTransactionPart = new TransactionPart(s, machineId, arbitratorId, clientLocalSequenceNumber, partNumber, data, isLastPart); + returnTransactionPart.setSequenceNumber(sequenceNumber); + + return returnTransactionPart; + } + + void encode(ByteBuffer bb) { + bb->put(Entry.TypeTransactionPart); + bb->putLong(sequenceNumber); + bb->putLong(machineId); + bb->putLong(arbitratorId); + bb->putLong(clientLocalSequenceNumber); + bb->putInt(partNumber); + bb->putInt(data.length); + + if (isLastPart) { + bb->put((char)1); + } else { + bb->put((char)0); + } + + bb->put(data); + } + + char getType() { + return Entry.TypeTransactionPart; + } + + Entry getCopy(Slot s) { + + TransactionPart copyTransaction = new TransactionPart(s, machineId, arbitratorId, clientLocalSequenceNumber, partNumber, data, isLastPart); + copyTransaction.setSequenceNumber(sequenceNumber); + + return copyTransaction; + } } diff --git a/version2/src/C/TransactionPart.h b/version2/src/C/TransactionPart.h index 4b35977..b89b57c 100644 --- a/version2/src/C/TransactionPart.h +++ b/version2/src/C/TransactionPart.h @@ -2,136 +2,136 @@ class TransactionPart extends Entry { - // Max size of the part excluding the fixed size header - public static final int MAX_NON_HEADER_SIZE = 512; - - private int64_t sequenceNumber = -1; - private int64_t machineId = -1; - private int64_t arbitratorId = -1; - private int64_t clientLocalSequenceNumber = -1; // Sequence number of the transaction that this is a part of - private int partNumber = -1; // Parts position in the - private Boolean isLastPart = false; - - private Pair transactionId = NULL; - private Pair partId = NULL; - - private char[] data = NULL; - - public TransactionPart(Slot s, int64_t _machineId, int64_t _arbitratorId, int64_t _clientLocalSequenceNumber, int _partNumber, char[] _data, Boolean _isLastPart) { - super(s); - machineId = _machineId; - arbitratorId = _arbitratorId; - clientLocalSequenceNumber = _clientLocalSequenceNumber; - partNumber = _partNumber; - data = _data; - isLastPart = _isLastPart; - - transactionId = new Pair(machineId, clientLocalSequenceNumber); - partId = new Pair(clientLocalSequenceNumber, partNumber); - - } - - public int getSize() { - if (data == NULL) { - return (4 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char)); - } - return (4 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char)) + data.length; - } - - public void setSlot(Slot s) { - parentslot = s; - } - - public Pair getTransactionId() { - return transactionId; - } - - public int64_t getArbitratorId() { - return arbitratorId; - } - - public Pair getPartId() { - return partId; - } - - public int getPartNumber() { - return partNumber; - } - - public int getDataSize() { - return data.length; - } - - public char[] getData() { - return data; - } - - public Boolean isLastPart() { - return isLastPart; - } - - public int64_t getMachineId() { - return machineId; - } - - public int64_t getClientLocalSequenceNumber() { - return clientLocalSequenceNumber; - } - - - public int64_t getSequenceNumber() { - return sequenceNumber; - } - - public void setSequenceNumber(int64_t _sequenceNumber) { - sequenceNumber = _sequenceNumber; - } - - static Entry decode(Slot s, ByteBuffer bb) { - int64_t sequenceNumber = bb->getLong(); - int64_t machineId = bb->getLong(); - int64_t arbitratorId = bb->getLong(); - int64_t clientLocalSequenceNumber = bb->getLong(); - int partNumber = bb->getInt(); - int dataSize = bb->getInt(); - Boolean isLastPart = (bb->get() == 1); - // Get the data - char[] data = new char[dataSize]; - bb->get(data); - - TransactionPart returnTransactionPart = new TransactionPart(s, machineId, arbitratorId, clientLocalSequenceNumber, partNumber, data, isLastPart); - returnTransactionPart.setSequenceNumber(sequenceNumber); - - return returnTransactionPart; - } - - public void encode(ByteBuffer bb) { - bb->put(Entry.TypeTransactionPart); - bb->putLong(sequenceNumber); - bb->putLong(machineId); - bb->putLong(arbitratorId); - bb->putLong(clientLocalSequenceNumber); - bb->putInt(partNumber); - bb->putInt(data.length); - - if (isLastPart) { - bb->put((char)1); - } else { - bb->put((char)0); - } - - bb->put(data); - } - - public char getType() { - return Entry.TypeTransactionPart; - } - - public Entry getCopy(Slot s) { - - TransactionPart copyTransaction = new TransactionPart(s, machineId, arbitratorId, clientLocalSequenceNumber, partNumber, data, isLastPart); - copyTransaction.setSequenceNumber(sequenceNumber); - - return copyTransaction; - } + // Max size of the part excluding the fixed size header + public static final int MAX_NON_HEADER_SIZE = 512; + + private int64_t sequenceNumber = -1; + private int64_t machineId = -1; + private int64_t arbitratorId = -1; + private int64_t clientLocalSequenceNumber = -1; // Sequence number of the transaction that this is a part of + private int partNumber = -1; // Parts position in the + private bool isLastPart = false; + + private Pair transactionId = NULL; + private Pair partId = NULL; + + private char[] data = NULL; + + public TransactionPart(Slot s, int64_t _machineId, int64_t _arbitratorId, int64_t _clientLocalSequenceNumber, int _partNumber, char[] _data, bool _isLastPart) { + super(s); + machineId = _machineId; + arbitratorId = _arbitratorId; + clientLocalSequenceNumber = _clientLocalSequenceNumber; + partNumber = _partNumber; + data = _data; + isLastPart = _isLastPart; + + transactionId = new Pair(machineId, clientLocalSequenceNumber); + partId = new Pair(clientLocalSequenceNumber, partNumber); + + } + + public int getSize() { + if (data == NULL) { + return (4 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char)); + } + return (4 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char)) + data.length; + } + + public void setSlot(Slot s) { + parentslot = s; + } + + public Pair getTransactionId() { + return transactionId; + } + + public int64_t getArbitratorId() { + return arbitratorId; + } + + public Pair getPartId() { + return partId; + } + + public int getPartNumber() { + return partNumber; + } + + public int getDataSize() { + return data.length; + } + + public char[] getData() { + return data; + } + + public bool isLastPart() { + return isLastPart; + } + + public int64_t getMachineId() { + return machineId; + } + + public int64_t getClientLocalSequenceNumber() { + return clientLocalSequenceNumber; + } + + + public int64_t getSequenceNumber() { + return sequenceNumber; + } + + public void setSequenceNumber(int64_t _sequenceNumber) { + sequenceNumber = _sequenceNumber; + } + + static Entry decode(Slot s, ByteBuffer bb) { + int64_t sequenceNumber = bb->getLong(); + int64_t machineId = bb->getLong(); + int64_t arbitratorId = bb->getLong(); + int64_t clientLocalSequenceNumber = bb->getLong(); + int partNumber = bb->getInt(); + int dataSize = bb->getInt(); + bool isLastPart = (bb->get() == 1); + // Get the data + char[] data = new char[dataSize]; + bb->get(data); + + TransactionPart returnTransactionPart = new TransactionPart(s, machineId, arbitratorId, clientLocalSequenceNumber, partNumber, data, isLastPart); + returnTransactionPart.setSequenceNumber(sequenceNumber); + + return returnTransactionPart; + } + + public void encode(ByteBuffer bb) { + bb->put(Entry.TypeTransactionPart); + bb->putLong(sequenceNumber); + bb->putLong(machineId); + bb->putLong(arbitratorId); + bb->putLong(clientLocalSequenceNumber); + bb->putInt(partNumber); + bb->putInt(data.length); + + if (isLastPart) { + bb->put((char)1); + } else { + bb->put((char)0); + } + + bb->put(data); + } + + public char getType() { + return Entry.TypeTransactionPart; + } + + public Entry getCopy(Slot s) { + + TransactionPart copyTransaction = new TransactionPart(s, machineId, arbitratorId, clientLocalSequenceNumber, partNumber, data, isLastPart); + copyTransaction.setSequenceNumber(sequenceNumber); + + return copyTransaction; + } } diff --git a/version2/src/C/TransactionStatus.cc b/version2/src/C/TransactionStatus.cc index e1824db..1daee97 100644 --- a/version2/src/C/TransactionStatus.cc +++ b/version2/src/C/TransactionStatus.cc @@ -1,50 +1,50 @@ class TransactionStatus { - static final char StatusAborted = 1; - static final char StatusPending = 2; - static final char StatusCommitted = 3; - // static final char StatusRetrying = 4; - static final char StatusSentPartial = 5; - static final char StatusSentFully = 6; - static final char StatusNoEffect = 10; - - char status = 0; - bool applicationReleased = false; - bool wasSentInChain = false; - int64_t transactionSequenceNumber = 0; - int64_t arbitrator = -1; - - - TransactionStatus(char _status, int64_t _arbitrator) { - status = _status; - arbitrator = _arbitrator; - } - - char getStatus() { - return status; - } - - void setStatus(char _status) { - status = _status; - } - - int64_t getTransactionSequenceNumber() { - return transactionSequenceNumber; - } - - void setTransactionSequenceNumber(int64_t _transactionSequenceNumber) { - transactionSequenceNumber = _transactionSequenceNumber; - } - - int64_t getTransactionArbitrator() { - return arbitrator; - } - - void release() { - applicationReleased = true; - } - - bool getReleased() { - return applicationReleased; - } + static final char StatusAborted = 1; + static final char StatusPending = 2; + static final char StatusCommitted = 3; + // static final char StatusRetrying = 4; + static final char StatusSentPartial = 5; + static final char StatusSentFully = 6; + static final char StatusNoEffect = 10; + + char status = 0; + bool applicationReleased = false; + bool wasSentInChain = false; + int64_t transactionSequenceNumber = 0; + int64_t arbitrator = -1; + + + TransactionStatus(char _status, int64_t _arbitrator) { + status = _status; + arbitrator = _arbitrator; + } + + char getStatus() { + return status; + } + + void setStatus(char _status) { + status = _status; + } + + int64_t getTransactionSequenceNumber() { + return transactionSequenceNumber; + } + + void setTransactionSequenceNumber(int64_t _transactionSequenceNumber) { + transactionSequenceNumber = _transactionSequenceNumber; + } + + int64_t getTransactionArbitrator() { + return arbitrator; + } + + void release() { + applicationReleased = true; + } + + bool getReleased() { + return applicationReleased; + } } diff --git a/version2/src/C/TransactionStatus.h b/version2/src/C/TransactionStatus.h index f697dcf..d153743 100644 --- a/version2/src/C/TransactionStatus.h +++ b/version2/src/C/TransactionStatus.h @@ -1,50 +1,50 @@ class TransactionStatus { - static final char StatusAborted = 1; - static final char StatusPending = 2; - static final char StatusCommitted = 3; - // static final char StatusRetrying = 4; - static final char StatusSentPartial = 5; - static final char StatusSentFully = 6; - static final char StatusNoEffect = 10; - - private char status = 0; - private bool applicationReleased = false; - private bool wasSentInChain = false; - private int64_t transactionSequenceNumber = 0; - private int64_t arbitrator = -1; - - - public TransactionStatus(char _status, int64_t _arbitrator) { - status = _status; - arbitrator = _arbitrator; - } - - public char getStatus() { - return status; - } - - public void setStatus(char _status) { - status = _status; - } - - public int64_t getTransactionSequenceNumber() { - return transactionSequenceNumber; - } - - public void setTransactionSequenceNumber(int64_t _transactionSequenceNumber) { - transactionSequenceNumber = _transactionSequenceNumber; - } - - public int64_t getTransactionArbitrator() { - return arbitrator; - } - - public void release() { - applicationReleased = true; - } - - public bool getReleased() { - return applicationReleased; - } + static final char StatusAborted = 1; + static final char StatusPending = 2; + static final char StatusCommitted = 3; + // static final char StatusRetrying = 4; + static final char StatusSentPartial = 5; + static final char StatusSentFully = 6; + static final char StatusNoEffect = 10; + + private char status = 0; + private bool applicationReleased = false; + private bool wasSentInChain = false; + private int64_t transactionSequenceNumber = 0; + private int64_t arbitrator = -1; + + + public TransactionStatus(char _status, int64_t _arbitrator) { + status = _status; + arbitrator = _arbitrator; + } + + public char getStatus() { + return status; + } + + public void setStatus(char _status) { + status = _status; + } + + public int64_t getTransactionSequenceNumber() { + return transactionSequenceNumber; + } + + public void setTransactionSequenceNumber(int64_t _transactionSequenceNumber) { + transactionSequenceNumber = _transactionSequenceNumber; + } + + public int64_t getTransactionArbitrator() { + return arbitrator; + } + + public void release() { + applicationReleased = true; + } + + public bool getReleased() { + return applicationReleased; + } } diff --git a/version2/src/C/array.h b/version2/src/C/array.h index 3126238..a456fef 100644 --- a/version2/src/C/array.h +++ b/version2/src/C/array.h @@ -6,30 +6,30 @@ typedef uint32_t uint; template class Array { - public: - Array() : - array(NULL), - size(0) { - } - - Array(uint32_t _size) : - array((type *) ourcalloc(1, sizeof(type) * _size)), +public: + Array() : + array(NULL), + size(0) { + } + + Array(uint32_t _size) : + array((type *) ourcalloc(1, sizeof(type) * _size)), size(_size) - { - } - - Array(type *_array, uint _size) : - array((type *) ourmalloc(sizeof(type) * _size)), + { + } + + Array(type *_array, uint _size) : + array((type *) ourmalloc(sizeof(type) * _size)), size(_size) { - memcpy(array, _array, _size * sizeof(type)); - } + memcpy(array, _array, _size * sizeof(type)); + } - Array(Array *_array) : - array((type *) ourmalloc(sizeof(type) * _array->size)), + Array(Array *_array) : + array((type *) ourmalloc(sizeof(type) * _array->size)), size(_array->size) { memcpy(array, _array->array, size * sizeof(type)); } - + void init(uint _size) { array = (type *) ourcalloc(1, sizeof(type) * _size); size = _size; @@ -40,26 +40,26 @@ class Array { size = _size; memcpy(array, _array, _size * sizeof(type)); } - + void init(Array *_array) { array = (type *) ourmalloc(sizeof(type) * _array->size); size = _array->size; memcpy(array, _array->array, size * sizeof(type)); } - + ~Array() { if (array) ourfree(array); } - + type get(uint index) const { return array[index]; } - + void set(uint index, type item) { array[index] = item; } - + uint length() const { return size; } @@ -67,8 +67,8 @@ class Array { type *internalArray() { return array; } - - private: + +private: type *array; uint size; }; diff --git a/version2/src/C/hashset.h b/version2/src/C/hashset.h index 7874e65..ab096fb 100644 --- a/version2/src/C/hashset.h +++ b/version2/src/C/hashset.h @@ -18,10 +18,10 @@ struct Linknode { Linknode<_Key> *next; }; -template +template class Hashset; -template, bool (*equals) (_Key, _Key) = defaultEquals<_Key> > +template, bool (*equals)(_Key, _Key) = defaultEquals<_Key> > class SetIterator { public: SetIterator(Linknode<_Key> *_curr, Hashset <_Key, _KeyInt, _Shift, hash_function, equals> *_set) : @@ -76,7 +76,7 @@ private: Hashset <_Key, _KeyInt, _Shift, hash_function, equals> *set; }; -template, bool (*equals) (_Key, _Key) = defaultEquals<_Key> > +template, bool (*equals)(_Key, _Key) = defaultEquals<_Key> > class Hashset { public: Hashset(unsigned int initialcapacity = 16, double factor = 0.5) : diff --git a/version2/src/C/hashtable.h b/version2/src/C/hashtable.h index b27dde2..513d311 100644 --- a/version2/src/C/hashtable.h +++ b/version2/src/C/hashtable.h @@ -57,7 +57,7 @@ inline bool defaultEquals(_Key key1, _Key key2) { * manipulation and storage. * @tparam _Shift Logical shift to apply to all keys. Default 0. */ -template, bool (*equals) (_Key, _Key) = defaultEquals<_Key> > +template, bool (*equals)(_Key, _Key) = defaultEquals<_Key> > class Hashtable { public: /** diff --git a/version2/src/C/vector.h b/version2/src/C/vector.h index 8ce18fd..5a7770d 100644 --- a/version2/src/C/vector.h +++ b/version2/src/C/vector.h @@ -19,6 +19,13 @@ public: memcpy(array, _array, capacity * sizeof(type)); } + Vector(Vector *v) : + size(v->size), + capacity(v->capacity), + array((type *) ourmalloc(sizeof(type) * v->capacity)) { + memcpy(array, v->array, capacity * sizeof(type)); + } + void pop() { size--; } @@ -39,7 +46,13 @@ public: size = _size; } - void push(type item) { + void addAll(Vector *v) { + int oldsize = size; + setSize(size + v->size); + memcpy(&array[size], v->array, v->size * sizeof(type)); + } + + void add(type item) { if (size >= capacity) { uint newcap = capacity << 1; array = (type *)ourrealloc(array, newcap * sizeof(type)); -- 2.34.1