#include "Abort.h"
#include "ByteBuffer.h"
-Abort::Abort(Slot * slot, int64_t _transactionClientLocalSequenceNumber, int64_t _transactionSequenceNumber , int64_t _transactionMachineId, int64_t _transactionArbitrator, int64_t _arbitratorLocalSequenceNumber) :
- Entry(slot),
- transactionClientLocalSequenceNumber(_transactionClientLocalSequenceNumber),
- transactionSequenceNumber(_transactionSequenceNumber),
- transactionMachineId(_transactionMachineId),
- transactionArbitrator(_transactionArbitrator),
- arbitratorLocalSequenceNumber(_arbitratorLocalSequenceNumber),
- abortId(new Pair<int64_t, int64_t>(transactionMachineId, transactionClientLocalSequenceNumber)) {
+Abort::Abort(Slot *slot, int64_t _transactionClientLocalSequenceNumber, int64_t _transactionSequenceNumber, int64_t _transactionMachineId, int64_t _transactionArbitrator, int64_t _arbitratorLocalSequenceNumber) :
+ Entry(slot),
+ transactionClientLocalSequenceNumber(_transactionClientLocalSequenceNumber),
+ transactionSequenceNumber(_transactionSequenceNumber),
+ transactionMachineId(_transactionMachineId),
+ transactionArbitrator(_transactionArbitrator),
+ arbitratorLocalSequenceNumber(_arbitratorLocalSequenceNumber),
+ abortId(new Pair<int64_t, int64_t>(transactionMachineId, transactionClientLocalSequenceNumber)) {
}
-Abort::Abort(Slot * slot, int64_t _transactionClientLocalSequenceNumber, int64_t _transactionSequenceNumber, int64_t _sequenceNumber , int64_t _transactionMachineId, int64_t _transactionArbitrator, int64_t _arbitratorLocalSequenceNumber) :
- Entry(slot),
- transactionClientLocalSequenceNumber(_transactionClientLocalSequenceNumber),
- transactionSequenceNumber(_transactionSequenceNumber),
- sequenceNumber(_sequenceNumber),
- transactionMachineId(_transactionMachineId),
- transactionArbitrator(_transactionArbitrator),
- arbitratorLocalSequenceNumber(_arbitratorLocalSequenceNumber),
- abortId(new Pair<int64_t, int64_t>(transactionMachineId, transactionClientLocalSequenceNumber)) {
+Abort::Abort(Slot *slot, int64_t _transactionClientLocalSequenceNumber, int64_t _transactionSequenceNumber, int64_t _sequenceNumber, int64_t _transactionMachineId, int64_t _transactionArbitrator, int64_t _arbitratorLocalSequenceNumber) :
+ Entry(slot),
+ transactionClientLocalSequenceNumber(_transactionClientLocalSequenceNumber),
+ transactionSequenceNumber(_transactionSequenceNumber),
+ sequenceNumber(_sequenceNumber),
+ transactionMachineId(_transactionMachineId),
+ transactionArbitrator(_transactionArbitrator),
+ arbitratorLocalSequenceNumber(_arbitratorLocalSequenceNumber),
+ abortId(new Pair<int64_t, int64_t>(transactionMachineId, transactionClientLocalSequenceNumber)) {
}
-Entry * Abortdecode(Slot * slot, ByteBuffer * bb) {
- int64_t transactionClientLocalSequenceNumber = bb->getLong();
- int64_t transactionSequenceNumber = bb->getLong();
- int64_t sequenceNumber = bb->getLong();
- int64_t transactionMachineId = bb->getLong();
- int64_t transactionArbitrator = bb->getLong();
- int64_t arbitratorLocalSequenceNumber = bb->getLong();
-
- return new Abort(slot, transactionClientLocalSequenceNumber, transactionSequenceNumber, sequenceNumber, transactionMachineId, transactionArbitrator, arbitratorLocalSequenceNumber);
+Entry *Abortdecode(Slot *slot, ByteBuffer *bb) {
+ int64_t transactionClientLocalSequenceNumber = bb->getLong();
+ int64_t transactionSequenceNumber = bb->getLong();
+ int64_t sequenceNumber = bb->getLong();
+ int64_t transactionMachineId = bb->getLong();
+ int64_t transactionArbitrator = bb->getLong();
+ int64_t arbitratorLocalSequenceNumber = bb->getLong();
+
+ return new Abort(slot, transactionClientLocalSequenceNumber, transactionSequenceNumber, sequenceNumber, transactionMachineId, transactionArbitrator, arbitratorLocalSequenceNumber);
}
-void Abort::encode(ByteBuffer * bb) {
- bb->put(TypeAbort);
- bb->putLong(transactionClientLocalSequenceNumber);
- bb->putLong(transactionSequenceNumber);
- bb->putLong(sequenceNumber);
- bb->putLong(transactionMachineId);
- bb->putLong(transactionArbitrator);
- bb->putLong(arbitratorLocalSequenceNumber);
+void Abort::encode(ByteBuffer *bb) {
+ bb->put(TypeAbort);
+ bb->putLong(transactionClientLocalSequenceNumber);
+ bb->putLong(transactionSequenceNumber);
+ bb->putLong(sequenceNumber);
+ bb->putLong(transactionMachineId);
+ bb->putLong(transactionArbitrator);
+ bb->putLong(arbitratorLocalSequenceNumber);
}
#include "Pair.h"
class Abort : public Entry {
- private:
+private:
int64_t transactionClientLocalSequenceNumber;
- int64_t transactionSequenceNumber;
+ int64_t transactionSequenceNumber;
int64_t sequenceNumber;
int64_t transactionMachineId;
int64_t transactionArbitrator;
int64_t arbitratorLocalSequenceNumber;
- Pair<int64_t, int64_t> * abortId;
-
- public:
- Abort(Slot * slot, int64_t _transactionClientLocalSequenceNumber, int64_t _transactionSequenceNumber , int64_t _transactionMachineId, int64_t _transactionArbitrator, int64_t _arbitratorLocalSequenceNumber);
- Abort(Slot * slot, int64_t _transactionClientLocalSequenceNumber, int64_t _transactionSequenceNumber, int64_t _sequenceNumber , int64_t _transactionMachineId, int64_t _transactionArbitrator, int64_t _arbitratorLocalSequenceNumber);
+ Pair<int64_t, int64_t> *abortId;
+
+public:
+ Abort(Slot *slot, int64_t _transactionClientLocalSequenceNumber, int64_t _transactionSequenceNumber, int64_t _transactionMachineId, int64_t _transactionArbitrator, int64_t _arbitratorLocalSequenceNumber);
+ Abort(Slot *slot, int64_t _transactionClientLocalSequenceNumber, int64_t _transactionSequenceNumber, int64_t _sequenceNumber, int64_t _transactionMachineId, int64_t _transactionArbitrator, int64_t _arbitratorLocalSequenceNumber);
+
+ Pair<int64_t, int64_t> *getAbortId() {return abortId;}
- Pair<int64_t, int64_t> * getAbortId() {return abortId;}
-
int64_t getTransactionMachineId() { return transactionMachineId; }
int64_t getTransactionSequenceNumber() { return transactionSequenceNumber; }
int64_t getTransactionClientLocalSequenceNumber() { return transactionClientLocalSequenceNumber; }
int64_t getArbitratorLocalSequenceNumber() { return arbitratorLocalSequenceNumber; }
- void setSlot(Slot * s) { parentslot = s; }
- int64_t getSequenceNumber() { return sequenceNumber; }
+ void setSlot(Slot *s) { parentslot = s; }
+ int64_t getSequenceNumber() { return sequenceNumber; }
void setSequenceNumber(int64_t _sequenceNumber) { sequenceNumber = _sequenceNumber; }
int64_t getTransactionArbitrator() { return transactionArbitrator; }
- void encode(ByteBuffer * bb);
+ void encode(ByteBuffer *bb);
int getSize() { return (6 * sizeof(uint64_t)) + sizeof(char); }
char getType() { return TypeAbort; }
- Entry * getCopy(Slot * s) { return new Abort(s, transactionClientLocalSequenceNumber, transactionSequenceNumber, sequenceNumber, transactionMachineId, transactionArbitrator, arbitratorLocalSequenceNumber); }
+ Entry *getCopy(Slot *s) { return new Abort(s, transactionClientLocalSequenceNumber, transactionSequenceNumber, sequenceNumber, transactionMachineId, transactionArbitrator, arbitratorLocalSequenceNumber); }
};
-Entry * Abortdecode(Slot * slot, ByteBuffer * bb);
+Entry *Abortdecode(Slot *slot, ByteBuffer *bb);
#endif
#include "ArbitrationRound.h"
#include "Commit.h"
-ArbitrationRound::ArbitrationRound(Commit * _commit, Hashset<Abort *> * _abortsBefore) :
+ArbitrationRound::ArbitrationRound(Commit *_commit, Hashset<Abort *> *_abortsBefore) :
abortsBefore(_abortsBefore),
- parts(new Vector<Entry *>()),
- commit(_commit),
+ parts(new Vector<Entry *>()),
+ commit(_commit),
currentSize(0),
didSendPart(false),
didGenerateParts(false) {
-
- if (commit != NULL) {
- commit->createCommitParts();
- currentSize += commit->getNumberOfParts();
+
+ if (commit != NULL) {
+ commit->createCommitParts();
+ currentSize += commit->getNumberOfParts();
}
-
- currentSize += abortsBefore->size();
+
+ currentSize += abortsBefore->size();
}
-void ArbitrationRound::generateParts() {
- if (didGenerateParts) {
+/*
+ void ArbitrationRound::generateParts() {
+ if (didGenerateParts) {
return;
- }
- parts = new Vector<Entry>(abortsBefore);
- if (commit != NULL) {
+ }
+ parts = new Vector<Entry *>((Vector<Entry *> *)abortsBefore);
+ if (commit != NULL) {
parts->addAll(commit->getParts()->values());
- }
-}
+ }
+ }*/
-Vector<Entry *> * ArbitrationRound::getParts() {
- return parts;
+Vector<Entry *> *ArbitrationRound::getParts() {
+ return parts;
}
-void ArbitrationRound::removeParts(Vector<Entry *> * removeParts) {
- parts->removeAll(removeParts);
- didSendPart = true;
-}
+/*
+ void ArbitrationRound::removeParts(Vector<Entry *> * removeParts) {
+ parts->removeAll(removeParts);
+ didSendPart = true;
+ }
+ */
bool ArbitrationRound::isDoneSending() {
- if ((commit == NULL) && abortsBefore->isEmpty()) {
- return true;
- }
-
- return parts->isEmpty();
+ if ((commit == NULL) && abortsBefore->isEmpty()) {
+ return true;
+ }
+
+ return parts->isEmpty();
}
-Commit * ArbitrationRound::getCommit() {
- return commit;
+Commit *ArbitrationRound::getCommit() {
+ return commit;
}
-
-void ArbitrationRound::setCommit(Commit * _commit) {
- if (commit != NULL) {
- currentSize -= commit->getNumberOfParts();
- }
- commit = _commit;
-
- if (commit != NULL) {
- currentSize += commit->getNumberOfParts();
- }
+
+void ArbitrationRound::setCommit(Commit *_commit) {
+ if (commit != NULL) {
+ currentSize -= commit->getNumberOfParts();
+ }
+ commit = _commit;
+
+ if (commit != NULL) {
+ currentSize += commit->getNumberOfParts();
+ }
}
-void ArbitrationRound::addAbort(Abort * abort) {
- abortsBefore->add(abort);
- currentSize++;
+void ArbitrationRound::addAbort(Abort *abort) {
+ abortsBefore->add(abort);
+ currentSize++;
}
-
-void ArbitrationRound::addAborts(Hashset<Abort *> * aborts) {
- abortsBefore->addAll(aborts);
- currentSize += aborts->size();
+
+void ArbitrationRound::addAborts(Hashset<Abort *> *aborts) {
+ abortsBefore->addAll(aborts);
+ currentSize += aborts->size();
}
-
-Hashset<Abort *> * ArbitrationRound::getAborts() {
- return abortsBefore;
+
+Hashset<Abort *> *ArbitrationRound::getAborts() {
+ return abortsBefore;
}
int ArbitrationRound::getAbortsCount() {
- return abortsBefore->size();
+ return abortsBefore->size();
}
int ArbitrationRound::getCurrentSize() {
- return currentSize;
+ return currentSize;
}
bool ArbitrationRound::isFull() {
- return currentSize >= MAX_PARTS;
+ return currentSize >= MAX_PARTS;
}
-
+
bool ArbitrationRound::getDidSendPart() {
- return didSendPart;
+ return didSendPart;
}
#include "common.h"
class ArbitrationRound {
- private:
- Hashset<Abort *> * abortsBefore;
- Vector<Entry *> * parts;
- Commit * commit;
- int currentSize;
- bool didSendPart;
- bool didGenerateParts;
+private:
+ Hashset<Abort *> *abortsBefore;
+ Vector<Entry *> *parts;
+ Commit *commit;
+ int currentSize;
+ bool didSendPart;
+ bool didGenerateParts;
- public:
- ArbitrationRound(Commit * _commit, Hashset<Abort *> * _abortsBefore);
+public:
+ ArbitrationRound(Commit *_commit, Hashset<Abort *> *_abortsBefore);
~ArbitrationRound();
- void generateParts();
- Vector<Entry *> * getParts();
- void removeParts(Vector<Entry *> * removeParts);
- bool isDoneSending();
- void setCommit(Commit * _commit);
- void addAbort(Abort * abort);
- void addAborts(Hashset<Abort *> * aborts);
- Hashset<Abort *> * getAborts();
- int getAbortsCount();
- int getCurrentSize();
- bool isFull();
- bool getDidSendPart();
+ void generateParts();
+ Commit *getCommit();
+ Vector<Entry *> *getParts();
+ void removeParts(Vector<Entry *> *removeParts);
+ bool isDoneSending();
+ void setCommit(Commit *_commit);
+ void addAbort(Abort *abort);
+ void addAborts(Hashset<Abort *> *aborts);
+ Hashset<Abort *> *getAborts();
+ int getAbortsCount();
+ int getCurrentSize();
+ bool isFull();
+ bool getDidSendPart();
};
#endif
#include "common.h"
class ByteBuffer {
- public:
+public:
void put(char c);
void putLong(int64_t l);
int64_t getLong();
- private:
+private:
};
#endif
-
-
-
+#include "CloudComm.h"
/**
- * This class provides a communication API to the webserver. It also
- * validates the HMACs on the slots and handles encryption.
- * @author Brian Demsky <bdemsky@uci.edu>
- * @version 1.0
+ * Empty Constructor needed for child class.
*/
+CloudComm::CloudComm() :
+ baseurl(NULL),
+ key(NULL),
+ mac(NULL),
+ password(NULL),
+ random(NULL),
+ salt(NULL),
+ table(NULL),
+ listeningPort(-1),
+ localServerThread(NULL),
+ doEnd(false)
+ timer(TimingSingleton.getInstance())
+{
+}
-
-class CloudComm {
- static final int SALT_SIZE = 8;
- static final int TIMEOUT_MILLIS = 5000; // 100
- static final int IV_SIZE = 16;
-
- /** Sets the size for the HMAC. */
- static final int HMAC_SIZE = 32;
-
- String baseurl;
- SecretKeySpec key;
- Mac mac;
- String password;
- SecureRandom random;
- char salt[];
- Table table;
- int listeningPort = -1;
- Thread localServerThread = NULL;
- bool doEnd = false;
-
- TimingSingleton timer = NULL;
-
- /**
- * Empty Constructor needed for child class.
- */
- CloudComm() {
- timer = TimingSingleton.getInstance();
- }
-
- /**
- * Constructor for actual use. Takes in the url and password.
- */
- CloudComm(Table _table, String _baseurl, String _password, int _listeningPort) {
- timer = TimingSingleton.getInstance();
- this.table = _table;
- this.baseurl = _baseurl;
- this.password = _password;
- this.random = new SecureRandom();
- this.listeningPort = _listeningPort;
-
- if (this.listeningPort > 0) {
- localServerThread = new Thread(new Runnable() {
- void run() {
- localServerWorkerFunction();
- }
- });
- localServerThread.start();
- }
+/**
+ * Constructor for actual use. Takes in the url and password.
+ */
+CloudComm::CloudComm(Table _table, String _baseurl, String _password, int _listeningPort) :
+ baseurl(_baseurl),
+ key(NULL),
+ mac(NULL),
+ password(_password),
+ random(new SecureRandom()),
+ salt(NULL),
+ table(_table),
+ listeningPort(_listeningPort),
+ localServerThread(NULL),
+ doEnd(false)
+ timer(TimingSingleton.getInstance()) {
+ if (this.listeningPort > 0) {
+ localServerThread = new Thread(new Runnable() {
+ void run() {
+ localServerWorkerFunction();
+ }
+ });
+ localServerThread.start();
}
+}
- /**
- * Generates Key from password.
- */
- SecretKeySpec initKey() {
- try {
- PBEKeySpec keyspec = new PBEKeySpec(password.toCharArray(),
- salt,
- 65536,
- 128);
- SecretKey tmpkey = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256").generateSecret(keyspec);
- return new SecretKeySpec(tmpkey.getEncoded(), "AES");
- } catch (Exception e) {
- e.printStackTrace();
- throw new Error("Failed generating key.");
- }
+/**
+ * Generates Key from password.
+ */
+SecretKeySpec *CloudComm::initKey() {
+ try {
+ PBEKeySpec keyspec = new PBEKeySpec(password.toCharArray(),
+ salt,
+ 65536,
+ 128);
+ SecretKey tmpkey = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256").generateSecret(keyspec);
+ return new SecretKeySpec(tmpkey.getEncoded(), "AES");
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new Error("Failed generating key.");
}
+}
- /**
- * Inits all the security stuff
- */
- void initSecurity() throws ServerException {
- // try to get the salt and if one does not exist set one
- if (!getSalt()) {
- //Set the salt
- setSalt();
- }
+/**
+ * Inits all the security stuff
+ */
- initCrypt();
+void CloudComm::initSecurity() {
+ // try to get the salt and if one does not exist set one
+ if (!getSalt()) {
+ //Set the salt
+ setSalt();
}
- /**
- * Inits the HMAC generator.
- */
- void initCrypt() {
+ initCrypt();
+}
- if (password == NULL) {
- return;
- }
+/**
+ * Inits the HMAC generator.
+ */
+void CloudComm::initCrypt() {
- try {
- key = initKey();
- password = NULL; // drop password
- mac = Mac.getInstance("HmacSHA256");
- mac.init(key);
- } catch (Exception e) {
- e.printStackTrace();
- throw new Error("Failed To Initialize Ciphers");
- }
+ if (password == NULL) {
+ return;
}
- /*
- * Builds the URL for the given request.
- */
- URL buildRequest(bool isput, int64_t sequencenumber, int64_t maxentries) throws IOException {
- String reqstring = isput ? "req=putslot" : "req=getslot";
- String urlstr = baseurl + "?" + reqstring + "&seq=" + sequencenumber;
- if (maxentries != 0)
- urlstr += "&max=" + maxentries;
- return new URL(urlstr);
+ try {
+ key = initKey();
+ password = NULL;// drop password
+ mac = Mac.getInstance("HmacSHA256");
+ mac.init(key);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new Error("Failed To Initialize Ciphers");
}
+}
- void setSalt() throws ServerException {
-
- if (salt != NULL) {
- // Salt already sent to server so dont set it again
- return;
- }
-
- try {
- char[] saltTmp = new char[SALT_SIZE];
- random.nextBytes(saltTmp);
-
- for (int i = 0; i < SALT_SIZE; i++) {
- System.out.println((int)saltTmp[i] & 255);
- }
+/*
+ * Builds the URL for the given request.
+ */
+URL *CloudComm::buildRequest(bool isput, int64_t sequencenumber, int64_t maxentries) {
+ IoTString *reqstring = isput ? "req=putslot" : "req=getslot";
+ IoTString *urlstr = baseurl + "?" + reqstring + "&seq=" + sequencenumber;
+ if (maxentries != 0)
+ urlstr += "&max=" + maxentries;
+ return new URL(urlstr);
+}
+void CloudComm::setSalt() {
- URL url = new URL(baseurl + "?req=setsalt");
+ if (salt != NULL) {
+ // Salt already sent to server so dont set it again
+ return;
+ }
- timer.startTime();
- URLConnection con = url.openConnection();
- HttpURLConnection http = (HttpURLConnection) con;
+ try {
+ char[] saltTmp = new char[SALT_SIZE];
+ random.nextBytes(saltTmp);
- http.setRequestMethod("POST");
- http.setFixedLengthStreamingMode(saltTmp.length);
- http.setDoOutput(true);
- http.setConnectTimeout(TIMEOUT_MILLIS);
+ for (int i = 0; i < SALT_SIZE; i++) {
+ System.out.println((int)saltTmp[i] & 255);
+ }
- http.connect();
+ URL url = new URL(baseurl + "?req=setsalt");
- OutputStream os = http.getOutputStream();
- os.write(saltTmp);
- os.flush();
+ timer.startTime();
+ URLConnection con = url.openConnection();
+ HttpURLConnection http = (HttpURLConnection) con;
- int responsecode = http.getResponseCode();
- if (responsecode != HttpURLConnection.HTTP_OK) {
- // TODO: Remove this print
- System.out.println(responsecode);
- throw new Error("Invalid response");
- }
+ http.setRequestMethod("POST");
+ http.setFixedLengthStreamingMode(saltTmp.length);
+ http.setDoOutput(true);
+ http.setConnectTimeout(TIMEOUT_MILLIS);
- timer.endTime();
- salt = saltTmp;
- } catch (Exception e) {
- // e.printStackTrace();
- timer.endTime();
- throw new ServerException("Failed setting salt", ServerException.TypeConnectTimeout);
- }
- }
+ http.connect();
- bool getSalt() throws ServerException {
- URL url = NULL;
- URLConnection con = NULL;
- HttpURLConnection http = NULL;
+ OutputStream os = http.getOutputStream();
+ os.write(saltTmp);
+ os.flush();
- try {
- url = new URL(baseurl + "?req=getsalt");
- } catch (Exception e) {
- // e.printStackTrace();
- throw new Error("getSlot failed");
+ int responsecode = http.getResponseCode();
+ if (responsecode != HttpURLConnection.HTTP_OK) {
+ // TODO: Remove this print
+ System.out.println(responsecode);
+ throw new Error("Invalid response");
}
- try {
- timer.startTime();
- con = url.openConnection();
- http = (HttpURLConnection) con;
- http.setRequestMethod("POST");
- http.setConnectTimeout(TIMEOUT_MILLIS);
- http.setReadTimeout(TIMEOUT_MILLIS);
+ timer.endTime();
+ salt = saltTmp;
+ } catch (Exception e) {
+ // e.printStackTrace();
+ timer.endTime();
+ throw new ServerException("Failed setting salt", ServerException.TypeConnectTimeout);
+ }
+}
- http.connect();
- timer.endTime();
- } catch (SocketTimeoutException e) {
- timer.endTime();
- throw new ServerException("getSalt failed", ServerException.TypeConnectTimeout);
- } catch (Exception e) {
- // e.printStackTrace();
- throw new Error("getSlot failed");
- }
+bool CloudComm::getSalt() {
+ URL *url = NULL;
+ URLConnection *con = NULL;
+ HttpURLConnection *http = NULL;
- try {
+ try {
+ url = new URL(baseurl + "?req=getsalt");
+ } catch (Exception e) {
+ // e.printStackTrace();
+ throw new Error("getSlot failed");
+ }
+ try {
+
+ timer.startTime();
+ con = url.openConnection();
+ http = (HttpURLConnection) con;
+ http.setRequestMethod("POST");
+ http.setConnectTimeout(TIMEOUT_MILLIS);
+ http.setReadTimeout(TIMEOUT_MILLIS);
+
+
+ http.connect();
+ timer.endTime();
+ } catch (SocketTimeoutException e) {
+ timer.endTime();
+ throw new ServerException("getSalt failed", ServerException.TypeConnectTimeout);
+ } catch (Exception e) {
+ // e.printStackTrace();
+ throw new Error("getSlot failed");
+ }
- timer.startTime();
+ try {
- int responsecode = http.getResponseCode();
- if (responsecode != HttpURLConnection.HTTP_OK) {
- // TODO: Remove this print
- // System.out.println(responsecode);
- throw new Error("Invalid response");
- }
+ timer.startTime();
- InputStream is = http.getInputStream();
- if (is.available() > 0) {
- DataInputStream dis = new DataInputStream(is);
- int salt_length = dis.readInt();
- char [] tmp = new char[salt_length];
- dis.readFully(tmp);
- salt = tmp;
- timer.endTime();
+ int responsecode = http.getResponseCode();
+ if (responsecode != HttpURLConnection.HTTP_OK) {
+ // TODO: Remove this print
+ // System.out.println(responsecode);
+ throw new Error("Invalid response");
+ }
- return true;
- } else {
- timer.endTime();
+ InputStream is = http.getInputStream();
+ if (is.available() > 0) {
+ DataInputStream dis = new DataInputStream(is);
+ int salt_length = dis.readInt();
+ char [] tmp = new char[salt_length];
+ dis.readFully(tmp);
+ salt = tmp;
+ timer.endTime();
- return false;
- }
- } catch (SocketTimeoutException e) {
+ return true;
+ } else {
timer.endTime();
- throw new ServerException("getSalt failed", ServerException.TypeInputTimeout);
- } catch (Exception e) {
- // e.printStackTrace();
- throw new Error("getSlot failed");
+ return false;
}
- }
-
- char[] createIV(int64_t machineId, int64_t localSequenceNumber) {
- ByteBuffer buffer = ByteBuffer.allocate(IV_SIZE);
- buffer.putLong(machineId);
- int64_t localSequenceNumberShifted = localSequenceNumber << 16;
- buffer.putLong(localSequenceNumberShifted);
- return buffer.array();
+ } catch (SocketTimeoutException e) {
+ timer.endTime();
+ throw new ServerException("getSalt failed", ServerException.TypeInputTimeout);
+ } catch (Exception e) {
+ // e.printStackTrace();
+ throw new Error("getSlot failed");
}
+}
- char[] encryptSlotAndPrependIV(char[] rawData, char[] ivBytes) {
- try {
- IvParameterSpec ivSpec = new IvParameterSpec(ivBytes);
- Cipher cipher = Cipher.getInstance("AES/CTR/NoPadding");
- cipher.init(Cipher.ENCRYPT_MODE, key, ivSpec);
+Array<char> *CloudComm::createIV(int64_t machineId, int64_t localSequenceNumber) {
+ ByteBuffer buffer = ByteBuffer.allocate(IV_SIZE);
+ buffer.putLong(machineId);
+ int64_t localSequenceNumberShifted = localSequenceNumber << 16;
+ buffer.putLong(localSequenceNumberShifted);
+ return buffer.array();
+}
- char[] encryptedBytes = cipher.doFinal(rawData);
+Array<char> *CloudComm::encryptSlotAndPrependIV(Array<char> *rawData, Array<char> *ivBytes) {
+ try {
+ IvParameterSpec ivSpec = new IvParameterSpec(ivBytes);
+ Cipher cipher = Cipher.getInstance("AES/CTR/NoPadding");
+ cipher.init(Cipher.ENCRYPT_MODE, key, ivSpec);
- char[] chars = new char[encryptedBytes.length + IV_SIZE];
- System.arraycopy(ivBytes, 0, chars, 0, ivBytes.length);
- System.arraycopy(encryptedBytes, 0, chars, IV_SIZE, encryptedBytes.length);
+ char[] encryptedBytes = cipher.doFinal(rawData);
- return chars;
+ char[] chars = new char[encryptedBytes.length + IV_SIZE];
+ System.arraycopy(ivBytes, 0, chars, 0, ivBytes.length);
+ System.arraycopy(encryptedBytes, 0, chars, IV_SIZE, encryptedBytes.length);
- } catch (Exception e) {
- e.printStackTrace();
- throw new Error("Failed To Encrypt");
- }
+ return chars;
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new Error("Failed To Encrypt");
}
+}
- char[] stripIVAndDecryptSlot(char[] rawData) {
- try {
- char[] ivBytes = new char[IV_SIZE];
- char[] encryptedBytes = new char[rawData.length - IV_SIZE];
- System.arraycopy(rawData, 0, ivBytes, 0, IV_SIZE);
- System.arraycopy(rawData, IV_SIZE, encryptedBytes, 0 , encryptedBytes.length);
+Array<char> *CloudComm::stripIVAndDecryptSlot(Array<char> *rawData) {
+ try {
+ Array<char> *ivBytes = new char[IV_SIZE];
+ Array<char> *encryptedBytes = new char[rawData.length - IV_SIZE];
+ System.arraycopy(rawData, 0, ivBytes, 0, IV_SIZE);
+ System.arraycopy(rawData, IV_SIZE, encryptedBytes, 0, encryptedBytes.length);
- IvParameterSpec ivSpec = new IvParameterSpec(ivBytes);
+ IvParameterSpec ivSpec = new IvParameterSpec(ivBytes);
- Cipher cipher = Cipher.getInstance("AES/CTR/NoPadding");
- cipher.init(Cipher.DECRYPT_MODE, key, ivSpec);
- return cipher.doFinal(encryptedBytes);
+ Cipher cipher = Cipher.getInstance("AES/CTR/NoPadding");
+ cipher.init(Cipher.DECRYPT_MODE, key, ivSpec);
+ return cipher.doFinal(encryptedBytes);
- } catch (Exception e) {
- e.printStackTrace();
- throw new Error("Failed To Decrypt");
- }
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new Error("Failed To Decrypt");
}
+}
- /*
- * API for putting a slot into the queue. Returns NULL on success.
- * On failure, the server will send slots with newer sequence
- * numbers.
- */
- Slot[] putSlot(Slot slot, int max) throws ServerException {
- URL url = NULL;
- URLConnection con = NULL;
- HttpURLConnection http = NULL;
+/*
+ * API for putting a slot into the queue. Returns NULL on success.
+ * On failure, the server will send slots with newer sequence
+ * numbers.
+ */
+Array<Slot *> *CloudComm::putSlot(Slot *slot, int max) {
+ URL url = NULL;
+ URLConnection con = NULL;
+ HttpURLConnection http = NULL;
- try {
- if (salt == NULL) {
- if (!getSalt()) {
- throw new ServerException("putSlot failed", ServerException.TypeSalt);
- }
- initCrypt();
+ try {
+ if (salt == NULL) {
+ if (!getSalt()) {
+ throw new ServerException("putSlot failed", ServerException.TypeSalt);
}
+ initCrypt();
+ }
- int64_t sequencenumber = slot.getSequenceNumber();
- char[] slotBytes = slot.encode(mac);
- // slotBytes = encryptCipher.doFinal(slotBytes);
+ int64_t sequencenumber = slot.getSequenceNumber();
+ char[] slotBytes = slot.encode(mac);
+ // slotBytes = encryptCipher.doFinal(slotBytes);
- // char[] iVBytes = slot.getSlotCryptIV();
+ // char[] iVBytes = slot.getSlotCryptIV();
- // char[] chars = new char[slotBytes.length + IV_SIZE];
- // System.arraycopy(iVBytes, 0, chars, 0, iVBytes.length);
- // System.arraycopy(slotBytes, 0, chars, IV_SIZE, slotBytes.length);
+ // char[] chars = new char[slotBytes.length + IV_SIZE];
+ // System.arraycopy(iVBytes, 0, chars, 0, iVBytes.length);
+ // System.arraycopy(slotBytes, 0, chars, IV_SIZE, slotBytes.length);
- char[] chars = encryptSlotAndPrependIV(slotBytes, slot.getSlotCryptIV());
+ char[] chars = encryptSlotAndPrependIV(slotBytes, slot.getSlotCryptIV());
- url = buildRequest(true, sequencenumber, max);
+ url = buildRequest(true, sequencenumber, max);
- timer.startTime();
- con = url.openConnection();
- http = (HttpURLConnection) con;
+ timer.startTime();
+ con = url.openConnection();
+ http = (HttpURLConnection) con;
- http.setRequestMethod("POST");
- http.setFixedLengthStreamingMode(chars.length);
- http.setDoOutput(true);
- http.setConnectTimeout(TIMEOUT_MILLIS);
- http.setReadTimeout(TIMEOUT_MILLIS);
- http.connect();
+ http.setRequestMethod("POST");
+ http.setFixedLengthStreamingMode(chars.length);
+ http.setDoOutput(true);
+ http.setConnectTimeout(TIMEOUT_MILLIS);
+ http.setReadTimeout(TIMEOUT_MILLIS);
+ http.connect();
- OutputStream os = http.getOutputStream();
- os.write(chars);
- os.flush();
+ OutputStream os = http.getOutputStream();
+ os.write(chars);
+ os.flush();
- timer.endTime();
+ timer.endTime();
- // System.out.println("Bytes Sent: " + chars.length);
- } catch (ServerException e) {
- timer.endTime();
+ // System.out.println("Bytes Sent: " + chars.length);
+ } catch (ServerException e) {
+ timer.endTime();
- throw e;
- } catch (SocketTimeoutException e) {
- timer.endTime();
-
- throw new ServerException("putSlot failed", ServerException.TypeConnectTimeout);
- } catch (Exception e) {
- // e.printStackTrace();
- throw new Error("putSlot failed");
- }
+ throw e;
+ } catch (SocketTimeoutException e) {
+ timer.endTime();
+ throw new ServerException("putSlot failed", ServerException.TypeConnectTimeout);
+ } catch (Exception e) {
+ // e.printStackTrace();
+ throw new Error("putSlot failed");
+ }
- try {
- timer.startTime();
- InputStream is = http.getInputStream();
- DataInputStream dis = new DataInputStream(is);
- char[] resptype = new char[7];
- dis.readFully(resptype);
- timer.endTime();
- if (Arrays.equals(resptype, "getslot".getBytes())) {
- return processSlots(dis);
- } else if (Arrays.equals(resptype, "putslot".getBytes())) {
- return NULL;
- } else
- throw new Error("Bad response to putslot");
+ try {
+ timer.startTime();
+ InputStream is = http.getInputStream();
+ DataInputStream dis = new DataInputStream(is);
+ char[] resptype = new char[7];
+ dis.readFully(resptype);
+ timer.endTime();
- } catch (SocketTimeoutException e) {
- timer.endTime();
- throw new ServerException("putSlot failed", ServerException.TypeInputTimeout);
- } catch (Exception e) {
- // e.printStackTrace();
- throw new Error("putSlot failed");
- }
+ if (Arrays.equals(resptype, "getslot".getBytes())) {
+ return processSlots(dis);
+ } else if (Arrays.equals(resptype, "putslot".getBytes())) {
+ return NULL;
+ } else
+ throw new Error("Bad response to putslot");
+
+ } catch (SocketTimeoutException e) {
+ timer.endTime();
+ throw new ServerException("putSlot failed", ServerException.TypeInputTimeout);
+ } catch (Exception e) {
+ // e.printStackTrace();
+ throw new Error("putSlot failed");
}
+}
- /**
- * Request the server to send all slots with the given
- * sequencenumber or newer.
- */
- Slot[] getSlots(int64_t sequencenumber) throws ServerException {
- URL url = NULL;
- URLConnection con = NULL;
- HttpURLConnection http = NULL;
+/**
+ * Request the server to send all slots with the given
+ * sequencenumber or newer.
+ */
+Array<Slot *> *CloudComm::getSlots(int64_t sequencenumber) {
+ URL url = NULL;
+ URLConnection con = NULL;
+ HttpURLConnection http = NULL;
- try {
- if (salt == NULL) {
- if (!getSalt()) {
- throw new ServerException("getSlots failed", ServerException.TypeSalt);
- }
- initCrypt();
+ try {
+ if (salt == NULL) {
+ if (!getSalt()) {
+ throw new ServerException("getSlots failed", ServerException.TypeSalt);
}
+ initCrypt();
+ }
- url = buildRequest(false, sequencenumber, 0);
- timer.startTime();
- con = url.openConnection();
- http = (HttpURLConnection) con;
- http.setRequestMethod("POST");
- http.setConnectTimeout(TIMEOUT_MILLIS);
- http.setReadTimeout(TIMEOUT_MILLIS);
+ url = buildRequest(false, sequencenumber, 0);
+ timer.startTime();
+ con = url.openConnection();
+ http = (HttpURLConnection) con;
+ http.setRequestMethod("POST");
+ http.setConnectTimeout(TIMEOUT_MILLIS);
+ http.setReadTimeout(TIMEOUT_MILLIS);
- http.connect();
- timer.endTime();
+ http.connect();
+ timer.endTime();
- } catch (SocketTimeoutException e) {
- timer.endTime();
+ } catch (SocketTimeoutException e) {
+ timer.endTime();
- throw new ServerException("getSlots failed", ServerException.TypeConnectTimeout);
- } catch (ServerException e) {
- timer.endTime();
+ throw new ServerException("getSlots failed", ServerException.TypeConnectTimeout);
+ } catch (ServerException e) {
+ timer.endTime();
- throw e;
- } catch (Exception e) {
- // e.printStackTrace();
- throw new Error("getSlots failed");
- }
+ throw e;
+ } catch (Exception e) {
+ // e.printStackTrace();
+ throw new Error("getSlots failed");
+ }
- try {
+ try {
- timer.startTime();
- InputStream is = http.getInputStream();
- DataInputStream dis = new DataInputStream(is);
- char[] resptype = new char[7];
+ timer.startTime();
+ InputStream is = http.getInputStream();
+ DataInputStream dis = new DataInputStream(is);
+ char[] resptype = new char[7];
- dis.readFully(resptype);
- timer.endTime();
+ dis.readFully(resptype);
+ timer.endTime();
- if (!Arrays.equals(resptype, "getslot".getBytes()))
- throw new Error("Bad Response: " + new String(resptype));
+ if (!Arrays.equals(resptype, "getslot".getBytes()))
+ throw new Error("Bad Response: " + new String(resptype));
- return processSlots(dis);
- } catch (SocketTimeoutException e) {
- timer.endTime();
+ return processSlots(dis);
+ } catch (SocketTimeoutException e) {
+ timer.endTime();
- throw new ServerException("getSlots failed", ServerException.TypeInputTimeout);
- } catch (Exception e) {
- // e.printStackTrace();
- throw new Error("getSlots failed");
- }
+ throw new ServerException("getSlots failed", ServerException.TypeInputTimeout);
+ } catch (Exception e) {
+ // e.printStackTrace();
+ throw new Error("getSlots failed");
}
+}
- /**
- * Method that actually handles building Slot objects from the
- * server response. Shared by both putSlot and getSlots.
- */
- Slot[] processSlots(DataInputStream dis) throws Exception {
- int numberofslots = dis.readInt();
- int[] sizesofslots = new int[numberofslots];
+/**
+ * Method that actually handles building Slot objects from the
+ * server response. Shared by both putSlot and getSlots.
+ */
+Array<Slot *> *CloudComm::processSlots(DataInputStream dis) {
+ int numberofslots = dis.readInt();
+ int[] sizesofslots = new int[numberofslots];
- Slot[] slots = new Slot[numberofslots];
- for (int i = 0; i < numberofslots; i++)
- sizesofslots[i] = dis.readInt();
+ Slot[] slots = new Slot[numberofslots];
+ for (int i = 0; i < numberofslots; i++)
+ sizesofslots[i] = dis.readInt();
- for (int i = 0; i < numberofslots; i++) {
+ for (int i = 0; i < numberofslots; i++) {
- char[] rawData = new char[sizesofslots[i]];
- dis.readFully(rawData);
+ char[] rawData = new char[sizesofslots[i]];
+ dis.readFully(rawData);
- // char[] data = new char[rawData.length - IV_SIZE];
- // System.arraycopy(rawData, IV_SIZE, data, 0, data.length);
+ // char[] data = new char[rawData.length - IV_SIZE];
+ // System.arraycopy(rawData, IV_SIZE, data, 0, data.length);
- char[] data = stripIVAndDecryptSlot(rawData);
+ char[] data = stripIVAndDecryptSlot(rawData);
- // data = decryptCipher.doFinal(data);
+ // data = decryptCipher.doFinal(data);
- slots[i] = Slot.decode(table, data, mac);
- }
- dis.close();
- return slots;
+ slots[i] = Slot.decode(table, data, mac);
}
+ dis.close();
+ return slots;
+}
- char[] sendLocalData(char[] sendData, int64_t localSequenceNumber, String host, int port) {
+Array<char> *sendLocalData(Array<char> *sendData, int64_t localSequenceNumber, String host, int port) {
- if (salt == NULL) {
- return NULL;
- }
- try {
- System.out.println("Passing Locally");
+ if (salt == NULL) {
+ return NULL;
+ }
+ try {
+ System.out.println("Passing Locally");
- mac.update(sendData);
- char[] genmac = mac.doFinal();
- char[] totalData = new char[sendData.length + genmac.length];
- System.arraycopy(sendData, 0, totalData, 0, sendData.length);
- System.arraycopy(genmac, 0, totalData, sendData.length, genmac.length);
+ mac.update(sendData);
+ char[] genmac = mac.doFinal();
+ char[] totalData = new char[sendData.length + genmac.length];
+ System.arraycopy(sendData, 0, totalData, 0, sendData.length);
+ System.arraycopy(genmac, 0, totalData, sendData.length, genmac.length);
- // Encrypt the data for sending
- // char[] encryptedData = encryptCipher.doFinal(totalData);
- // char[] encryptedData = encryptCipher.doFinal(totalData);
- char[] iv = createIV(table.getMachineId(), table.getLocalSequenceNumber());
- char[] encryptedData = encryptSlotAndPrependIV(totalData, iv);
+ // Encrypt the data for sending
+ // char[] encryptedData = encryptCipher.doFinal(totalData);
+ // char[] encryptedData = encryptCipher.doFinal(totalData);
+ char[] iv = createIV(table.getMachineId(), table.getLocalSequenceNumber());
+ char[] encryptedData = encryptSlotAndPrependIV(totalData, iv);
- // Open a TCP socket connection to a local device
- Socket socket = new Socket(host, port);
- socket.setReuseAddress(true);
- DataOutputStream output = new DataOutputStream(socket.getOutputStream());
- DataInputStream input = new DataInputStream(socket.getInputStream());
+ // Open a TCP socket connection to a local device
+ Socket socket = new Socket(host, port);
+ socket.setReuseAddress(true);
+ DataOutputStream output = new DataOutputStream(socket.getOutputStream());
+ DataInputStream input = new DataInputStream(socket.getInputStream());
- timer.startTime();
- // Send data to output (length of data, the data)
- output.writeInt(encryptedData.length);
- output.write(encryptedData, 0, encryptedData.length);
- output.flush();
+ timer.startTime();
+ // Send data to output (length of data, the data)
+ output.writeInt(encryptedData.length);
+ output.write(encryptedData, 0, encryptedData.length);
+ output.flush();
- int lengthOfReturnData = input.readInt();
- char[] returnData = new char[lengthOfReturnData];
- input.readFully(returnData);
+ int lengthOfReturnData = input.readInt();
+ char[] returnData = new char[lengthOfReturnData];
+ input.readFully(returnData);
- timer.endTime();
+ timer.endTime();
- // returnData = decryptCipher.doFinal(returnData);
- returnData = stripIVAndDecryptSlot(returnData);
- // returnData = decryptCipher.doFinal(returnData);
+ // returnData = decryptCipher.doFinal(returnData);
+ returnData = stripIVAndDecryptSlot(returnData);
+ // returnData = decryptCipher.doFinal(returnData);
- // We are done with this socket
- socket.close();
+ // We are done with this socket
+ socket.close();
- mac.update(returnData, 0, returnData.length - HMAC_SIZE);
- char[] realmac = mac.doFinal();
- char[] recmac = new char[HMAC_SIZE];
- System.arraycopy(returnData, returnData.length - realmac.length, recmac, 0, realmac.length);
+ mac.update(returnData, 0, returnData.length - HMAC_SIZE);
+ char[] realmac = mac.doFinal();
+ char[] recmac = new char[HMAC_SIZE];
+ System.arraycopy(returnData, returnData.length - realmac.length, recmac, 0, realmac.length);
- if (!Arrays.equals(recmac, realmac))
- throw new Error("Local Error: Invalid HMAC! Potential Attack!");
+ if (!Arrays.equals(recmac, realmac))
+ throw new Error("Local Error: Invalid HMAC! Potential Attack!");
- char[] returnData2 = new char[lengthOfReturnData - recmac.length];
- System.arraycopy(returnData, 0, returnData2, 0, returnData2.length);
+ char[] returnData2 = new char[lengthOfReturnData - recmac.length];
+ System.arraycopy(returnData, 0, returnData2, 0, returnData2.length);
- return returnData2;
- } catch (Exception e) {
- e.printStackTrace();
- // throw new Error("Local comms failure...");
-
- }
+ return returnData2;
+ } catch (Exception e) {
+ e.printStackTrace();
+ // throw new Error("Local comms failure...");
- return NULL;
}
- void localServerWorkerFunction() {
+ return NULL;
+}
- ServerSocket inputSocket = NULL;
+void CloudComm::localServerWorkerFunction() {
- try {
- // Local server socket
- inputSocket = new ServerSocket(listeningPort);
- inputSocket.setReuseAddress(true);
- inputSocket.setSoTimeout(TIMEOUT_MILLIS);
- } catch (Exception e) {
- e.printStackTrace();
- throw new Error("Local server setup failure...");
- }
+ ServerSocket inputSocket = NULL;
- while (!doEnd) {
+ try {
+ // Local server socket
+ inputSocket = new ServerSocket(listeningPort);
+ inputSocket.setReuseAddress(true);
+ inputSocket.setSoTimeout(TIMEOUT_MILLIS);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new Error("Local server setup failure...");
+ }
- try {
- // Accept incoming socket
- Socket socket = inputSocket.accept();
+ while (!doEnd) {
- DataInputStream input = new DataInputStream(socket.getInputStream());
- DataOutputStream output = new DataOutputStream(socket.getOutputStream());
+ try {
+ // Accept incoming socket
+ Socket socket = inputSocket.accept();
- // Get the encrypted data from the server
- int dataSize = input.readInt();
- char[] readData = new char[dataSize];
- input.readFully(readData);
+ DataInputStream input = new DataInputStream(socket.getInputStream());
+ DataOutputStream output = new DataOutputStream(socket.getOutputStream());
- timer.endTime();
+ // Get the encrypted data from the server
+ int dataSize = input.readInt();
+ char[] readData = new char[dataSize];
+ input.readFully(readData);
- // Decrypt the data
- // readData = decryptCipher.doFinal(readData);
- readData = stripIVAndDecryptSlot(readData);
+ timer.endTime();
- mac.update(readData, 0, readData.length - HMAC_SIZE);
- char[] genmac = mac.doFinal();
- char[] recmac = new char[HMAC_SIZE];
- System.arraycopy(readData, readData.length - recmac.length, recmac, 0, recmac.length);
+ // Decrypt the data
+ // readData = decryptCipher.doFinal(readData);
+ readData = stripIVAndDecryptSlot(readData);
- if (!Arrays.equals(recmac, genmac))
- throw new Error("Local Error: Invalid HMAC! Potential Attack!");
+ mac.update(readData, 0, readData.length - HMAC_SIZE);
+ char[] genmac = mac.doFinal();
+ char[] recmac = new char[HMAC_SIZE];
+ System.arraycopy(readData, readData.length - recmac.length, recmac, 0, recmac.length);
- char[] returnData = new char[readData.length - recmac.length];
- System.arraycopy(readData, 0, returnData, 0, returnData.length);
+ if (!Arrays.equals(recmac, genmac))
+ throw new Error("Local Error: Invalid HMAC! Potential Attack!");
- // Process the data
- // char[] sendData = table.acceptDataFromLocal(readData);
- char[] sendData = table.acceptDataFromLocal(returnData);
+ char[] returnData = new char[readData.length - recmac.length];
+ System.arraycopy(readData, 0, returnData, 0, returnData.length);
+ // Process the data
+ // char[] sendData = table.acceptDataFromLocal(readData);
+ char[] sendData = table.acceptDataFromLocal(returnData);
- mac.update(sendData);
- char[] realmac = mac.doFinal();
- char[] totalData = new char[sendData.length + realmac.length];
- System.arraycopy(sendData, 0, totalData, 0, sendData.length);
- System.arraycopy(realmac, 0, totalData, sendData.length, realmac.length);
- // Encrypt the data for sending
- // char[] encryptedData = encryptCipher.doFinal(totalData);
- char[] iv = createIV(table.getMachineId(), table.getLocalSequenceNumber());
- char[] encryptedData = encryptSlotAndPrependIV(totalData, iv);
+ mac.update(sendData);
+ char[] realmac = mac.doFinal();
+ char[] totalData = new char[sendData.length + realmac.length];
+ System.arraycopy(sendData, 0, totalData, 0, sendData.length);
+ System.arraycopy(realmac, 0, totalData, sendData.length, realmac.length);
+ // Encrypt the data for sending
+ // char[] encryptedData = encryptCipher.doFinal(totalData);
+ char[] iv = createIV(table.getMachineId(), table.getLocalSequenceNumber());
+ char[] encryptedData = encryptSlotAndPrependIV(totalData, iv);
- timer.startTime();
- // Send data to output (length of data, the data)
- output.writeInt(encryptedData.length);
- output.write(encryptedData, 0, encryptedData.length);
- output.flush();
- // close the socket
- socket.close();
- } catch (Exception e) {
+ timer.startTime();
+ // Send data to output (length of data, the data)
+ output.writeInt(encryptedData.length);
+ output.write(encryptedData, 0, encryptedData.length);
+ output.flush();
- }
- }
+ // close the socket
+ socket.close();
+ } catch (Exception e) {
- if (inputSocket != NULL) {
- try {
- inputSocket.close();
- } catch (Exception e) {
- e.printStackTrace();
- throw new Error("Local server close failure...");
- }
}
}
- void close() {
- doEnd = true;
-
- if (localServerThread != NULL) {
- try {
- localServerThread.join();
- } catch (Exception e) {
- e.printStackTrace();
- throw new Error("Local Server thread join issue...");
- }
+ if (inputSocket != NULL) {
+ try {
+ inputSocket.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new Error("Local server close failure...");
}
-
- // System.out.println("Done Closing Cloud Comm");
}
+}
+
+void CloudComm::close() {
+ doEnd = true;
- protected void finalize() throws Throwable {
+ if (localServerThread != NULL) {
try {
- close(); // close open files
- } finally {
- super.finalize();
+ localServerThread.join();
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new Error("Local Server thread join issue...");
}
}
+
+ // System.out.println("Done Closing Cloud Comm");
}
+
+#ifndef CLOUDCOMM_H
+#define CLOUDCOMM_H
-
-
+#include "common.h"
/**
* This class provides a communication API to the webserver. It also
* @version 1.0
*/
+#define CloudComm_SALT_SIZE 8
+#define CloudComm_TIMEOUT_MILLIS 5000
+; // 100
+#define CloudComm_IV_SIZE 16
+/** Sets the size for the HMAC. */
+#define CloudComm_HMAC_SIZE 32
class CloudComm {
- private static final int SALT_SIZE = 8;
- private static final int TIMEOUT_MILLIS = 5000; // 100
- public static final int IV_SIZE = 16;
-
- /** Sets the size for the HMAC. */
- static final int HMAC_SIZE = 32;
-
- private String baseurl;
- private SecretKeySpec key;
- private Mac mac;
- private String password;
- private SecureRandom random;
- private char salt[];
- private Table table;
- private int listeningPort = -1;
- private Thread localServerThread = NULL;
- private bool doEnd = false;
-
- private TimingSingleton timer = NULL;
+private:
+ IoTString *baseurl;
+ SecretKeySpec *key;
+ Mac *mac;
+ IoTString *password;
+ SecureRandom *random;
+ Array<char> *salt;
+ Table *table;
+ int32_t listeningPort = -1;
+ Thread *localServerThread = NULL;
+ bool doEnd = false;
+ TimingSingleton *timer = NULL;
/**
- * Empty Constructor needed for child class.
+ * Generates Key from password.
*/
- CloudComm() {
- timer = TimingSingleton.getInstance();
- }
+ SecretKeySpec *initKey();
/**
- * Constructor for actual use. Takes in the url and password.
+ * Inits the HMAC generator.
*/
- CloudComm(Table _table, String _baseurl, String _password, int _listeningPort) {
- timer = TimingSingleton.getInstance();
- this.table = _table;
- this.baseurl = _baseurl;
- this.password = _password;
- this.random = new SecureRandom();
- this.listeningPort = _listeningPort;
-
- if (this.listeningPort > 0) {
- localServerThread = new Thread(new Runnable() {
- public void run() {
- localServerWorkerFunction();
- }
- });
- localServerThread.start();
- }
- }
+ void initCrypt();
- /**
- * Generates Key from password.
+ /*
+ * Builds the URL for the given request.
*/
- private SecretKeySpec initKey() {
- try {
- PBEKeySpec keyspec = new PBEKeySpec(password.toCharArray(),
- salt,
- 65536,
- 128);
- SecretKey tmpkey = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256").generateSecret(keyspec);
- return new SecretKeySpec(tmpkey.getEncoded(), "AES");
- } catch (Exception e) {
- e.printStackTrace();
- throw new Error("Failed generating key.");
- }
- }
-
+ URL buildRequest(bool isput, int64_t sequencenumber, int64_t maxentries);
+ void setSalt();
+ bool getSalt();
+ Array<char> *createIV(int64_t machineId, int64_t localSequenceNumber);
+ Array<char> *encryptSlotAndPrependIV(Array<char> *rawData, Array<char> *ivBytes);
+ Array<char> *stripIVAndDecryptSlot(Array<char> *rawData);
+ Array<Slot *> *processSlots(DataInputStream dis);
+ void localServerWorkerFunction();
+
+public:
/**
- * Inits all the security stuff
+ * Empty Constructor needed for child class.
*/
- public void initSecurity() throws ServerException {
- // try to get the salt and if one does not exist set one
- if (!getSalt()) {
- //Set the salt
- setSalt();
- }
-
- initCrypt();
- }
+ CloudComm();
/**
- * Inits the HMAC generator.
+ * Constructor for actual use. Takes in the url and password.
*/
- private void initCrypt() {
-
- if (password == NULL) {
- return;
- }
-
- try {
- key = initKey();
- password = NULL; // drop password
- mac = Mac.getInstance("HmacSHA256");
- mac.init(key);
- } catch (Exception e) {
- e.printStackTrace();
- throw new Error("Failed To Initialize Ciphers");
- }
- }
+ CloudComm(Table _table, String _baseurl, String _password, int _listeningPort);
- /*
- * Builds the URL for the given request.
+ /**
+ * Inits all the security stuff
*/
- private URL buildRequest(bool isput, int64_t sequencenumber, int64_t maxentries) throws IOException {
- String reqstring = isput ? "req=putslot" : "req=getslot";
- String urlstr = baseurl + "?" + reqstring + "&seq=" + sequencenumber;
- if (maxentries != 0)
- urlstr += "&max=" + maxentries;
- return new URL(urlstr);
- }
-
- private void setSalt() throws ServerException {
-
- if (salt != NULL) {
- // Salt already sent to server so dont set it again
- return;
- }
-
- try {
- char[] saltTmp = new char[SALT_SIZE];
- random.nextBytes(saltTmp);
-
- for (int i = 0; i < SALT_SIZE; i++) {
- System.out.println((int)saltTmp[i] & 255);
- }
-
-
- URL url = new URL(baseurl + "?req=setsalt");
-
- timer.startTime();
- URLConnection con = url.openConnection();
- HttpURLConnection http = (HttpURLConnection) con;
-
- http.setRequestMethod("POST");
- http.setFixedLengthStreamingMode(saltTmp.length);
- http.setDoOutput(true);
- http.setConnectTimeout(TIMEOUT_MILLIS);
-
-
- http.connect();
-
- OutputStream os = http.getOutputStream();
- os.write(saltTmp);
- os.flush();
-
- int responsecode = http.getResponseCode();
- if (responsecode != HttpURLConnection.HTTP_OK) {
- // TODO: Remove this print
- System.out.println(responsecode);
- throw new Error("Invalid response");
- }
-
- timer.endTime();
-
- salt = saltTmp;
- } catch (Exception e) {
- // e.printStackTrace();
- timer.endTime();
- throw new ServerException("Failed setting salt", ServerException.TypeConnectTimeout);
- }
- }
-
- private bool getSalt() throws ServerException {
- URL url = NULL;
- URLConnection con = NULL;
- HttpURLConnection http = NULL;
-
- try {
- url = new URL(baseurl + "?req=getsalt");
- } catch (Exception e) {
- // e.printStackTrace();
- throw new Error("getSlot failed");
- }
- try {
-
- timer.startTime();
- con = url.openConnection();
- http = (HttpURLConnection) con;
- http.setRequestMethod("POST");
- http.setConnectTimeout(TIMEOUT_MILLIS);
- http.setReadTimeout(TIMEOUT_MILLIS);
-
-
- http.connect();
- timer.endTime();
- } catch (SocketTimeoutException e) {
- timer.endTime();
- throw new ServerException("getSalt failed", ServerException.TypeConnectTimeout);
- } catch (Exception e) {
- // e.printStackTrace();
- throw new Error("getSlot failed");
- }
-
- try {
-
- timer.startTime();
-
- int responsecode = http.getResponseCode();
- if (responsecode != HttpURLConnection.HTTP_OK) {
- // TODO: Remove this print
- // System.out.println(responsecode);
- throw new Error("Invalid response");
- }
-
- InputStream is = http.getInputStream();
- if (is.available() > 0) {
- DataInputStream dis = new DataInputStream(is);
- int salt_length = dis.readInt();
- char [] tmp = new char[salt_length];
- dis.readFully(tmp);
- salt = tmp;
- timer.endTime();
-
- return true;
- } else {
- timer.endTime();
-
- return false;
- }
- } catch (SocketTimeoutException e) {
- timer.endTime();
-
- throw new ServerException("getSalt failed", ServerException.TypeInputTimeout);
- } catch (Exception e) {
- // e.printStackTrace();
- throw new Error("getSlot failed");
- }
- }
-
- private char[] createIV(int64_t machineId, int64_t localSequenceNumber) {
- ByteBuffer buffer = ByteBuffer.allocate(IV_SIZE);
- buffer.putLong(machineId);
- int64_t localSequenceNumberShifted = localSequenceNumber << 16;
- buffer.putLong(localSequenceNumberShifted);
- return buffer.array();
-
- }
-
- private char[] encryptSlotAndPrependIV(char[] rawData, char[] ivBytes) {
- try {
- IvParameterSpec ivSpec = new IvParameterSpec(ivBytes);
- Cipher cipher = Cipher.getInstance("AES/CTR/NoPadding");
- cipher.init(Cipher.ENCRYPT_MODE, key, ivSpec);
-
- char[] encryptedBytes = cipher.doFinal(rawData);
-
- char[] chars = new char[encryptedBytes.length + IV_SIZE];
- System.arraycopy(ivBytes, 0, chars, 0, ivBytes.length);
- System.arraycopy(encryptedBytes, 0, chars, IV_SIZE, encryptedBytes.length);
-
- return chars;
-
- } catch (Exception e) {
- e.printStackTrace();
- throw new Error("Failed To Encrypt");
- }
- }
-
-
- private char[] stripIVAndDecryptSlot(char[] rawData) {
- try {
- char[] ivBytes = new char[IV_SIZE];
- char[] encryptedBytes = new char[rawData.length - IV_SIZE];
- System.arraycopy(rawData, 0, ivBytes, 0, IV_SIZE);
- System.arraycopy(rawData, IV_SIZE, encryptedBytes, 0 , encryptedBytes.length);
-
- IvParameterSpec ivSpec = new IvParameterSpec(ivBytes);
-
- Cipher cipher = Cipher.getInstance("AES/CTR/NoPadding");
- cipher.init(Cipher.DECRYPT_MODE, key, ivSpec);
- return cipher.doFinal(encryptedBytes);
-
- } catch (Exception e) {
- e.printStackTrace();
- throw new Error("Failed To Decrypt");
- }
- }
-
+ void initSecurity();
/*
* API for putting a slot into the queue. Returns NULL on success.
* On failure, the server will send slots with newer sequence
* numbers.
*/
- public Slot[] putSlot(Slot slot, int max) throws ServerException {
- URL url = NULL;
- URLConnection con = NULL;
- HttpURLConnection http = NULL;
-
- try {
- if (salt == NULL) {
- if (!getSalt()) {
- throw new ServerException("putSlot failed", ServerException.TypeSalt);
- }
- initCrypt();
- }
-
- int64_t sequencenumber = slot.getSequenceNumber();
- char[] slotBytes = slot.encode(mac);
- // slotBytes = encryptCipher.doFinal(slotBytes);
-
- // char[] iVBytes = slot.getSlotCryptIV();
-
- // char[] chars = new char[slotBytes.length + IV_SIZE];
- // System.arraycopy(iVBytes, 0, chars, 0, iVBytes.length);
- // System.arraycopy(slotBytes, 0, chars, IV_SIZE, slotBytes.length);
-
-
- char[] chars = encryptSlotAndPrependIV(slotBytes, slot.getSlotCryptIV());
-
- url = buildRequest(true, sequencenumber, max);
-
- timer.startTime();
- con = url.openConnection();
- http = (HttpURLConnection) con;
-
- http.setRequestMethod("POST");
- http.setFixedLengthStreamingMode(chars.length);
- http.setDoOutput(true);
- http.setConnectTimeout(TIMEOUT_MILLIS);
- http.setReadTimeout(TIMEOUT_MILLIS);
- http.connect();
-
- OutputStream os = http.getOutputStream();
- os.write(chars);
- os.flush();
-
- timer.endTime();
-
-
- // System.out.println("Bytes Sent: " + chars.length);
- } catch (ServerException e) {
- timer.endTime();
-
- throw e;
- } catch (SocketTimeoutException e) {
- timer.endTime();
-
- throw new ServerException("putSlot failed", ServerException.TypeConnectTimeout);
- } catch (Exception e) {
- // e.printStackTrace();
- throw new Error("putSlot failed");
- }
-
-
-
- try {
- timer.startTime();
- InputStream is = http.getInputStream();
- DataInputStream dis = new DataInputStream(is);
- char[] resptype = new char[7];
- dis.readFully(resptype);
- timer.endTime();
-
- if (Arrays.equals(resptype, "getslot".getBytes())) {
- return processSlots(dis);
- } else if (Arrays.equals(resptype, "putslot".getBytes())) {
- return NULL;
- } else
- throw new Error("Bad response to putslot");
-
- } catch (SocketTimeoutException e) {
- timer.endTime();
- throw new ServerException("putSlot failed", ServerException.TypeInputTimeout);
- } catch (Exception e) {
- // e.printStackTrace();
- throw new Error("putSlot failed");
- }
- }
+ Array<Slot *> *putSlot(Slot slot, int max);
/**
* Request the server to send all slots with the given
* sequencenumber or newer.
*/
- public Slot[] getSlots(int64_t sequencenumber) throws ServerException {
- URL url = NULL;
- URLConnection con = NULL;
- HttpURLConnection http = NULL;
-
- try {
- if (salt == NULL) {
- if (!getSalt()) {
- throw new ServerException("getSlots failed", ServerException.TypeSalt);
- }
- initCrypt();
- }
-
- url = buildRequest(false, sequencenumber, 0);
- timer.startTime();
- con = url.openConnection();
- http = (HttpURLConnection) con;
- http.setRequestMethod("POST");
- http.setConnectTimeout(TIMEOUT_MILLIS);
- http.setReadTimeout(TIMEOUT_MILLIS);
-
-
-
- http.connect();
- timer.endTime();
-
- } catch (SocketTimeoutException e) {
- timer.endTime();
-
- throw new ServerException("getSlots failed", ServerException.TypeConnectTimeout);
- } catch (ServerException e) {
- timer.endTime();
-
- throw e;
- } catch (Exception e) {
- // e.printStackTrace();
- throw new Error("getSlots failed");
- }
+ Array<Slot *> *getSlots(int64_t sequencenumber);
- try {
-
- timer.startTime();
- InputStream is = http.getInputStream();
- DataInputStream dis = new DataInputStream(is);
- char[] resptype = new char[7];
-
- dis.readFully(resptype);
- timer.endTime();
-
- if (!Arrays.equals(resptype, "getslot".getBytes()))
- throw new Error("Bad Response: " + new String(resptype));
-
- return processSlots(dis);
- } catch (SocketTimeoutException e) {
- timer.endTime();
-
- throw new ServerException("getSlots failed", ServerException.TypeInputTimeout);
- } catch (Exception e) {
- // e.printStackTrace();
- throw new Error("getSlots failed");
- }
- }
/**
* Method that actually handles building Slot objects from the
* server response. Shared by both putSlot and getSlots.
*/
- private Slot[] processSlots(DataInputStream dis) throws Exception {
- int numberofslots = dis.readInt();
- int[] sizesofslots = new int[numberofslots];
-
- Slot[] slots = new Slot[numberofslots];
- for (int i = 0; i < numberofslots; i++)
- sizesofslots[i] = dis.readInt();
-
- for (int i = 0; i < numberofslots; i++) {
-
- char[] rawData = new char[sizesofslots[i]];
- dis.readFully(rawData);
-
-
- // char[] data = new char[rawData.length - IV_SIZE];
- // System.arraycopy(rawData, IV_SIZE, data, 0, data.length);
-
-
- char[] data = stripIVAndDecryptSlot(rawData);
-
- // data = decryptCipher.doFinal(data);
-
- slots[i] = Slot.decode(table, data, mac);
- }
- dis.close();
- return slots;
- }
-
- public char[] sendLocalData(char[] sendData, int64_t localSequenceNumber, String host, int port) {
-
- if (salt == NULL) {
- return NULL;
- }
- try {
- System.out.println("Passing Locally");
-
- mac.update(sendData);
- char[] genmac = mac.doFinal();
- char[] totalData = new char[sendData.length + genmac.length];
- System.arraycopy(sendData, 0, totalData, 0, sendData.length);
- System.arraycopy(genmac, 0, totalData, sendData.length, genmac.length);
-
- // Encrypt the data for sending
- // char[] encryptedData = encryptCipher.doFinal(totalData);
- // char[] encryptedData = encryptCipher.doFinal(totalData);
- char[] iv = createIV(table.getMachineId(), table.getLocalSequenceNumber());
- char[] encryptedData = encryptSlotAndPrependIV(totalData, iv);
-
- // Open a TCP socket connection to a local device
- Socket socket = new Socket(host, port);
- socket.setReuseAddress(true);
- DataOutputStream output = new DataOutputStream(socket.getOutputStream());
- DataInputStream input = new DataInputStream(socket.getInputStream());
-
-
- timer.startTime();
- // Send data to output (length of data, the data)
- output.writeInt(encryptedData.length);
- output.write(encryptedData, 0, encryptedData.length);
- output.flush();
-
- int lengthOfReturnData = input.readInt();
- char[] returnData = new char[lengthOfReturnData];
- input.readFully(returnData);
-
- timer.endTime();
-
- // returnData = decryptCipher.doFinal(returnData);
- returnData = stripIVAndDecryptSlot(returnData);
- // returnData = decryptCipher.doFinal(returnData);
-
- // We are done with this socket
- socket.close();
-
- mac.update(returnData, 0, returnData.length - HMAC_SIZE);
- char[] realmac = mac.doFinal();
- char[] recmac = new char[HMAC_SIZE];
- System.arraycopy(returnData, returnData.length - realmac.length, recmac, 0, realmac.length);
-
- if (!Arrays.equals(recmac, realmac))
- throw new Error("Local Error: Invalid HMAC! Potential Attack!");
-
- char[] returnData2 = new char[lengthOfReturnData - recmac.length];
- System.arraycopy(returnData, 0, returnData2, 0, returnData2.length);
-
- return returnData2;
- } catch (Exception e) {
- e.printStackTrace();
- // throw new Error("Local comms failure...");
-
- }
-
- return NULL;
- }
-
- private void localServerWorkerFunction() {
-
- ServerSocket inputSocket = NULL;
-
- try {
- // Local server socket
- inputSocket = new ServerSocket(listeningPort);
- inputSocket.setReuseAddress(true);
- inputSocket.setSoTimeout(TIMEOUT_MILLIS);
- } catch (Exception e) {
- e.printStackTrace();
- throw new Error("Local server setup failure...");
- }
-
- while (!doEnd) {
-
- try {
- // Accept incoming socket
- Socket socket = inputSocket.accept();
-
- DataInputStream input = new DataInputStream(socket.getInputStream());
- DataOutputStream output = new DataOutputStream(socket.getOutputStream());
-
- // Get the encrypted data from the server
- int dataSize = input.readInt();
- char[] readData = new char[dataSize];
- input.readFully(readData);
-
- timer.endTime();
-
- // Decrypt the data
- // readData = decryptCipher.doFinal(readData);
- readData = stripIVAndDecryptSlot(readData);
-
- mac.update(readData, 0, readData.length - HMAC_SIZE);
- char[] genmac = mac.doFinal();
- char[] recmac = new char[HMAC_SIZE];
- System.arraycopy(readData, readData.length - recmac.length, recmac, 0, recmac.length);
-
- if (!Arrays.equals(recmac, genmac))
- throw new Error("Local Error: Invalid HMAC! Potential Attack!");
-
- char[] returnData = new char[readData.length - recmac.length];
- System.arraycopy(readData, 0, returnData, 0, returnData.length);
-
- // Process the data
- // char[] sendData = table.acceptDataFromLocal(readData);
- char[] sendData = table.acceptDataFromLocal(returnData);
-
-
- mac.update(sendData);
- char[] realmac = mac.doFinal();
- char[] totalData = new char[sendData.length + realmac.length];
- System.arraycopy(sendData, 0, totalData, 0, sendData.length);
- System.arraycopy(realmac, 0, totalData, sendData.length, realmac.length);
-
- // Encrypt the data for sending
- // char[] encryptedData = encryptCipher.doFinal(totalData);
- char[] iv = createIV(table.getMachineId(), table.getLocalSequenceNumber());
- char[] encryptedData = encryptSlotAndPrependIV(totalData, iv);
-
-
- timer.startTime();
- // Send data to output (length of data, the data)
- output.writeInt(encryptedData.length);
- output.write(encryptedData, 0, encryptedData.length);
- output.flush();
-
- // close the socket
- socket.close();
- } catch (Exception e) {
-
- }
- }
-
- if (inputSocket != NULL) {
- try {
- inputSocket.close();
- } catch (Exception e) {
- e.printStackTrace();
- throw new Error("Local server close failure...");
- }
- }
- }
-
- public void close() {
- doEnd = true;
-
- if (localServerThread != NULL) {
- try {
- localServerThread.join();
- } catch (Exception e) {
- e.printStackTrace();
- throw new Error("Local Server thread join issue...");
- }
- }
-
- // System.out.println("Done Closing Cloud Comm");
- }
- protected void finalize() throws Throwable {
- try {
- close(); // close open files
- } finally {
- super.finalize();
- }
- }
-}
+ Array<char> *sendLocalData(Array<char> *sendData, int64_t localSequenceNumber, String host, int port);
+ public void close();
+};
+#endif
if (isDead) {
// If dead then just kill this part and move on
- newPart.setDead();
+ newPart->setDead();
return;
}
- CommitPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart);
-
+ CommitPart *previoslySeenPart = parts->put(newPart->getPartNumber(), newPart);
+
if (previoslySeenPart != NULL) {
// Set dead the old one since the new one is a rescued version of this part
- previoslySeenPart.setDead();
- } else if (newPart.isLastPart()) {
+ previoslySeenPart->setDead();
+ } else if (newPart->isLastPart()) {
missingParts = new HashSet<int32_t>();
hasLastPart = true;
-
- for (int i = 0; i < newPart.getPartNumber(); i++) {
- if (parts.get(i) == NULL) {
- missingParts.add(i);
+
+ for (int i = 0; i < newPart->getPartNumber(); i++) {
+ if (parts->get(i) == NULL) {
+ missingParts->add(i);
}
}
}
-
+
if (!fldisComplete && hasLastPart) {
-
+
// We have seen this part so remove it from the set of missing parts
- missingParts.remove(newPart.getPartNumber());
-
+ missingParts->remove(newPart->getPartNumber());
+
// Check if all the parts have been seen
- if (missingParts.size() == 0) {
-
+ if (missingParts->size() == 0) {
+
// We have all the parts
fldisComplete = true;
-
+
// Decode all the parts and create the key value guard and update sets
decodeCommitData();
-
+
// Get the sequence number and arbitrator of this transaction
- sequenceNumber = parts.get(0).getSequenceNumber();
- machineId = parts.get(0).getMachineId();
- transactionSequenceNumber = parts.get(0).getTransactionSequenceNumber();
+ sequenceNumber = parts->get(0)->getSequenceNumber();
+ machineId = parts->get(0)->getMachineId();
+ transactionSequenceNumber = parts->get(0)->getTransactionSequenceNumber();
}
}
}
- int64_t getSequenceNumber() {
- return sequenceNumber;
- }
-
- int64_t getTransactionSequenceNumber() {
- return transactionSequenceNumber;
- }
+int64_t Commit::getSequenceNumber() {
+ return sequenceNumber;
+}
- Hashtable<int32_t, CommitPart> getParts() {
- return parts;
- }
+int64_t Commit::getTransactionSequenceNumber() {
+ return transactionSequenceNumber;
+}
- void addKV(KeyValue kv) {
- keyValueUpdateSet.add(kv);
- liveKeys.add(kv.getKey());
- }
+Hashtable<int32_t, CommitPart *> *Commit::getParts() {
+ return parts;
+}
- void invalidateKey(IoTString key) {
- liveKeys.remove(key);
+void Commit::addKV(KeyValue *kv) {
+ keyValueUpdateSet->add(kv);
+ liveKeys->add(kv->getKey());
+}
- if (liveKeys.size() == 0) {
- setDead();
- }
- }
+void Commit::invalidateKey(IoTString *key) {
+ liveKeys->remove(key);
- Set<KeyValue> getKeyValueUpdateSet() {
- return keyValueUpdateSet;
- }
+ if (liveKeys->size() == 0) {
+ setDead();
+ }
+}
-int32_t getNumberOfParts() {
- return parts.size();
+Hashset<KeyValue *> *Commit::getKeyValueUpdateSet() {
+ return keyValueUpdateSet;
}
- void setDead() {
- if (isDead) {
- // Already dead
- return;
- }
+int32_t Commit::getNumberOfParts() {
+ return parts->size();
+}
- // Set dead
- isDead = true;
+void Commit::setDead() {
+ if (isDead) {
+ // Already dead
+ return;
+ }
- // Make all the parts of this transaction dead
- for (int32_t partNumber : parts.keySet()) {
- CommitPart part = parts.get(partNumber);
- part.setDead();
- }
- }
+ // Set dead
+ isDead = true;
- CommitPart getPart(int index) {
- return parts.get(index);
- }
+ // Make all the parts of this transaction dead
+ for (int32_t partNumber : parts->keySet()) {
+ CommitPart part = parts->get(partNumber);
+ part->setDead();
+ }
+}
- void createCommitParts() {
+CommitPart *Commit::getPart(int index) {
+ return parts->get(index);
+}
- parts.clear();
+void Commit::createCommitParts() {
+ parts->clear();
- // Convert to chars
- char[] charData = convertDataToBytes();
+ // Convert to chars
+ Array<char> *charData = convertDataToBytes();
- int commitPartCount = 0;
- int currentPosition = 0;
- int remaining = charData.length;
+ int commitPartCount = 0;
+ int currentPosition = 0;
+ int remaining = charData->length();
- while (remaining > 0) {
+ while (remaining > 0) {
- Boolean isLastPart = false;
- // determine how much to copy
- int copySize = CommitPart.MAX_NON_HEADER_SIZE;
- if (remaining <= CommitPart.MAX_NON_HEADER_SIZE) {
- copySize = remaining;
- isLastPart = true; // last bit of data so last part
- }
+ bool isLastPart = false;
+ // determine how much to copy
+ int copySize = CommitPart_MAX_NON_HEADER_SIZE;
+ if (remaining <= CommitPart_MAX_NON_HEADER_SIZE) {
+ copySize = remaining;
+ isLastPart = true;// last bit of data so last part
+ }
- // Copy to a smaller version
- char[] partData = new char[copySize];
- System.arraycopy(charData, currentPosition, partData, 0, copySize);
+ // Copy to a smaller version
+ Array<char> *partData = new Array<char>(copySize);
+ System->arraycopy(charData, currentPosition, partData, 0, copySize);
- CommitPart part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
- parts.put(part.getPartNumber(), part);
+ CommitPart part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
+ parts->put(part->getPartNumber(), part);
- // Update position, count and remaining
- currentPosition += copySize;
- commitPartCount++;
- remaining -= copySize;
- }
- }
+ // Update position, count and remaining
+ currentPosition += copySize;
+ commitPartCount++;
+ remaining -= copySize;
+ }
+}
- void decodeCommitData() {
+void Commit::decodeCommitData() {
- // Calculate the size of the data section
- int dataSize = 0;
- for (int i = 0; i < parts.keySet().size(); i++) {
- CommitPart tp = parts.get(i);
- dataSize += tp.getDataSize();
- }
+ // Calculate the size of the data section
+ int dataSize = 0;
+ for (int i = 0; i < parts->keySet()->size(); i++) {
+ CommitPart *tp = parts->get(i);
+ dataSize += tp->getDataSize();
+ }
- char[] combinedData = new char[dataSize];
- int currentPosition = 0;
+ Array<char> *combinedData = new Array<char>(dataSize);
+ int currentPosition = 0;
- // Stitch all the data sections together
- for (int i = 0; i < parts.keySet().size(); i++) {
- CommitPart tp = parts.get(i);
- System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize());
- currentPosition += tp.getDataSize();
- }
+ // Stitch all the data sections together
+ for (int i = 0; i < parts->keySet()->size(); i++) {
+ CommitPart *tp = parts->get(i);
+ System->arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
+ currentPosition += tp->getDataSize();
+ }
- // Decoder Object
- ByteBuffer bbDecode = ByteBuffer.wrap(combinedData);
+ // Decoder Object
+ ByteBuffer *bbDecode = ByteBuffer_wrap(combinedData);
- // Decode how many key value pairs need to be decoded
- int numberOfKVUpdates = bbDecode.getInt();
+ // Decode how many key value pairs need to be decoded
+ int numberOfKVUpdates = bbDecode->getInt();
- // Decode all the updates key values
- for (int i = 0; i < numberOfKVUpdates; i++) {
- KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
- keyValueUpdateSet.add(kv);
- liveKeys.add(kv.getKey());
- }
- }
+ // Decode all the updates key values
+ for (int i = 0; i < numberOfKVUpdates; i++) {
+ KeyValue *kv = (KeyValue *)KeyValue->decode(bbDecode);
+ keyValueUpdateSet->add(kv);
+ liveKeys->add(kv->getKey());
+ }
+}
- char[] convertDataToBytes() {
+Array<char> *convertDataToBytes() {
- // Calculate the size of the data
- int sizeOfData = sizeof(int32_t); // Number of Update KV's
- for (KeyValue kv : keyValueUpdateSet) {
- sizeOfData += kv.getSize();
- }
+ // Calculate the size of the data
+ int sizeOfData = sizeof(int32_t); // Number of Update KV's
+ for (KeyValue *kv : keyValueUpdateSet) {
+ sizeOfData += kv->getSize();
+ }
- // Data handlers and storage
- char[] dataArray = new char[sizeOfData];
- ByteBuffer bbEncode = ByteBuffer.wrap(dataArray);
+ // Data handlers and storage
+ Array<char> *dataArray = new Array<char>(sizeOfData);
+ ByteBuffer *bbEncode = ByteBuffer_wrap(dataArray);
- // Encode the size of the updates and guard sets
- bbEncode.putInt(keyValueUpdateSet.size());
+ // Encode the size of the updates and guard sets
+ bbEncode->putInt(keyValueUpdateSet->size());
- // Encode all the updates
- for (KeyValue kv : keyValueUpdateSet) {
- kv.encode(bbEncode);
- }
+ // Encode all the updates
+ for (KeyValue *kv : keyValueUpdateSet) {
+ kv->encode(bbEncode);
+ }
- return bbEncode.array();
- }
+ return bbEncode->array();
+}
- void setKVsMap(Hashtable<IoTString, KeyValue> newKVs) {
- keyValueUpdateSet.clear();
- liveKeys.clear();
+void Commit::setKVsMap(Hashtable<IoTString *, KeyValue *> *newKVs) {
+ keyValueUpdateSet->clear();
+ liveKeys->clear();
- keyValueUpdateSet.addAll(newKVs.values());
- liveKeys.addAll(newKVs.keySet());
+ keyValueUpdateSet->addAll(newKVs->values());
+ liveKeys->addAll(newKVs->keySet());
- }
+}
- static Commit merge(Commit newer, Commit older, int64_t newSequenceNumber) {
+Commit *Commit_merge(Commit *newer, Commit *older, int64_t newSequenceNumber) {
- if (older == NULL) {
- return newer;
- } else if (newer == NULL) {
- return older;
- }
+ if (older == NULL) {
+ return newer;
+ } else if (newer == NULL) {
+ return older;
+ }
- Hashtable<IoTString, KeyValue> kvSet = new Hashtable<IoTString, KeyValue>();
- for (KeyValue kv : older.getKeyValueUpdateSet()) {
- kvSet.put(kv.getKey(), kv);
- }
+ Hashtable<IoTString *, KeyValue *> *kvSet = new Hashtable<IoTString *, KeyValue *>();
+ for (KeyValue *kv : older->getKeyValueUpdateSet()) {
+ kvSet->put(kv->getKey(), kv);
+ }
- for (KeyValue kv : newer.getKeyValueUpdateSet()) {
- kvSet.put(kv.getKey(), kv);
- }
+ for (KeyValue *kv : newer->getKeyValueUpdateSet()) {
+ kvSet->put(kv->getKey(), kv);
+ }
- int64_t transactionSequenceNumber = newer.getTransactionSequenceNumber();
+ int64_t transactionSequenceNumber = newer->getTransactionSequenceNumber();
- if (transactionSequenceNumber == -1) {
- transactionSequenceNumber = older.getTransactionSequenceNumber();
- }
+ if (transactionSequenceNumber == -1) {
+ transactionSequenceNumber = older->getTransactionSequenceNumber();
+ }
- Commit newCommit = new Commit(newSequenceNumber, newer.getMachineId(), transactionSequenceNumber);
+ Commit *newCommit = new Commit(newSequenceNumber, newer->getMachineId(), transactionSequenceNumber);
- newCommit.setKVsMap(kvSet);
+ newCommit->setKVsMap(kvSet);
- return newCommit;
- }
+ return newCommit;
}
#include "common.h"
class Commit {
- private:
- Hashtable<int32_t, CommitPart *> * parts;
- Hashset<int32_t> *missingParts;
- bool fldisComplete;
+private:
+ Hashtable<int32_t, CommitPart *, int32_t> *parts;
+ Hashset<int32_t, int32_t> *missingParts;
+ bool fldisComplete;
bool hasLastPart;
- Hashset<KeyValue *> *keyValueUpdateSet;
- bool isDead;
- int64_t sequenceNumber;
+ Hashset<KeyValue *> *keyValueUpdateSet;
+ bool isDead;
+ int64_t sequenceNumber;
int64_t machineId;
- int64_t transactionSequenceNumber;
- Hashset<IoTString*> * liveKeys;
- Array<char> * convertDataToBytes();
- void setKVsMap(Hashtable<IoTString *, KeyValue *> * newKVs);
-
- public:
+ int64_t transactionSequenceNumber;
+ Hashset<IoTString *> *liveKeys;
+ Array<char> *convertDataToBytes();
+ void setKVsMap(Hashtable<IoTString *, KeyValue *> *newKVs);
+
+public:
Commit();
Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber);
-
- void addPartDecode(CommitPart * newPart);
+
+ void addPartDecode(CommitPart *newPart);
int64_t getSequenceNumber();
int64_t getTransactionSequenceNumber();
- Hashtable<int32_t, CommitPart *> *getParts();
- void addKV(KeyValue * kv);
- void invalidateKey(IoTString * key);
- Hashset<KeyValue *> * getKeyValueUpdateSet();
+ Hashtable<int32_t, CommitPart *, int32_t> *getParts();
+ void addKV(KeyValue *kv);
+ void invalidateKey(IoTString *key);
+ Hashset<KeyValue *> *getKeyValueUpdateSet();
int32_t getNumberOfParts();
int64_t getMachineId() { return machineId; }
bool isComplete() { return fldisComplete; }
bool isLive() { return !isDead; }
void setDead();
- CommitPart * getPart(int32_t index);
+ CommitPart *getPart(int32_t index);
void createCommitParts();
void decodeCommitData();
};
-Commit * Commit_merge(Commit * newer, Commit * older, int64_t newSequenceNumber);
+Commit *Commit_merge(Commit *newer, Commit *older, int64_t newSequenceNumber);
#endif
-class CommitPart extends Entry{
-
- // Max size of the part excluding the fixed size header
- static final int MAX_NON_HEADER_SIZE = 512;
-
-
- // Sequence number of the transaction this commit is for, -1 if not a cloud transaction
- int64_t machineId = -1; // Machine Id of the device that made the commit
- int64_t sequenceNumber = -1; // commit sequence number for this arbitrator
- int64_t transactionSequenceNumber = -1;
- int partNumber = -1; // Parts position in the
- Boolean isLastPart = false;
- char[] data = NULL;
-
- Pair<int64_t int32_t> partId = NULL;
- Pair<int64_t, int64_t> commitId = NULL;
-
-
- CommitPart(Slot s, int64_t _machineId, int64_t _sequenceNumber, int64_t _transactionSequenceNumber, int _partNumber, char[] _data, Boolean _isLastPart) {
- super(s);
- machineId = _machineId;
- sequenceNumber = _sequenceNumber;
- transactionSequenceNumber = _transactionSequenceNumber;
- partNumber = _partNumber;
- isLastPart = _isLastPart;
- data = _data;
-
- partId = new Pair<int64_t int32_t>(sequenceNumber, partNumber);
- commitId = new Pair<int64_t, int64_t>(machineId, sequenceNumber);
- }
-
- int getSize() {
- if (data == NULL) {
- return (3 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char));
- }
- return (3 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char)) + data.length;
- }
-
- void setSlot(Slot s) {
- parentslot = s;
- }
-
- int getPartNumber() {
- return partNumber;
- }
-
- int getDataSize() {
- return data.length;
- }
-
- char[] getData() {
- return data;
- }
-
- Pair<int64_t int32_t> getPartId() {
- return partId;
- }
-
- Pair<int64_t, int64_t> getCommitId() {
- return commitId;
- }
-
- Boolean isLastPart() {
- return isLastPart;
- }
-
- int64_t getMachineId() {
- return machineId;
- }
-
- int64_t getTransactionSequenceNumber() {
- return transactionSequenceNumber;
- }
-
- int64_t getSequenceNumber() {
- return sequenceNumber;
- }
-
- static Entry decode(Slot s, ByteBuffer bb) {
- int64_t machineId = bb->getLong();
- int64_t sequenceNumber = bb->getLong();
- int64_t transactionSequenceNumber = bb->getLong();
- int partNumber = bb->getInt();
- int dataSize = bb->getInt();
- Boolean isLastPart = bb->get() == 1;
-
- // Get the data
- char[] data = new char[dataSize];
- bb->get(data);
-
- return new CommitPart(s, machineId, sequenceNumber, transactionSequenceNumber, partNumber, data, isLastPart);
- }
-
- void encode(ByteBuffer bb) {
- bb->put(Entry.TypeCommitPart);
- bb->putLong(machineId);
- bb->putLong(sequenceNumber);
- bb->putLong(transactionSequenceNumber);
- bb->putInt(partNumber);
- bb->putInt(data.length);
-
- if (isLastPart) {
- bb->put((char)1);
- } else {
- bb->put((char)0);
- }
-
- bb->put(data);
- }
-
- char getType() {
- return Entry.TypeCommitPart;
- }
-
- Entry getCopy(Slot s) {
- return new CommitPart(s, machineId, sequenceNumber, transactionSequenceNumber, partNumber, data, isLastPart);
- }
+class CommitPart extends Entry {
+
+ // Max size of the part excluding the fixed size header
+ static final int MAX_NON_HEADER_SIZE = 512;
+
+
+ // Sequence number of the transaction this commit is for, -1 if not a cloud transaction
+ int64_t machineId = -1; // Machine Id of the device that made the commit
+ int64_t sequenceNumber = -1; // commit sequence number for this arbitrator
+ int64_t transactionSequenceNumber = -1;
+ int partNumber = -1; // Parts position in the
+ bool isLastPart = false;
+ char[] data = NULL;
+
+ Pair<int64_t int32_t> partId = NULL;
+ Pair<int64_t, int64_t> commitId = NULL;
+
+
+ CommitPart(Slot s, int64_t _machineId, int64_t _sequenceNumber, int64_t _transactionSequenceNumber, int _partNumber, char[] _data, bool _isLastPart) {
+ super(s);
+ machineId = _machineId;
+ sequenceNumber = _sequenceNumber;
+ transactionSequenceNumber = _transactionSequenceNumber;
+ partNumber = _partNumber;
+ isLastPart = _isLastPart;
+ data = _data;
+
+ partId = new Pair<int64_t int32_t>(sequenceNumber, partNumber);
+ commitId = new Pair<int64_t, int64_t>(machineId, sequenceNumber);
+ }
+
+ int getSize() {
+ if (data == NULL) {
+ return (3 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char));
+ }
+ return (3 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char)) + data.length;
+ }
+
+ void setSlot(Slot s) {
+ parentslot = s;
+ }
+
+ int getPartNumber() {
+ return partNumber;
+ }
+
+ int getDataSize() {
+ return data.length;
+ }
+
+ char[] getData() {
+ return data;
+ }
+
+ Pair<int64_t int32_t> getPartId() {
+ return partId;
+ }
+
+ Pair<int64_t, int64_t> getCommitId() {
+ return commitId;
+ }
+
+ bool isLastPart() {
+ return isLastPart;
+ }
+
+ int64_t getMachineId() {
+ return machineId;
+ }
+
+ int64_t getTransactionSequenceNumber() {
+ return transactionSequenceNumber;
+ }
+
+ int64_t getSequenceNumber() {
+ return sequenceNumber;
+ }
+
+ static Entry decode(Slot s, ByteBuffer bb) {
+ int64_t machineId = bb->getLong();
+ int64_t sequenceNumber = bb->getLong();
+ int64_t transactionSequenceNumber = bb->getLong();
+ int partNumber = bb->getInt();
+ int dataSize = bb->getInt();
+ bool isLastPart = bb->get() == 1;
+
+ // Get the data
+ char[] data = new char[dataSize];
+ bb->get(data);
+
+ return new CommitPart(s, machineId, sequenceNumber, transactionSequenceNumber, partNumber, data, isLastPart);
+ }
+
+ void encode(ByteBuffer bb) {
+ bb->put(Entry.TypeCommitPart);
+ bb->putLong(machineId);
+ bb->putLong(sequenceNumber);
+ bb->putLong(transactionSequenceNumber);
+ bb->putInt(partNumber);
+ bb->putInt(data.length);
+
+ if (isLastPart) {
+ bb->put((char)1);
+ } else {
+ bb->put((char)0);
+ }
+
+ bb->put(data);
+ }
+
+ char getType() {
+ return Entry.TypeCommitPart;
+ }
+
+ Entry getCopy(Slot s) {
+ return new CommitPart(s, machineId, sequenceNumber, transactionSequenceNumber, partNumber, data, isLastPart);
+ }
}
-class CommitPart extends Entry{
-
- // Max size of the part excluding the fixed size header
- public static final int MAX_NON_HEADER_SIZE = 512;
-
-
- // Sequence number of the transaction this commit is for, -1 if not a cloud transaction
- private int64_t machineId = -1; // Machine Id of the device that made the commit
- private int64_t sequenceNumber = -1; // commit sequence number for this arbitrator
- private int64_t transactionSequenceNumber = -1;
- private int partNumber = -1; // Parts position in the
- private Boolean isLastPart = false;
- private char[] data = NULL;
-
- private Pair<int64_t int32_t> partId = NULL;
- private Pair<int64_t, int64_t> commitId = NULL;
-
-
- public CommitPart(Slot s, int64_t _machineId, int64_t _sequenceNumber, int64_t _transactionSequenceNumber, int _partNumber, char[] _data, Boolean _isLastPart) {
- super(s);
- machineId = _machineId;
- sequenceNumber = _sequenceNumber;
- transactionSequenceNumber = _transactionSequenceNumber;
- partNumber = _partNumber;
- isLastPart = _isLastPart;
- data = _data;
-
- partId = new Pair<int64_t int32_t>(sequenceNumber, partNumber);
- commitId = new Pair<int64_t, int64_t>(machineId, sequenceNumber);
- }
-
- public int getSize() {
- if (data == NULL) {
- return (3 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char));
- }
- return (3 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char)) + data.length;
- }
-
- public void setSlot(Slot s) {
- parentslot = s;
- }
-
- public int getPartNumber() {
- return partNumber;
- }
-
- public int getDataSize() {
- return data.length;
- }
-
- public char[] getData() {
- return data;
- }
-
- public Pair<int64_t int32_t> getPartId() {
- return partId;
- }
-
- public Pair<int64_t, int64_t> getCommitId() {
- return commitId;
- }
-
- public Boolean isLastPart() {
- return isLastPart;
- }
-
- public int64_t getMachineId() {
- return machineId;
- }
-
- public int64_t getTransactionSequenceNumber() {
- return transactionSequenceNumber;
- }
-
- public int64_t getSequenceNumber() {
- return sequenceNumber;
- }
-
- static Entry decode(Slot s, ByteBuffer bb) {
- int64_t machineId = bb->getLong();
- int64_t sequenceNumber = bb->getLong();
- int64_t transactionSequenceNumber = bb->getLong();
- int partNumber = bb->getInt();
- int dataSize = bb->getInt();
- Boolean isLastPart = bb->get() == 1;
-
- // Get the data
- char[] data = new char[dataSize];
- bb->get(data);
-
- return new CommitPart(s, machineId, sequenceNumber, transactionSequenceNumber, partNumber, data, isLastPart);
- }
-
- public void encode(ByteBuffer bb) {
- bb->put(Entry.TypeCommitPart);
- bb->putLong(machineId);
- bb->putLong(sequenceNumber);
- bb->putLong(transactionSequenceNumber);
- bb->putInt(partNumber);
- bb->putInt(data.length);
-
- if (isLastPart) {
- bb->put((char)1);
- } else {
- bb->put((char)0);
- }
-
- bb->put(data);
- }
-
- public char getType() {
- return Entry.TypeCommitPart;
- }
-
- public Entry getCopy(Slot s) {
- return new CommitPart(s, machineId, sequenceNumber, transactionSequenceNumber, partNumber, data, isLastPart);
- }
+class CommitPart extends Entry {
+
+ // Max size of the part excluding the fixed size header
+ public static final int MAX_NON_HEADER_SIZE = 512;
+
+
+ // Sequence number of the transaction this commit is for, -1 if not a cloud transaction
+ private int64_t machineId = -1; // Machine Id of the device that made the commit
+ private int64_t sequenceNumber = -1; // commit sequence number for this arbitrator
+ private int64_t transactionSequenceNumber = -1;
+ private int partNumber = -1; // Parts position in the
+ private bool isLastPart = false;
+ private char[] data = NULL;
+
+ private Pair<int64_t int32_t> partId = NULL;
+ private Pair<int64_t, int64_t> commitId = NULL;
+
+
+ public CommitPart(Slot s, int64_t _machineId, int64_t _sequenceNumber, int64_t _transactionSequenceNumber, int _partNumber, char[] _data, bool _isLastPart) {
+ super(s);
+ machineId = _machineId;
+ sequenceNumber = _sequenceNumber;
+ transactionSequenceNumber = _transactionSequenceNumber;
+ partNumber = _partNumber;
+ isLastPart = _isLastPart;
+ data = _data;
+
+ partId = new Pair<int64_t int32_t>(sequenceNumber, partNumber);
+ commitId = new Pair<int64_t, int64_t>(machineId, sequenceNumber);
+ }
+
+ public int getSize() {
+ if (data == NULL) {
+ return (3 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char));
+ }
+ return (3 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char)) + data.length;
+ }
+
+ public void setSlot(Slot s) {
+ parentslot = s;
+ }
+
+ public int getPartNumber() {
+ return partNumber;
+ }
+
+ public int getDataSize() {
+ return data.length;
+ }
+
+ public char[] getData() {
+ return data;
+ }
+
+ public Pair<int64_t int32_t> getPartId() {
+ return partId;
+ }
+
+ public Pair<int64_t, int64_t> getCommitId() {
+ return commitId;
+ }
+
+ public bool isLastPart() {
+ return isLastPart;
+ }
+
+ public int64_t getMachineId() {
+ return machineId;
+ }
+
+ public int64_t getTransactionSequenceNumber() {
+ return transactionSequenceNumber;
+ }
+
+ public int64_t getSequenceNumber() {
+ return sequenceNumber;
+ }
+
+ static Entry decode(Slot s, ByteBuffer bb) {
+ int64_t machineId = bb->getLong();
+ int64_t sequenceNumber = bb->getLong();
+ int64_t transactionSequenceNumber = bb->getLong();
+ int partNumber = bb->getInt();
+ int dataSize = bb->getInt();
+ bool isLastPart = bb->get() == 1;
+
+ // Get the data
+ char[] data = new char[dataSize];
+ bb->get(data);
+
+ return new CommitPart(s, machineId, sequenceNumber, transactionSequenceNumber, partNumber, data, isLastPart);
+ }
+
+ public void encode(ByteBuffer bb) {
+ bb->put(Entry.TypeCommitPart);
+ bb->putLong(machineId);
+ bb->putLong(sequenceNumber);
+ bb->putLong(transactionSequenceNumber);
+ bb->putInt(partNumber);
+ bb->putInt(data.length);
+
+ if (isLastPart) {
+ bb->put((char)1);
+ } else {
+ bb->put((char)0);
+ }
+
+ bb->put(data);
+ }
+
+ public char getType() {
+ return Entry.TypeCommitPart;
+ }
+
+ public Entry getCopy(Slot s) {
+ return new CommitPart(s, machineId, sequenceNumber, transactionSequenceNumber, partNumber, data, isLastPart);
+ }
}
* @version 1.0
*/
-Entry * Entry_decode(Slot * slot, ByteBuffer * bb) {
- char type = bb->get();
- switch (type) {
-
- case TypeCommitPart:
- return CommitPart_decode(slot, bb);
-
- case TypeAbort:
- return Abort_decode(slot, bb);
-
- case TypeTransactionPart:
- return TransactionPart_decode(slot, bb);
-
- case TypeNewKey:
- return NewKey_decode(slot, bb);
-
- case TypeLastMessage:
- return LastMessage_decode(slot, bb);
-
- case TypeRejectedMessage:
- return RejectedMessage_decode(slot, bb);
-
- case TypeTableStatus:
- return TableStatus_decode(slot, bb);
-
- default:
- ASSERT(0);
- }
+Entry *Entry_decode(Slot *slot, ByteBuffer *bb) {
+ char type = bb->get();
+ switch (type) {
+
+ case TypeCommitPart:
+ return CommitPart_decode(slot, bb);
+
+ case TypeAbort:
+ return Abort_decode(slot, bb);
+
+ case TypeTransactionPart:
+ return TransactionPart_decode(slot, bb);
+
+ case TypeNewKey:
+ return NewKey_decode(slot, bb);
+
+ case TypeLastMessage:
+ return LastMessage_decode(slot, bb);
+
+ case TypeRejectedMessage:
+ return RejectedMessage_decode(slot, bb);
+
+ case TypeTableStatus:
+ return TableStatus_decode(slot, bb);
+
+ default:
+ ASSERT(0);
+ }
}
void Entry::setDead() {
- if (!islive ) {
- return; // already dead
- }
-
- islive = false;
-
- if (parentslot != NULL) {
- parentslot->decrementLiveCount();
- }
+ if (!islive ) {
+ return; // already dead
+ }
+
+ islive = false;
+
+ if (parentslot != NULL) {
+ parentslot->decrementLiveCount();
+ }
}
class Entry : public Liveness {
/* Records whether the information is still live or has been
- superceded by a newer update. */
- private:
+ superceded by a newer update. */
+private:
bool islive;
- protected:
- Slot * parentslot;
+protected:
+ Slot *parentslot;
- public:
- Entry(Slot * _parentslot) : islive(true), parentslot(_parentslot) {}
+public:
+ Entry(Slot *_parentslot) : islive(true), parentslot(_parentslot) {}
/**
* Returns true if the Entry object is still live.
/**
* Serializes the Entry object into the char buffer.
*/
- virtual void encode(ByteBuffer * bb) = 0;
+ virtual void encode(ByteBuffer *bb) = 0;
/**
* Returns the size in chars the entry object will take in the char
* array.
*/
- virtual int getSize() = 0;
+ virtual int getSize() = 0;
/**
* Returns a char encoding the type of the entry object.
*/
- virtual char getType() = 0;
+ virtual char getType() = 0;
/**
* Returns a copy of the Entry that can be added to a different slot.
*/
- virtual Entry * getCopy(Slot * s) = 0;
+ virtual Entry *getCopy(Slot *s) = 0;
};
/**
* Static method for decoding char array into Entry objects. First
* char tells the type of entry.
*/
-Entry * Entry_decode(Slot * slot, ByteBuffer * bb);
+Entry *Entry_decode(Slot *slot, ByteBuffer *bb);
#endif
*/
public class IoTString {
- private:
- Array<char> array;
-
- IoTString() {}
+private:
+ Array<char> *array;
+ IoTString() {}
/**
* Builds an IoTString object around the char array. This
* constructor makes a copy, so the caller is free to modify the char array.
*/
-
- public:
- IoTString(Array<char> * _array) { array.init(_array); }
- ~IoTString() {}
-
+
+public:
+ IoTString(Array<char> *_array) : array(new Array<char>(_array)) {}
+ ~IoTString() {}
+
/**
* Internal method to grab a reference to our char array. Caller
* must not modify it.
*/
-
- Array<char> * internalBytes() { return &array; }
-
+
+ Array<char> *internalBytes() { return array; }
+
/**
* Returns a copy of the underlying char string.
*/
-
- Array<char> * getBytes() { return new Array<Char>(&array); }
+
+ Array<char> *getBytes() { return new Array<Char>(&array); }
/**
* Returns the length in chars of the IoTString.
*/
-
+
int length() { return array->length(); }
+ friend IoTString *IoTString_shallow(Array<char> *_array);
+};
+
+IoTString *IoTString_shallow(Array<char> *_array) {
+ IoTString *str = new IoTString();
+ str->array = _array;
+ return str;
}
#endif
-
+#include "KeyValue.h"
/**
* KeyValue entry for Slot.
* @author Brian Demsky <bdemsky@uci.edu>
* @version 1.0
*/
-class KeyValue { /*extends Entry */
- IoTString key;
- IoTString value;
-
- KeyValue(IoTString _key, IoTString _value) {
- key = _key;
- value = _value;
- }
+KeyValue *KeyValue_decode(ByteBuffer *bb) {
+ int keylength = bb->getInt();
+ int valuelength = bb->getInt();
+ Array<char> *key = new Array<char> *(keylength);
+ bb->get(key);
- IoTString getKey() {
- return key;
+ if (valuelength != 0) {
+ Array<char> *value = new Array<char>(valuelength);
+ bb->get(value);
+ return new KeyValue(IoTString_shallow(key), IoTString_shallow(value));
}
- IoTString getValue() {
- return value;
- }
-
- static KeyValue decode(ByteBuffer bb) {
- int keylength = bb->getInt();
- int valuelength = bb->getInt();
- char[] key = new char[keylength];
- bb->get(key);
+ return new KeyValue(IoTString_shallow(key), NULL);
+}
- if (valuelength != 0) {
- char[] value = new char[valuelength];
- bb->get(value);
- return new KeyValue(IoTString.shallow(key), IoTString.shallow(value));
- }
+void KeyValue::encode(ByteBuffer *bb) {
+ bb->putInt(key->length());
- return new KeyValue(IoTString.shallow(key), NULL);
+ if (value != NULL) {
+ bb->putInt(value->length());
+ } else {
+ bb->putInt(0);
}
- void encode(ByteBuffer bb) {
- bb->putInt(key.length());
-
- if (value != NULL) {
- bb->putInt(value.length());
- } else {
- bb->putInt(0);
- }
+ bb->put(key->internalBytes());
- bb->put(key.internalBytes());
-
- if (value != NULL) {
- bb->put(value.internalBytes());
- }
+ if (value != NULL) {
+ bb->put(value->internalBytes());
}
+}
- int getSize() {
- if (value != NULL) {
- return 2 * sizeof(int32_t) + key.length() + value.length();
- }
-
- return 2 * sizeof(int32_t) + key.length();
+int KeyValue::getSize() {
+ if (value != NULL) {
+ return 2 * sizeof(int32_t) + key->length() + value->length();
}
- String toString() {
- if (value == NULL) {
- return "NULL";
- }
- return value.toString();
- }
+ return 2 * sizeof(int32_t) + key.length();
+}
- KeyValue getCopy() {
- return new KeyValue(key, value);
- }
+KeyValue *KeyValue::getCopy() {
+ return new KeyValue(key, value);
}
+#ifndef KEYVALUE_H
+#define KEYVALUE_H
+#include "common.h"
/**
* KeyValue entry for Slot.
* @version 1.0
*/
-class KeyValue { /*extends Entry */
- private IoTString key;
- private IoTString value;
+class KeyValue {/*extends Entry */
+private:
+ IoTString *key;
+ IoTString *value;
- public KeyValue(IoTString _key, IoTString _value) {
- key = _key;
- value = _value;
+public:
+ KeyValue(IoTString *_key, IoTString *_value) :
+ key(_key),
+ value(_value) {
}
- public IoTString getKey() {
- return key;
- }
-
- public IoTString getValue() {
- return value;
- }
-
- static KeyValue decode(ByteBuffer bb) {
- int keylength = bb->getInt();
- int valuelength = bb->getInt();
- char[] key = new char[keylength];
- bb->get(key);
-
- if (valuelength != 0) {
- char[] value = new char[valuelength];
- bb->get(value);
- return new KeyValue(IoTString.shallow(key), IoTString.shallow(value));
- }
-
- return new KeyValue(IoTString.shallow(key), NULL);
- }
-
- public void encode(ByteBuffer bb) {
- bb->putInt(key.length());
-
- if (value != NULL) {
- bb->putInt(value.length());
- } else {
- bb->putInt(0);
- }
+ IoTString *getKey() { return key; }
+ IoTString *getValue() { return value; }
+ void encode(ByteBuffer *bb);
+ int32_t getSize();
+ KeyValue *getCopy();
+};
- bb->put(key.internalBytes());
-
- if (value != NULL) {
- bb->put(value.internalBytes());
- }
- }
-
- public int getSize() {
- if (value != NULL) {
- return 2 * sizeof(int32_t) + key.length() + value.length();
- }
-
- return 2 * sizeof(int32_t) + key.length();
- }
-
- public String toString() {
- if (value == NULL) {
- return "NULL";
- }
- return value.toString();
- }
-
- public KeyValue getCopy() {
- return new KeyValue(key, value);
- }
-}
+KeyValue *KeyValue_decode(ByteBuffer *bb);
+#endif
* @version 1.0
*/
-Entry * LastMessage_decode(Slot * slot, ByteBuffer * bb) {
- int64_t machineid=bb->getLong();
- int64_t seqnum=bb->getLong();
+Entry *LastMessage_decode(Slot *slot, ByteBuffer *bb) {
+ int64_t machineid = bb->getLong();
+ int64_t seqnum = bb->getLong();
return new LastMessage(slot, machineid, seqnum);
}
-void LastMessage::encode(ByteBuffer * bb) {
+void LastMessage::encode(ByteBuffer *bb) {
bb->put(TypeLastMessage);
bb->putLong(machineid);
bb->putLong(seqnum);
#ifndef LASTMESSAGE_H
#define LASTMESSAGE_H
-#include"common.h"
-#include"Entry.h"
+#include "common.h"
+#include "Entry.h"
/**
* This Entry records the last message sent by a given machine.
class LastMessage : public Entry {
- private:
+private:
int64_t machineid;
int64_t seqnum;
- public:
- LastMessage(Slot * slot, int64_t _machineid, int64_t _seqnum) :
- Entry(slot),
+public:
+ LastMessage(Slot *slot, int64_t _machineid, int64_t _seqnum) :
+ Entry(slot),
machineid(_machineid),
seqnum(_seqnum) {
- }
+ }
int64_t getMachineID() { return machineid; }
int64_t getSequenceNumber() { return seqnum; }
- void encode(ByteBuffer * bb);
- int getSize() { return 2*sizeof(int64_t)+sizeof(char); }
+ void encode(ByteBuffer *bb);
+ int getSize() { return 2 * sizeof(int64_t) + sizeof(char); }
char getType() { return TypeLastMessage; }
- Entry * getCopy(Slot * s) { return new LastMessage(s, machineid, seqnum); }
+ Entry *getCopy(Slot *s) { return new LastMessage(s, machineid, seqnum); }
};
-Entry * LastMessage_decode(Slot * slot, ByteBuffer * bb);
+Entry *LastMessage_decode(Slot *slot, ByteBuffer *bb);
#endif
class LocalComm {
- Table t1;
- Table t2;
+ Table t1;
+ Table t2;
- LocalComm(Table _t1, Table _t2) {
- t1 = _t1;
- t2 = _t2;
- }
+ LocalComm(Table _t1, Table _t2) {
+ t1 = _t1;
+ t2 = _t2;
+ }
- char[] sendDataToLocalDevice(Long deviceId, char[] data) throws InterruptedException{
- System.out.println("Passing Locally");
+ char[] sendDataToLocalDevice(Long deviceId, char[] data) throws InterruptedException {
+ System.out.println("Passing Locally");
- if (deviceId == t1.getMachineId()) {
- // return t1.localCommInput(data);
- } else if (deviceId == t2.getMachineId()) {
- // return t2.localCommInput(data);
- } else {
- throw new Error("Cannot send to " + deviceId + " using this local comm");
- }
+ if (deviceId == t1.getMachineId()) {
+ // return t1.localCommInput(data);
+ } else if (deviceId == t2.getMachineId()) {
+ // return t2.localCommInput(data);
+ } else {
+ throw new Error("Cannot send to " + deviceId + " using this local comm");
+ }
- return new char[0];
- }
+ return new char[0];
+ }
}
class LocalComm {
- private Table t1;
- private Table t2;
+ private Table t1;
+ private Table t2;
- public LocalComm(Table _t1, Table _t2) {
- t1 = _t1;
- t2 = _t2;
- }
+ public LocalComm(Table _t1, Table _t2) {
+ t1 = _t1;
+ t2 = _t2;
+ }
- public char[] sendDataToLocalDevice(Long deviceId, char[] data) throws InterruptedException{
- System.out.println("Passing Locally");
+ public char[] sendDataToLocalDevice(Long deviceId, char[] data) throws InterruptedException {
+ System.out.println("Passing Locally");
- if (deviceId == t1.getMachineId()) {
- // return t1.localCommInput(data);
- } else if (deviceId == t2.getMachineId()) {
- // return t2.localCommInput(data);
- } else {
- throw new Error("Cannot send to " + deviceId + " using this local comm");
- }
+ if (deviceId == t1.getMachineId()) {
+ // return t1.localCommInput(data);
+ } else if (deviceId == t2.getMachineId()) {
+ // return t2.localCommInput(data);
+ } else {
+ throw new Error("Cannot send to " + deviceId + " using this local comm");
+ }
- return new char[0];
- }
+ return new char[0];
+ }
}
ctags -R
tabbing:
- uncrustify -c C.cfg --no-backup *.cc */*.cc */*/*.cc
- uncrustify -c C.cfg --no-backup *.h */*.h */*/*.h
+ uncrustify -c C.cfg --no-backup *.cc
+ uncrustify -c C.cfg --no-backup *.h
wc:
- wc */*.cc */*.h *.cc *.h */*/*.cc */*/*.h
+ wc *.cc *.h
.PHONY: $(PHONY)
#include "NewKey.h"
#include "ByteBuffer.h"
-Entry * decode(Slot * slot, ByteBuffer * bb) {
+Entry *decode(Slot *slot, ByteBuffer *bb) {
int keylength = bb->getInt();
- Array<char> * key = new Array<char>(keylength);
+ Array<char> *key = new Array<char>(keylength);
bb->get(key);
int64_t machineid = bb->getLong();
-
+
return new NewKey(slot, IoTString.shallow(key), machineid);
}
-void NewKey::encode(ByteBuffer * bb) {
+void NewKey::encode(ByteBuffer *bb) {
bb->put(TypeNewKey);
bb->putInt(key->length());
bb->put(key->internalBytes());
class NewKey : public Entry {
- private:
- IoTString * key;
+private:
+ IoTString *key;
int64_t machineid;
- public:
- NewKey(Slot * slot, IoTString * _key, int64_t _machineid) :
- Entry(slot),
+public:
+ NewKey(Slot *slot, IoTString *_key, int64_t _machineid) :
+ Entry(slot),
key(_key),
machineid(_machineid) {
- }
-
+ }
+
int64_t getMachineID() { return machineid; }
- IoTString * getKey() { return key; }
- void setSlot(Slot * s) { parentslot = s; }
-
+ IoTString *getKey() { return key; }
+ void setSlot(Slot *s) { parentslot = s; }
+
- void encode(ByteBuffer * bb);
+ void encode(ByteBuffer *bb);
int getSize();
char getType() { return TypeNewKey; }
-
- Entry * getCopy(Slot * s) { return new NewKey(s, key, machineid); }
+
+ Entry *getCopy(Slot *s) { return new NewKey(s, key, machineid); }
};
-Entry * NewKey_decode(Slot *slot, ByteBuffer *bb);
+Entry *NewKey_decode(Slot *slot, ByteBuffer *bb);
template<typename A, typename B>
class Pair {
- private:
- A a;
- B b;
-
- public:
+private:
+ A a;
+ B b;
+
+public:
Pair(A _a, B _b) :
- a(_a),
- b(_b) {
+ a(_a),
+ b(_b) {
}
-
+
A getFirst() {
return a;
}
-
+
B getSecond() {
return b;
}
#include "PendingTransaction.h"
PendingTransaction::PendingTransaction(int64_t _machineId) :
- keyValueUpdateSet(new Hashset<KeyValue*>()),
- keyValueGuardSet(new HashSet<KeyValue*>()),
+ keyValueUpdateSet(new Hashset<KeyValue *>()),
+ keyValueGuardSet(new HashSet<KeyValue *>()),
arbitrator(-1),
clientLocalSequenceNumber(-1),
machineId(_machineId),
* Add a new key value to the updates
*
*/
-void PendingTransaction::addKV(KeyValue * newKV) {
+void PendingTransaction::addKV(KeyValue *newKV) {
KeyValue rmKV = NULL;
// Make sure there are no duplicates
for (KeyValue kv : keyValueUpdateSet) {
if (kv.getKey().equals(newKV.getKey())) {
-
+
// Remove key if we are adding a newer version of the same key
rmKV = kv;
break;
}
}
-
+
// Remove key if we are adding a newer version of the same key
if (rmKV != NULL) {
keyValueUpdateSet.remove(rmKV);
currentDataSize -= rmKV.getSize();
}
-
+
// Add the key to the hash set
keyValueUpdateSet.add(newKV);
currentDataSize += newKV.getSize();
* Add a new key value to the guard set
*
*/
-void PendingTransaction::addKVGuard(KeyValue * newKV) {
+void PendingTransaction::addKVGuard(KeyValue *newKV) {
// Add the key to the hash set
keyValueGuardSet.add(newKV);
currentDataSize += newKV.getSize();
arbitrator = arb;
return true;
}
-
+
return arb == arbitrator;
}
bool PendingTransaction::evaluateGuard(Hashtable<IoTString *, KeyValue *> keyValTableCommitted, Hashtable<IoTString *, KeyValue *> keyValTableSpeculative, Hashtable<IoTString *, KeyValue *> keyValTablePendingTransSpeculative) {
for (KeyValue kvGuard : keyValueGuardSet) {
-
+
// First check if the key is in the speculative table, this is the value of the latest assumption
KeyValue kv = keyValTablePendingTransSpeculative.get(kvGuard.getKey());
-
-
+
+
if (kv == NULL) {
// if it is not in the pending trans table then check the speculative table and use that
// value as our latest assumption
kv = keyValTableSpeculative.get(kvGuard.getKey());
}
-
-
+
+
if (kv == NULL) {
// if it is not in the speculative table then check the committed table and use that
// value as our latest assumption
kv = keyValTableCommitted.get(kvGuard.getKey());
}
-
+
if (kvGuard.getValue() != NULL) {
if ((kv == NULL) || (!kvGuard.getValue().equals(kv.getValue()))) {
return false;
return false;
}
}
- }
+ }
return true;
}
-Transaction * PendingTransaction::createTransaction() {
- Transaction * newTransaction = new Transaction();
+Transaction *PendingTransaction::createTransaction() {
+ Transaction *newTransaction = new Transaction();
int transactionPartCount = 0;
-
+
// Convert all the data into a char array so we can start partitioning
- Array<char> * charData = convertDataToBytes();
-
+ Array<char> *charData = convertDataToBytes();
+
int currentPosition = 0;
int remaining = charData.length;
-
+
while (remaining > 0) {
-
- Boolean isLastPart = false;
+
+ bool isLastPart = false;
// determine how much to copy
int copySize = TransactionPart.MAX_NON_HEADER_SIZE;
if (remaining <= TransactionPart.MAX_NON_HEADER_SIZE) {
copySize = remaining;
- isLastPart = true; // last bit of data so last part
+ isLastPart = true;// last bit of data so last part
}
-
+
// Copy to a smaller version
char[] partData = new char[copySize];
System.arraycopy(charData, currentPosition, partData, 0, copySize);
-
+
TransactionPart part = new TransactionPart(NULL, machineId, arbitrator, clientLocalSequenceNumber, transactionPartCount, partData, isLastPart);
newTransaction.addPartEncode(part);
-
- // Update position, count and remaining
+
+ // Update position, count and remaining
currentPosition += copySize;
transactionPartCount++;
remaining -= copySize;
}
-
+
// Add the Guard Conditions
for (KeyValue kv : keyValueGuardSet) {
newTransaction.addGuardKV(kv);
}
-
+
// Add the updates
for (KeyValue kv : keyValueUpdateSet) {
newTransaction.addUpdateKV(kv);
}
-
+
return newTransaction;
}
-Arrar<char> * PendingTransaction::convertDataToBytes() {
+Arrar<char> *PendingTransaction::convertDataToBytes() {
// Calculate the size of the data
- int sizeOfData = 2 * sizeof(int32_t); // Number of Update KV's and Guard KV's
+ int sizeOfData = 2 * sizeof(int32_t); // Number of Update KV's and Guard KV's
sizeOfData += currentDataSize;
-
+
// Data handlers and storage
- Array<char> * dataArray = new Array<char>(sizeOfData);
- ByteBuffer * bbEncode = ByteBuffer_wrap(dataArray);
-
+ Array<char> *dataArray = new Array<char>(sizeOfData);
+ ByteBuffer *bbEncode = ByteBuffer_wrap(dataArray);
+
// Encode the size of the updates and guard sets
bbEncode->putInt(keyValueGuardSet.size());
bbEncode->putInt(keyValueUpdateSet.size());
-
+
// Encode all the guard conditions
for (KeyValue kv : keyValueGuardSet) {
kv->encode(bbEncode);
#include "common.h"
class PendingTransaction {
- private:
- Hashset<KeyValue*> * keyValueUpdateSet = NULL;
- Hashset<KeyValue*> * keyValueGuardSet = NULL;
+private:
+ Hashset<KeyValue *> *keyValueUpdateSet = NULL;
+ Hashset<KeyValue *> *keyValueGuardSet = NULL;
int64_t arbitrator = -1;
int64_t clientLocalSequenceNumber = -1;
- int64_t machineId = -1;
+ int64_t machineId = -1;
int32_T currentDataSize = 0;
- public:
+public:
PendingTransaction(int64_t _machineId);
/**
* Add a new key value to the updates
/**
* Get the key value update set
*/
- Hashset<KeyValue*> * getKVUpdates() { return keyValueUpdateSet; }
+ Hashset<KeyValue *> *getKVUpdates() { return keyValueUpdateSet; }
/**
* Get the key value update set
*/
- public Hashset<KeyValue *> * getKVGuard() { return keyValueGuardSet; }
+ public Hashset<KeyValue *> *getKVGuard() { return keyValueGuardSet; }
void setClientLocalSequenceNumber(int64_t _clientLocalSequenceNumber) { clientLocalSequenceNumber = _clientLocalSequenceNumber; }
bool evaluateGuard(Hashtable<IoTString *, KeyValue *> keyValTableCommitted, Hashtable<IoTString *, KeyValue *> keyValTableSpeculative, Hashtable<IoTString *, KeyValue *> keyValTablePendingTransSpeculative);
- Transaction * createTransaction();
+ Transaction *createTransaction();
- Array<char> * convertDataToBytes();
+ Array<char> *convertDataToBytes();
};
#endif
* @version 1.0
*/
-Entry * RejectedMessage_decode(Slot * slot, ByteBuffer * bb) {
- int64_t sequencenum=bb->getLong();
- int64_t machineid=bb->getLong();
- int64_t oldseqnum=bb->getLong();
- int64_t newseqnum=bb->getLong();
- char equalto=bb->get();
- return new RejectedMessage(slot,sequencenum, machineid, oldseqnum, newseqnum, equalto==1);
+Entry *RejectedMessage_decode(Slot *slot, ByteBuffer *bb) {
+ int64_t sequencenum = bb->getLong();
+ int64_t machineid = bb->getLong();
+ int64_t oldseqnum = bb->getLong();
+ int64_t newseqnum = bb->getLong();
+ char equalto = bb->get();
+ return new RejectedMessage(slot,sequencenum, machineid, oldseqnum, newseqnum, equalto == 1);
}
void RejectedMessage::removeWatcher(int64_t machineid) {
setDead();
}
-void RejectedMessage::encode(ByteBuffer * bb) {
+void RejectedMessage::encode(ByteBuffer *bb) {
bb->put(TypeRejectedMessage);
bb->putLong(sequencenum);
bb->putLong(machineid);
bb->putLong(oldseqnum);
bb->putLong(newseqnum);
- bb->put(equalto?(char)1:(char)0);
+ bb->put(equalto ? (char)1 : (char)0);
}
#include "Entry.h"
class RejectedMessage : public Entry {
- private:
+private:
/* Sequence number */
int64_t sequencenum;
/* Machine identifier */
* equal to) the specified machine identifier. */
bool equalto;
/* Set of machines that have not received notification. */
- Hashset<int64_t> * watchset;
-
- RejectedMessage(Slot * slot, int64_t _sequencenum, int64_t _machineid, int64_t _oldseqnum, int64_t _newseqnum, bool _equalto) : Entry(slot),
+ Hashset<int64_t> *watchset;
+
+ RejectedMessage(Slot *slot, int64_t _sequencenum, int64_t _machineid, int64_t _oldseqnum, int64_t _newseqnum, bool _equalto) : Entry(slot),
sequencenum(_sequencenum),
machineid(_machineid),
oldseqnum(_oldseqnum),
newseqnum(_newseqnum),
equalto(_equalto) {
- }
+ }
int64_t getOldSeqNum() { return oldseqnum; }
int64_t getNewSeqNum() { return newseqnum; }
bool getEqual() { return equalto; }
int64_t getMachineID() { return machineid; }
- int64_t getSequenceNumber() { return sequencenum; }
- void setWatchSet(HashSet<int64_t> * _watchset) { watchset=_watchset; }
+ int64_t getSequenceNumber() { return sequencenum; }
+ void setWatchSet(HashSet<int64_t> *_watchset) { watchset = _watchset; }
void removeWatcher(int64_t machineid);
- void encode(ByteBuffer * bb);
- int getSize() { return 4*sizeof(int64_t) + 2*sizeof(char); }
+ void encode(ByteBuffer *bb);
+ int getSize() { return 4 * sizeof(int64_t) + 2 * sizeof(char); }
char getType() { return TypeRejectedMessage; }
- Entry * getCopy(Slot * s) {
+ Entry *getCopy(Slot *s) {
return new RejectedMessage(s,sequencenum, machineid, oldseqnum, newseqnum, equalto);
}
};
-Entry * RejectedMessage_decode(Slot * slot, ByteBuffer * bb);
+Entry *RejectedMessage_decode(Slot *slot, ByteBuffer *bb);
#ifndef
#include "Slot.h"
-Slot::Slot(Table *_table, int64_t _seqnum, int64_t _machineid, char* _prevhmac, char* _hmac, int64_t _localSequenceNumber) {
- seqnum = _seqnum;
- machineid = _machineid;
- prevhmac = _prevhmac;
- hmac = _hmac;
- entries = new Vector<Entry*>();
- livecount = 1;
- seqnumlive = true;
- freespace = SLOT_SIZE - getBaseSize();
- table = _table;
- localSequenceNumber = _localSequenceNumber;
+Slot::Slot(Table *_table, int64_t _seqnum, int64_t _machineid, char *_prevhmac, char *_hmac, int64_t _localSequenceNumber) {
+ seqnum = _seqnum;
+ machineid = _machineid;
+ prevhmac = _prevhmac;
+ hmac = _hmac;
+ entries = new Vector<Entry *>();
+ livecount = 1;
+ seqnumlive = true;
+ freespace = SLOT_SIZE - getBaseSize();
+ table = _table;
+ localSequenceNumber = _localSequenceNumber;
}
-Slot::Slot(Table *_table, int64_t _seqnum, int64_t _machineid, char* _prevhmac, int64_t _localSequenceNumber) {
- this(_table, _seqnum, _machineid, _prevhmac, NULL, _localSequenceNumber);
+Slot::Slot(Table *_table, int64_t _seqnum, int64_t _machineid, char *_prevhmac, int64_t _localSequenceNumber) {
+ this(_table, _seqnum, _machineid, _prevhmac, NULL, _localSequenceNumber);
}
Slot::Slot(Table *_table, int64_t _seqnum, int64_t _machineid, int64_t _localSequenceNumber) {
- this(_table, _seqnum, _machineid, new char[HMAC_SIZE], NULL, _localSequenceNumber);
+ this(_table, _seqnum, _machineid, new char[HMAC_SIZE], NULL, _localSequenceNumber);
}
-Entry * Slot::addEntry(Entry * e) {
- e = e->getCopy(this);
- entries->add(e);
- livecount++;
- freespace -= e->getSize();
- return e;
+Entry *Slot::addEntry(Entry *e) {
+ e = e->getCopy(this);
+ entries->add(e);
+ livecount++;
+ freespace -= e->getSize();
+ return e;
}
void Slot::removeEntry(Entry *e) {
- entries->remove(e);
- livecount--;
- freespace += e->getSize();
+ entries->remove(e);
+ livecount--;
+ freespace += e->getSize();
}
void Slot::addShallowEntry(Entry *e) {
- entries->add(e);
- livecount++;
- freespace -= e->getSize();
+ entries->add(e);
+ livecount++;
+ freespace -= e->getSize();
}
/**
* using its reserved space. */
bool Slot::hasSpace(Entry *e) {
- int newfreespace = freespace - e->getSize();
- return newfreespace >= 0;
+ int newfreespace = freespace - e->getSize();
+ return newfreespace >= 0;
}
-Vector<Entry*> * Slot::getEntries() {
- return entries;
+Vector<Entry *> *Slot::getEntries() {
+ return entries;
}
-Slot * Slotdecode(Table * table, char* array, Mac * mac) {
- mac->update(array, HMAC_SIZE, array.length - HMAC_SIZE);
- char* realmac = mac->doFinal();
-
- ByteBuffer * bb = ByteBuffer_wrap(array);
- char* hmac = new char[HMAC_SIZE];
- char* prevhmac = new char[HMAC_SIZE];
- bb->get(hmac);
- bb->get(prevhmac);
- if (!Arrays.equals(realmac, hmac))
- throw new Error("Server Error: Invalid HMAC! Potential Attack!");
-
- int64_t seqnum = bb->getLong();
- int64_t machineid = bb->getLong();
- int numentries = bb->getInt();
- Slot slot = new Slot(table, seqnum, machineid, prevhmac, hmac, -1);
-
- for (int i = 0; i < numentries; i++) {
- slot->addShallowEntry(Entry->decode(slot, bb));
- }
-
- return slot;
+Slot *Slotdecode(Table *table, char *array, Mac *mac) {
+ mac->update(array, HMAC_SIZE, array.length - HMAC_SIZE);
+ char *realmac = mac->doFinal();
+
+ ByteBuffer *bb = ByteBuffer_wrap(array);
+ char *hmac = new char[HMAC_SIZE];
+ char *prevhmac = new char[HMAC_SIZE];
+ bb->get(hmac);
+ bb->get(prevhmac);
+ if (!Arrays.equals(realmac, hmac))
+ throw new Error("Server Error: Invalid HMAC! Potential Attack!");
+
+ int64_t seqnum = bb->getLong();
+ int64_t machineid = bb->getLong();
+ int numentries = bb->getInt();
+ Slot slot = new Slot(table, seqnum, machineid, prevhmac, hmac, -1);
+
+ for (int i = 0; i < numentries; i++) {
+ slot->addShallowEntry(Entry->decode(slot, bb));
+ }
+
+ return slot;
}
-char* Slot::encode(Mac mac) {
- char* array = new char[SLOT_SIZE];
- ByteBuffer * bb = ByteBuffer_wrap(array);
- /* Leave space for the slot HMAC. */
- bb->position(HMAC_SIZE);
- bb->put(prevhmac);
- bb->putLong(seqnum);
- bb->putLong(machineid);
- bb->putInt(entries->size());
- for (Entry entry : entries) {
- entry->encode(bb);
- }
- /* Compute our HMAC */
- mac->update(array, HMAC_SIZE, array.length - HMAC_SIZE);
- char* realmac = mac->doFinal();
- hmac = realmac;
- bb->position(0);
- bb->put(realmac);
- return array;
+char *Slot::encode(Mac mac) {
+ char *array = new char[SLOT_SIZE];
+ ByteBuffer *bb = ByteBuffer_wrap(array);
+ /* Leave space for the slot HMAC. */
+ bb->position(HMAC_SIZE);
+ bb->put(prevhmac);
+ bb->putLong(seqnum);
+ bb->putLong(machineid);
+ bb->putInt(entries->size());
+ for (Entry entry : entries) {
+ entry->encode(bb);
+ }
+ /* Compute our HMAC */
+ mac->update(array, HMAC_SIZE, array.length - HMAC_SIZE);
+ char *realmac = mac->doFinal();
+ hmac = realmac;
+ bb->position(0);
+ bb->put(realmac);
+ return array;
}
* itself.
*/
-Vector<Entry*> *Slot::getLiveEntries(bool resize) {
- Vector<Entry*> *liveEntries = new Vector<Entry*>();
- for (Entry *entry : entries) {
- if (entry->isLive()) {
- if (!resize || entry->getType() != Entry->TypeTableStatus)
- liveEntries->add(entry);
- }
- }
-
- if (seqnumlive && !resize)
- liveEntries->add(new LastMessage(this, machineid, seqnum));
-
- return liveEntries;
+Vector<Entry *> *Slot::getLiveEntries(bool resize) {
+ Vector<Entry *> *liveEntries = new Vector<Entry *>();
+ for (Entry *entry : entries) {
+ if (entry->isLive()) {
+ if (!resize || entry->getType() != Entry->TypeTableStatus)
+ liveEntries->add(entry);
+ }
+ }
+
+ if (seqnumlive && !resize)
+ liveEntries->add(new LastMessage(this, machineid, seqnum));
+
+ return liveEntries;
}
*/
void Slot::setDead() {
- seqnumlive = false;
- decrementLiveCount();
+ seqnumlive = false;
+ decrementLiveCount();
}
/**
*/
void Slot::decrementLiveCount() {
- livecount--;
- if (livecount == 0) {
- table->decrementLiveCount();
- }
+ livecount--;
+ if (livecount == 0) {
+ table->decrementLiveCount();
+ }
}
-char* Slot::getSlotCryptIV() {
- ByteBuffer * buffer = ByteBuffer_allocate(CloudComm.IV_SIZE);
- buffer->putLong(machineid);
- int64_t localSequenceNumberShift = localSequenceNumber << 16;
- buffer->putLong(localSequenceNumberShift);
- return buffer->array();
+char *Slot::getSlotCryptIV() {
+ ByteBuffer *buffer = ByteBuffer_allocate(CloudComm.IV_SIZE);
+ buffer->putLong(machineid);
+ int64_t localSequenceNumberShift = localSequenceNumber << 16;
+ buffer->putLong(localSequenceNumberShift);
+ return buffer->array();
}
#define HMAC_SIZE 32
class Slot : public Liveness {
- private:
+private:
/** Sequence number of the slot. */
int64_t seqnum;
/** HMAC of previous slot. */
- char* prevhmac;
+ char *prevhmac;
/** HMAC of this slot. */
- char* hmac;
+ char *hmac;
/** Machine that sent this slot. */
- int64_t machineid;
+ int64_t machineid;
/** Vector of entries in this slot. */
- Vector<Entry *> * entries;
+ Vector<Entry *> *entries;
/** Pieces of information that are live. */
int livecount;
/** Flag that indicates whether this slot is still live for
* recording the machine that sent it. */
- bool seqnumlive;
+ bool seqnumlive;
/** Number of chars of free space. */
int freespace;
/** Reference to Table */
- Table * table;
+ Table *table;
int64_t localSequenceNumber;
- void addShallowEntry(Entry * e);
-
- public:
- Slot(Table * _table, int64_t _seqnum, int64_t _machineid, char* _prevhmac, char* _hmac, int64_t _localSequenceNumber);
- Slot(Table _table, int64_t _seqnum, int64_t _machineid, char* _prevhmac, int64_t _localSequenceNumber);
+ void addShallowEntry(Entry *e);
+
+public:
+ Slot(Table *_table, int64_t _seqnum, int64_t _machineid, char *_prevhmac, char *_hmac, int64_t _localSequenceNumber);
+ Slot(Table _table, int64_t _seqnum, int64_t _machineid, char *_prevhmac, int64_t _localSequenceNumber);
Slot(Table _table, int64_t _seqnum, int64_t _machineid, int64_t _localSequenceNumber);
- char* getHMAC() { return hmac; }
- char* getPrevHMAC() { return prevhmac; }
- Entry * addEntry(Entry * e);
- void removeEntry(Entry * e);
- bool hasSpace(Entry * e);
- Vector<Entry *> * getEntries();
- char* encode(Mac * mac);
+ char *getHMAC() { return hmac; }
+ char *getPrevHMAC() { return prevhmac; }
+ Entry *addEntry(Entry *e);
+ void removeEntry(Entry *e);
+ bool hasSpace(Entry *e);
+ Vector<Entry *> *getEntries();
+ char *encode(Mac *mac);
int getBaseSize() { return 2 * HMAC_SIZE + 2 * sizeof(int64_t) + sizeof(int); }
- Vector<Entry *> * getLiveEntries(bool resize);
+ Vector<Entry *> *getLiveEntries(bool resize);
int64_t getSequenceNumber() { return seqnum; }
int64_t getMachineID() { return machineid; }
void setDead();
void decrementLiveCount();
bool isLive() { return livecount > 0; }
- char* getSlotCryptIV();
+ char *getSlotCryptIV();
};
-Slot * Slotdecode(Table * table, char* array, Mac *mac);
+Slot *Slotdecode(Table *table, char *array, Mac *mac);
#endif
-#include"SlotBuffer.h"
+#include "SlotBuffer.h"
/**
* Circular buffer that holds the live set of slots.
* @author Brian Demsky
void SlotBuffer::resize(int newsize) {
if (newsize == (array->length() - 1))
return;
-
- Array<Slot *> * newarray = new Array<Slot *>(newsize + 1);
+
+ Array<Slot *> *newarray = new Array<Slot *>(newsize + 1);
int currsize = size();
int index = tail;
for (int i = 0; i < currsize; i++) {
tail = 0;
}
-void SlotBuffer::putSlot(Slot * s) {
+void SlotBuffer::putSlot(Slot *s) {
int64_t checkNum = (getNewestSeqNum() + 1);
-
+
if (checkNum != s->getSequenceNumber()) {
// We have a gap so expunge all our slots
oldestseqn = s->getSequenceNumber();
array->set(0, s);
return;
}
-
+
array->set(head, s);
incrementHead();
-
+
if (oldestseqn == 0) {
oldestseqn = s->getSequenceNumber();
}
#ifndef SLOTBUFFER_H
#define SLOTBUFFER_H
-#include"common.h"
+#include "common.h"
/**
* Circular buffer that holds the live set of slots.
#define SlotBuffer_DEFAULT_SIZE 16
class SlotBuffer {
- private:
- Array<Slot *> * array;
+private:
+ Array<Slot *> *array;
int32_t head;
int32_t tail;
void incrementHead();
void incrementTail();
-
- public:
+
+public:
int64_t oldestseqn;
SlotBuffer();
int32_t capacity();
void resize(int newsize);
void putSlot(Slot *s);
- Slot * getSlot(int64_t seqnum);
+ Slot *getSlot(int64_t seqnum);
int64_t getOldestSeqNum() { return oldestseqn; }
int64_t getNewestSeqNum() { return oldestseqn + size() - 1;}
};
* @version 1.0
*/
-SlotIndexer::SlotIndexer(Array<Slot*> * _updates, SlotBuffer * _buffer) :
+SlotIndexer::SlotIndexer(Array<Slot *> *_updates, SlotBuffer *_buffer) :
buffer(_buffer),
updates(_updates),
firstslotseqnum(updates->get(0)->getSequenceNumber()) {
}
-Slot * SlotIndexer::getSlot(int64_t seqnum) {
+Slot *SlotIndexer::getSlot(int64_t seqnum) {
if (seqnum >= firstslotseqnum) {
int32_t offset = (int32_t) (seqnum - firstslotseqnum);
if (offset >= updates->length())
*/
class SlotIndexer {
- private:
- Array<Slot*> * updates;
- SlotBuffer * buffer;
+private:
+ Array<Slot *> *updates;
+ SlotBuffer *buffer;
int64_t firstslotseqnum;
- public:
- SlotIndexer(Array<Slot*> * _updates, SlotBuffer * _buffer);
- Slot* getSlot(int64_t seqnum);
+public:
+ SlotIndexer(Array<Slot *> *_updates, SlotBuffer *_buffer);
+ Slot *getSlot(int64_t seqnum);
};
#endif;
final class Table {
/* Constants */
- static final int FREE_SLOTS = 2; // Number of slots that should be kept free // 10
+ static final int FREE_SLOTS = 2;// Number of slots that should be kept free // 10
static final int SKIP_THRESHOLD = 10;
static final double RESIZE_MULTIPLE = 1.2;
static final double RESIZE_THRESHOLD = 0.75;
CloudComm cloud = NULL;
Random random = NULL;
TableStatus liveTableStatus = NULL;
- PendingTransaction pendingTransactionBuilder = NULL; // Pending Transaction used in building a Pending Transaction
- Transaction lastPendingTransactionSpeculatedOn = NULL; // Last transaction that was speculated on from the pending transaction
- Transaction firstPendingTransaction = NULL; // first transaction in the pending transaction list
+ PendingTransaction pendingTransactionBuilder = NULL;// Pending Transaction used in building a Pending Transaction
+ Transaction lastPendingTransactionSpeculatedOn = NULL;// Last transaction that was speculated on from the pending transaction
+ Transaction firstPendingTransaction = NULL; // first transaction in the pending transaction list
/* Variables */
- int numberOfSlots = 0; // Number of slots stored in buffer
- int bufferResizeThreshold = 0; // Threshold on the number of live slots before a resize is needed
- int64_t liveSlotCount = 0; // Number of currently live slots
+ int numberOfSlots = 0; // Number of slots stored in buffer
+ int bufferResizeThreshold = 0;// Threshold on the number of live slots before a resize is needed
+ int64_t liveSlotCount = 0;// Number of currently live slots
int64_t oldestLiveSlotSequenceNumver = 0; // Smallest sequence number of the slot with a live entry
- int64_t localMachineId = 0; // Machine ID of this client device
- int64_t sequenceNumber = 0; // Largest sequence number a client has received
+ int64_t localMachineId = 0; // Machine ID of this client device
+ int64_t sequenceNumber = 0; // Largest sequence number a client has received
int64_t localSequenceNumber = 0;
// int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server
// int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server
- int64_t localTransactionSequenceNumber = 0; // Local sequence number counter for transactions
- int64_t lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on
- int64_t oldestTransactionSequenceNumberSpeculatedOn = -1; // the oldest transaction that was speculated on
+ int64_t localTransactionSequenceNumber = 0; // Local sequence number counter for transactions
+ int64_t lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on
+ int64_t oldestTransactionSequenceNumberSpeculatedOn = -1; // the oldest transaction that was speculated on
int64_t localArbitrationSequenceNumber = 0;
bool hadPartialSendToServer = false;
bool attemptedToSendToServer = false;
Slot lastSlotAttemptedToSend = NULL;
bool lastIsNewKey = false;
int lastNewSize = 0;
- Hashtable<Transaction, Vector<int32_t>> lastTransactionPartsSent = NULL;
+ Hashtable<Transaction, Vector<int32_t> > lastTransactionPartsSent = NULL;
Vector<Entry> lastPendingSendArbitrationEntriesToDelete = NULL;
NewKey lastNewKey = NULL;
/* Data Structures */
- Hashtable<IoTString, KeyValue> committedKeyValueTable = NULL; // Table of committed key value pairs
- Hashtable<IoTString, KeyValue> speculatedKeyValueTable = NULL; // Table of speculated key value pairs, if there is a speculative value
- Hashtable<IoTString, KeyValue> pendingTransactionSpeculatedKeyValueTable = NULL; // Table of speculated key value pairs, if there is a speculative value from the pending transactions
- Hashtable<IoTString, NewKey> liveNewKeyTable = NULL; // Table of live new keys
- Hashtable<int64_t Pair<int64_t Liveness>> lastMessageTable = NULL; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
- Hashtable<int64_t HashSet<RejectedMessage>> rejectedMessageWatchVectorTable = NULL; // Table of machine Ids and the set of rejected messages they have not seen yet
- Hashtable<IoTString, Long> arbitratorTable = NULL; // Table of keys and their arbitrators
- Hashtable<Pair<int64_t, int64_t>, Abort> liveAbortTable = NULL; // Table live abort messages
- Hashtable<int64_t Hashtable<Pair<int64_t int32_t>, TransactionPart>> newTransactionParts = NULL; // transaction parts that are seen in this latest round of slots from the server
- Hashtable<int64_t Hashtable<Pair<int64_t int32_t>, CommitPart>> newCommitParts = NULL; // commit parts that are seen in this latest round of slots from the server
- Hashtable<int64_t, int64_t> lastArbitratedTransactionNumberByArbitratorTable = NULL; // Last transaction sequence number that an arbitrator arbitrated on
- Hashtable<int64_t Transaction> liveTransactionBySequenceNumberTable = NULL; // live transaction grouped by the sequence number
- Hashtable<Pair<int64_t, int64_t>, Transaction> liveTransactionByTransactionIdTable = NULL; // live transaction grouped by the transaction ID
- Hashtable<int64_t Hashtable<int64_t Commit>> liveCommitsTable = NULL;
+ Hashtable<IoTString, KeyValue> committedKeyValueTable = NULL; // Table of committed key value pairs
+ Hashtable<IoTString, KeyValue> speculatedKeyValueTable = NULL;// Table of speculated key value pairs, if there is a speculative value
+ Hashtable<IoTString, KeyValue> pendingTransactionSpeculatedKeyValueTable = NULL;// Table of speculated key value pairs, if there is a speculative value from the pending transactions
+ Hashtable<IoTString, NewKey> liveNewKeyTable = NULL;// Table of live new keys
+ Hashtable<int64_t Pair<int64_t Liveness> > lastMessageTable = NULL; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
+ Hashtable<int64_t HashSet<RejectedMessage> > rejectedMessageWatchVectorTable = NULL;// Table of machine Ids and the set of rejected messages they have not seen yet
+ Hashtable<IoTString, Long> arbitratorTable = NULL;// Table of keys and their arbitrators
+ Hashtable<Pair<int64_t, int64_t>, Abort> liveAbortTable = NULL; // Table live abort messages
+ Hashtable<int64_t Hashtable<Pair<int64_t int32_t>, TransactionPart> > newTransactionParts = NULL; // transaction parts that are seen in this latest round of slots from the server
+ Hashtable<int64_t Hashtable<Pair<int64_t int32_t>, CommitPart> > newCommitParts = NULL; // commit parts that are seen in this latest round of slots from the server
+ Hashtable<int64_t, int64_t> lastArbitratedTransactionNumberByArbitratorTable = NULL;// Last transaction sequence number that an arbitrator arbitrated on
+ Hashtable<int64_t Transaction> liveTransactionBySequenceNumberTable = NULL; // live transaction grouped by the sequence number
+ Hashtable<Pair<int64_t, int64_t>, Transaction> liveTransactionByTransactionIdTable = NULL;// live transaction grouped by the transaction ID
+ Hashtable<int64_t Hashtable<int64_t Commit> > liveCommitsTable = NULL;
Hashtable<IoTString, Commit> liveCommitsByKeyTable = NULL;
Hashtable<int64_t, int64_t> lastCommitSeenSequenceNumberByArbitratorTable = NULL;
- Vector<Long> rejectedSlotVector = NULL; // Vector of rejected slots that have yet to be sent to the server
+ Vector<Long> rejectedSlotVector = NULL; // Vector of rejected slots that have yet to be sent to the server
Vector<Transaction> pendingTransactionQueue = NULL;
Vector<ArbitrationRound> pendingSendArbitrationRounds = NULL;
Vector<Entry> pendingSendArbitrationEntriesToDelete = NULL;
- Hashtable<Transaction, Vector<int32_t>> transactionPartsSent = NULL;
+ Hashtable<Transaction, Vector<int32_t> > transactionPartsSent = NULL;
Hashtable<int64_t TransactionStatus> outstandingTransactionStatus = NULL;
Hashtable<int64_t Abort> liveAbortsGeneratedByLocal = NULL;
- Set<Pair<int64_t, int64_t>> offlineTransactionsCommittedAndAtServer = NULL;
- Hashtable<int64_t Pair<String, int32_t>> localCommunicationTable = NULL;
+ Set<Pair<int64_t, int64_t> > offlineTransactionsCommittedAndAtServer = NULL;
+ Hashtable<int64_t Pair<String, int32_t> > localCommunicationTable = NULL;
Hashtable<int64_t, int64_t> lastTransactionSeenFromMachineFromServer = NULL;
Hashtable<int64_t, int64_t> lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = NULL;
speculatedKeyValueTable = new Hashtable<IoTString, KeyValue>();
pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString, KeyValue>();
liveNewKeyTable = new Hashtable<IoTString, NewKey>();
- lastMessageTable = new Hashtable<int64_t Pair<int64_t Liveness>>();
- rejectedMessageWatchVectorTable = new Hashtable<int64_t HashSet<RejectedMessage>>();
+ lastMessageTable = new Hashtable<int64_t Pair<int64_t Liveness> >();
+ rejectedMessageWatchVectorTable = new Hashtable<int64_t HashSet<RejectedMessage> >();
arbitratorTable = new Hashtable<IoTString, Long>();
liveAbortTable = new Hashtable<Pair<int64_t, int64_t>, Abort>();
- newTransactionParts = new Hashtable<int64_t Hashtable<Pair<int64_t int32_t>, TransactionPart>>();
- newCommitParts = new Hashtable<int64_t Hashtable<Pair<int64_t int32_t>, CommitPart>>();
+ newTransactionParts = new Hashtable<int64_t Hashtable<Pair<int64_t int32_t>, TransactionPart> >();
+ newCommitParts = new Hashtable<int64_t Hashtable<Pair<int64_t int32_t>, CommitPart> >();
lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
liveTransactionBySequenceNumberTable = new Hashtable<int64_t Transaction>();
liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t>, Transaction>();
- liveCommitsTable = new Hashtable<int64_t Hashtable<int64_t Commit>>();
+ liveCommitsTable = new Hashtable<int64_t Hashtable<int64_t Commit> >();
liveCommitsByKeyTable = new Hashtable<IoTString, Commit>();
lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
rejectedSlotVector = new Vector<Long>();
pendingTransactionQueue = new Vector<Transaction>();
pendingSendArbitrationEntriesToDelete = new Vector<Entry>();
- transactionPartsSent = new Hashtable<Transaction, Vector<int32_t>>();
+ transactionPartsSent = new Hashtable<Transaction, Vector<int32_t> >();
outstandingTransactionStatus = new Hashtable<int64_t TransactionStatus>();
liveAbortsGeneratedByLocal = new Hashtable<int64_t Abort>();
- offlineTransactionsCommittedAndAtServer = new HashSet<Pair<int64_t, int64_t>>();
- localCommunicationTable = new Hashtable<int64_t Pair<String, int32_t>>();
+ offlineTransactionsCommittedAndAtServer = new HashSet<Pair<int64_t, int64_t> >();
+ localCommunicationTable = new Hashtable<int64_t Pair<String, int32_t> >();
lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
pendingSendArbitrationRounds = new Vector<ArbitrationRound>();
lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
}
// String toString() {
- // String retString = " Committed Table: \n";
- // retString += "---------------------------\n";
- // retString += commitedTable.toString();
+ // String retString = " Committed Table: \n";
+ // retString += "---------------------------\n";
+ // retString += commitedTable.toString();
- // retString += "\n\n";
+ // retString += "\n\n";
- // retString += " Speculative Table: \n";
- // retString += "---------------------------\n";
- // retString += speculativeTable.toString();
+ // retString += " Speculative Table: \n";
+ // retString += "---------------------------\n";
+ // retString += speculativeTable.toString();
- // return retString;
+ // return retString;
// }
synchronized void addLocalCommunication(int64_t arbitrator, String hostName, int portNumber) {
continue;
}
- Pair<Boolean, Boolean> sendReturn = sendTransactionToLocal(transaction);
+ Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
if (sendReturn.getFirst()) {
// Failed to contact over local
Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
if (newSlots.length == 0) {
fromRetry = true;
- ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
+ ThreeTuple<bool, bool, Slot[]> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
if (sendSlotsReturn.getFirst()) {
if (newKey != NULL) {
localSequenceNumber++;
// Try to fill the slot with data
- ThreeTuple<Boolean, int32_t, Boolean> fillSlotsReturn = fillSlot(slot, false, newKey);
+ ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
bool needsResize = fillSlotsReturn.getFirst();
int newSize = fillSlotsReturn.getSecond();
- Boolean insertedNewKey = fillSlotsReturn.getThird();
+ bool insertedNewKey = fillSlotsReturn.getThird();
if (needsResize) {
// Reset which transaction to send
lastInsertedNewKey = insertedNewKey;
lastNewSize = newSize;
lastNewKey = newKey;
- lastTransactionPartsSent = new Hashtable<Transaction, Vector<int32_t>>(transactionPartsSent);
+ lastTransactionPartsSent = new Hashtable<Transaction, Vector<int32_t> >(transactionPartsSent);
lastPendingSendArbitrationEntriesToDelete = new Vector<Entry>(pendingSendArbitrationEntriesToDelete);
- ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
+ ThreeTuple<bool, bool, Slot[]> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
if (sendSlotsReturn.getFirst()) {
} else {
// if (!sendSlotsReturn.getSecond()) {
- // for (Transaction transaction : lastTransactionPartsSent.keySet()) {
- // transaction.resetServerFailure();
- // }
+ // for (Transaction transaction : lastTransactionPartsSent.keySet()) {
+ // transaction.resetServerFailure();
+ // }
// } else {
- // for (Transaction transaction : lastTransactionPartsSent.keySet()) {
- // transaction.resetServerFailure();
+ // for (Transaction transaction : lastTransactionPartsSent.keySet()) {
+ // transaction.resetServerFailure();
- // // Update which transactions parts still need to be sent
- // transaction.removeSentParts(transactionPartsSent.get(transaction));
+ // // Update which transactions parts still need to be sent
+ // transaction.removeSentParts(transactionPartsSent.get(transaction));
- // // Add the transaction status to the outstanding list
- // outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
+ // // Add the transaction status to the outstanding list
+ // outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
- // // Update the transaction status
- // transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
+ // // Update the transaction status
+ // transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
- // // Check if all the transaction parts were successfully sent and if so then remove it from pending
- // if (transaction.didSendAllParts()) {
- // transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
- // pendingTransactionQueue.remove(transaction);
+ // // Check if all the transaction parts were successfully sent and if so then remove it from pending
+ // if (transaction.didSendAllParts()) {
+ // transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
+ // pendingTransactionQueue.remove(transaction);
- // for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
- // System.out.println("Sent: " + kv + " from: " + localMachineId + " Slot:" + lastSlotAttemptedToSend.getSequenceNumber() + " Claimed:" + transaction.getSequenceNumber());
- // }
- // }
- // }
+ // for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
+ // System.out.println("Sent: " + kv + " from: " + localMachineId + " Slot:" + lastSlotAttemptedToSend.getSequenceNumber() + " Claimed:" + transaction.getSequenceNumber());
+ // }
+ // }
+ // }
// }
// Reset which transaction to send
// if (!fromRetry) {
- // lastTransactionPartsSent = new Hashtable<Transaction, Vector<int32_t>>(transactionPartsSent);
- // lastPendingSendArbitrationEntriesToDelete = new Vector<Entry>(pendingSendArbitrationEntriesToDelete);
+ // lastTransactionPartsSent = new Hashtable<Transaction, Vector<int32_t>>(transactionPartsSent);
+ // lastPendingSendArbitrationEntriesToDelete = new Vector<Entry>(pendingSendArbitrationEntriesToDelete);
// }
// Nothing was able to be sent to the server so just clear these data structures
// Get the size of the send data
int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
- Long lastArbitrationDataLocalSequenceNumber = (int64_t) - 1;
+ Long lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId) != NULL) {
lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId);
}
return true;
}
- Pair<Boolean, Boolean> sendTransactionToLocal(Transaction transaction) {
+ Pair<bool, bool> sendTransactionToLocal(Transaction transaction) {
// Get the devices local communications
Pair<String, int32_t> localCommunicationInformation = localCommunicationTable.get(transaction.getArbitrator());
if (localCommunicationInformation == NULL) {
// Cant talk to that device locally so do nothing
- return new Pair<Boolean, Boolean>(true, false);
+ return new Pair<bool, bool>(true, false);
}
// Get the size of the send data
sendDataSize += part.getSize();
}
- Long lastArbitrationDataLocalSequenceNumber = (int64_t) - 1;
+ Long lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator()) != NULL) {
lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator());
}
if (returnData == NULL) {
// Could not contact server
- return new Pair<Boolean, Boolean>(true, false);
+ return new Pair<bool, bool>(true, false);
}
// Decode the data
}
}
- return new Pair<Boolean, Boolean>(false, true);
+ return new Pair<bool, bool>(false, true);
}
synchronized char[] acceptDataFromLocal(char[] data) {
}
// Arbitrate on transaction and pull relevant return data
- Pair<Boolean, Boolean> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
+ Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
couldArbitrate = localArbitrateReturn.getFirst();
didCommit = localArbitrateReturn.getSecond();
// Number of arbitration entries to decode
returnDataSize += 2 * sizeof(int32_t);
- // Boolean of did commit or not
+ // bool of did commit or not
if (numberOfParts != 0) {
returnDataSize += sizeof(char);
}
return returnData;
}
- ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsToServer(Slot slot, int newSize, bool isNewKey) throws ServerException {
+ ThreeTuple<bool, bool, Slot[]> sendSlotsToServer(Slot slot, int newSize, bool isNewKey) throws ServerException {
bool attemptedToSendToServerTmp = attemptedToSendToServer;
attemptedToSendToServer = true;
array = new Slot[] {slot};
rejectedSlotVector.clear();
inserted = true;
- } else {
+ } else {
if (array.length == 0) {
throw new Error("Server Error: Did not send any slots");
}
}
}
- return new ThreeTuple<Boolean, Boolean, Slot[]>(inserted, lastTryInserted, array);
+ return new ThreeTuple<bool, bool, Slot[]>(inserted, lastTryInserted, array);
}
/**
* Returns false if a resize was needed
*/
- ThreeTuple<Boolean, int32_t, Boolean> fillSlot(Slot slot, bool resize, NewKey newKeyEntry) {
+ ThreeTuple<bool, int32_t, bool> fillSlot(Slot slot, bool resize, NewKey newKeyEntry) {
int newSize = 0;
if (liveSlotCount > bufferResizeThreshold) {
- resize = true; //Resize is forced
+ resize = true;//Resize is forced
}
doRejectedMessages(slot);
// Do mandatory rescue of entries
- ThreeTuple<Boolean, Boolean, Long> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
+ ThreeTuple<bool, bool, Long> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
// Extract working variables
bool needsResize = mandatoryRescueReturn.getFirst();
if (needsResize && !resize) {
// We need to resize but we are not resizing so return false
- return new ThreeTuple<Boolean, int32_t, Boolean>(true, NULL, NULL);
+ return new ThreeTuple<bool, int32_t, bool>(true, NULL, NULL);
}
bool inserted = false;
// Set the transaction sequence number if it has yet to be inserted into the block chain
// if ((!transaction.didSendAPartToServer() && !transaction.getServerFailure()) || (transaction.getSequenceNumber() == -1)) {
- // transaction.setSequenceNumber(slot.getSequenceNumber());
+ // transaction.setSequenceNumber(slot.getSequenceNumber());
// }
if ((!transaction.didSendAPartToServer()) || (transaction.getSequenceNumber() == -1)) {
// Fill the remainder of the slot with rescue data
doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
- return new ThreeTuple<Boolean, int32_t, Boolean>(false, newSize, inserted);
+ return new ThreeTuple<bool, int32_t, bool>(false, newSize, inserted);
}
void doRejectedMessages(Slot s) {
- if (! rejectedSlotVector.isEmpty()) {
+ if (!rejectedSlotVector.isEmpty()) {
/* TODO: We should avoid generating a rejected message entry if
* there is already a sufficient entry in the queue (e.g.,
* equalsto value of true and same sequence number). */
}
}
- ThreeTuple<Boolean, Boolean, Long> doMandatoryResuce(Slot slot, bool resize) {
+ ThreeTuple<bool, bool, Long> doMandatoryResuce(Slot slot, bool resize) {
int64_t newestSequenceNumber = buffer.getNewestSeqNum();
int64_t oldestSequenceNumber = buffer.getOldestSeqNum();
if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
Slot previousSlot = buffer.getSlot(currentSequenceNumber);
// Push slot number forward
- if (! seenLiveSlot) {
+ if (!seenLiveSlot) {
oldestLiveSlotSequenceNumver = currentSequenceNumber;
}
slot.addEntry(liveEntry);
} else if (currentSequenceNumber == firstIfFull) {
//if there's no space but the entry is about to fall off the queue
- System.out.println("B"); //?
- return new ThreeTuple<Boolean, Boolean, Long>(true, seenLiveSlot, currentSequenceNumber);
+ System.out.println("B");//?
+ return new ThreeTuple<bool, bool, Long>(true, seenLiveSlot, currentSequenceNumber);
}
}
}
// Did not resize
- return new ThreeTuple<Boolean, Boolean, Long>(false, seenLiveSlot, currentSequenceNumber);
+ return new ThreeTuple<bool, bool, Long>(false, seenLiveSlot, currentSequenceNumber);
}
void doOptionalRescue(Slot s, bool seenliveslot, int64_t seqn, bool resize) {
* for SKIP_THRESHOLD consecutive entries*/
int skipcount = 0;
int64_t newestseqnum = buffer.getNewestSeqNum();
- search:
+search:
for (; seqn <= newestseqnum; seqn++) {
Slot prevslot = buffer.getSlot(seqn);
//Push slot number forward
// Create the abort
Abort newAbort = new Abort(NULL,
- transaction.getClientLocalSequenceNumber(),
- transaction.getSequenceNumber(),
- transaction.getMachineId(),
- transaction.getArbitrator(),
- localArbitrationSequenceNumber);
+ transaction.getClientLocalSequenceNumber(),
+ transaction.getSequenceNumber(),
+ transaction.getMachineId(),
+ transaction.getArbitrator(),
+ localArbitrationSequenceNumber);
localArbitrationSequenceNumber++;
generatedAborts.add(newAbort);
}
}
- Pair<Boolean, Boolean> arbitrateOnLocalTransaction(Transaction transaction) {
+ Pair<bool, bool> arbitrateOnLocalTransaction(Transaction transaction) {
// Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
if (transaction.getArbitrator() != localMachineId) {
- return new Pair<Boolean, Boolean>(false, false);
+ return new Pair<bool, bool>(false, false);
}
if (!transaction.isComplete()) {
// Will arbitrate in incorrect order if we continue so just break
// Most likely this
- return new Pair<Boolean, Boolean>(false, false);
+ return new Pair<bool, bool>(false, false);
}
if (transaction.getMachineId() != localMachineId) {
if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) != NULL) {
if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) > transaction.getClientLocalSequenceNumber()) {
// We've have already seen this from the server
- return new Pair<Boolean, Boolean>(false, false);
+ return new Pair<bool, bool>(false, false);
}
}
}
}
updateLiveStateFromLocal();
- return new Pair<Boolean, Boolean>(true, true);
+ return new Pair<bool, bool>(true, true);
} else {
if (transaction.getMachineId() == localMachineId) {
// Create the abort
Abort newAbort = new Abort(NULL,
- transaction.getClientLocalSequenceNumber(),
- -1,
- transaction.getMachineId(),
- transaction.getArbitrator(),
- localArbitrationSequenceNumber);
+ transaction.getClientLocalSequenceNumber(),
+ -1,
+ transaction.getMachineId(),
+ transaction.getArbitrator(),
+ localArbitrationSequenceNumber);
localArbitrationSequenceNumber++;
addAbortSet.add(newAbort);
}
updateLiveStateFromLocal();
- return new Pair<Boolean, Boolean>(true, false);
+ return new Pair<bool, bool>(true, false);
}
}
return false;
}
// bool compactArbitrationData() {
- // return false;
+ // return false;
// }
/**
for (KeyValue kv : commit.getKeyValueUpdateSet()) {
commitsToEdit.add(liveCommitsByKeyTable.get(kv.getKey()));
}
- commitsToEdit.remove(NULL); // remove NULL since it could be in this set
+ commitsToEdit.remove(NULL); // remove NULL since it could be in this set
// Update each previous commit that needs to be updated
for (Commit previousCommit : commitsToEdit) {
if (startIndex >= transactionSequenceNumbersSorted.size()) {
// Make sure we are not out of bounds
- return false; // did not speculate
+ return false; // did not speculate
}
Set<Long> incompleteTransactionArbitrator = new HashSet<Long>();
void updateLiveTransactionsAndStatus() {
// Go through each of the transactions
- for (Iterator<Map.Entry<int64_t Transaction>> iter = liveTransactionBySequenceNumberTable.entrySet().iterator(); iter.hasNext();) {
+ for (Iterator<Map.Entry<int64_t Transaction> > iter = liveTransactionBySequenceNumberTable.entrySet().iterator(); iter.hasNext();) {
Transaction transaction = iter.next().getValue();
// Check if the transaction is dead
}
// Go through each of the transactions
- for (Iterator<Map.Entry<int64_t TransactionStatus>> iter = outstandingTransactionStatus.entrySet().iterator(); iter.hasNext();) {
+ for (Iterator<Map.Entry<int64_t TransactionStatus> > iter = outstandingTransactionStatus.entrySet().iterator(); iter.hasNext();) {
TransactionStatus status = iter.next().getValue();
// Check if the transaction is dead
// Create a list of clients to watch until they see this rejected message entry.
HashSet<Long> deviceWatchSet = new HashSet<Long>();
- for (Map.Entry<int64_t Pair<int64_t Liveness>> lastMessageEntry : lastMessageTable.entrySet()) {
+ for (Map.Entry<int64_t Pair<int64_t Liveness> > lastMessageEntry : lastMessageTable.entrySet()) {
// Machine ID for the last message entry
int64_t lastMessageEntryMachineId = lastMessageEntry.getKey();
// Abort has not been seen by the client it is for yet so we need to keep track of it
Abort previouslySeenAbort = liveAbortTable.put(entry.getAbortId(), entry);
if (previouslySeenAbort != NULL) {
- previouslySeenAbort.setDead(); // Delete old version of the abort since we got a rescued newer version
+ previouslySeenAbort.setDead();// Delete old version of the abort since we got a rescued newer version
}
if (entry.getTransactionArbitrator() == localMachineId) {
}
// Set dead the abort
- for (Iterator<Map.Entry<Pair<int64_t, int64_t>, Abort>> i = liveAbortTable.entrySet().iterator(); i.hasNext();) {
+ for (Iterator<Map.Entry<Pair<int64_t, int64_t>, Abort> > i = liveAbortTable.entrySet().iterator(); i.hasNext();) {
Abort abort = i.next().getValue();
if ((abort.getTransactionMachineId() == machineId) && (abort.getSequenceNumber() <= seqNum)) {
Slot currSlot = newSlots[i];
Slot prevSlot = indexer.getSlot(currSlot.getSequenceNumber() - 1);
if (prevSlot != NULL &&
- !Arrays.equals(prevSlot.getHMAC(), currSlot.getPrevHMAC()))
+ !Arrays.equals(prevSlot.getHMAC(), currSlot.getPrevHMAC()))
throw new Error("Server Error: Invalid HMAC Chain" + currSlot + " " + prevSlot);
}
}
* @version 1.0
*/
- /* Constants */
+/* Constants */
#define Table_FREE_SLOTS 2
// Number of slots that should be kept free // 10
#define Table_SKIP_THRESHOLD 10
#define Table_REJECTED_THRESHOLD 5
class Table {
- private:
+private:
/* Helper Objects */
- SlotBuffer * buffer;
- CloudComm * cloud = NULL;
- Random * random = NULL;
- TableStatus * liveTableStatus = NULL;
- PendingTransaction * pendingTransactionBuilder = NULL; // Pending Transaction used in building a Pending Transaction
- Transaction * lastPendingTransactionSpeculatedOn = NULL; // Last transaction that was speculated on from the pending transaction
- Transaction * firstPendingTransaction = NULL; // first transaction in the pending transaction list
-
+ SlotBuffer *buffer;
+ CloudComm *cloud = NULL;
+ Random *random = NULL;
+ TableStatus *liveTableStatus = NULL;
+ PendingTransaction *pendingTransactionBuilder = NULL; // Pending Transaction used in building a Pending Transaction
+ Transaction *lastPendingTransactionSpeculatedOn = NULL; // Last transaction that was speculated on from the pending transaction
+ Transaction *firstPendingTransaction = NULL; // first transaction in the pending transaction list
+
/* Variables */
- int numberOfSlots = 0; // Number of slots stored in buffer
- int bufferResizeThreshold = 0; // Threshold on the number of live slots before a resize is needed
- int64_t liveSlotCount = 0; // Number of currently live slots
+ int numberOfSlots = 0; // Number of slots stored in buffer
+ int bufferResizeThreshold = 0;// Threshold on the number of live slots before a resize is needed
+ int64_t liveSlotCount = 0;// Number of currently live slots
int64_t oldestLiveSlotSequenceNumver = 0; // Smallest sequence number of the slot with a live entry
- int64_t localMachineId = 0; // Machine ID of this client device
- int64_t sequenceNumber = 0; // Largest sequence number a client has received
+ int64_t localMachineId = 0; // Machine ID of this client device
+ int64_t sequenceNumber = 0; // Largest sequence number a client has received
int64_t localSequenceNumber = 0;
-
+
// int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server
// int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server
- int64_t localTransactionSequenceNumber = 0; // Local sequence number counter for transactions
- int64_t lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on
- int64_t oldestTransactionSequenceNumberSpeculatedOn = -1; // the oldest transaction that was speculated on
+ int64_t localTransactionSequenceNumber = 0; // Local sequence number counter for transactions
+ int64_t lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on
+ int64_t oldestTransactionSequenceNumberSpeculatedOn = -1; // the oldest transaction that was speculated on
int64_t localArbitrationSequenceNumber = 0;
bool hadPartialSendToServer = false;
bool attemptedToSendToServer = false;
int64_t expectedsize;
bool didFindTableStatus = false;
int64_t currMaxSize = 0;
-
- Slot * lastSlotAttemptedToSend = NULL;
+
+ Slot *lastSlotAttemptedToSend = NULL;
bool lastIsNewKey = false;
int lastNewSize = 0;
Hashtable<Transaction *, Vector<int32_t> *> lastTransactionPartsSent = NULL;
- Vector<Entry*> *lastPendingSendArbitrationEntriesToDelete = NULL;
- NewKey * lastNewKey = NULL;
-
-
+ Vector<Entry *> *lastPendingSendArbitrationEntriesToDelete = NULL;
+ NewKey *lastNewKey = NULL;
+
+
/* Data Structures */
- Hashtable<IoTString *, KeyValue*> *committedKeyValueTable = NULL; // Table of committed key value pairs
- Hashtable<IoTString *, KeyValue*> * speculatedKeyValueTable = NULL; // Table of speculated key value pairs, if there is a speculative value
- Hashtable<IoTString *, KeyValue *> * pendingTransactionSpeculatedKeyValueTable = NULL; // Table of speculated key value pairs, if there is a speculative value from the pending transactions
- Hashtable<IoTString *, NewKey *> * liveNewKeyTable = NULL; // Table of live new keys
- Hashtable<int64_t, Pair<int64_t, Liveness*>*> *lastMessageTable = NULL; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
- Hashtable<int64_t, HashSet<RejectedMessage*>*> *rejectedMessageWatchVectorTable = NULL; // Table of machine Ids and the set of rejected messages they have not seen yet
- Hashtable<IoTString*, int64_t> *arbitratorTable = NULL; // Table of keys and their arbitrators
- Hashtable<Pair<int64_t, int64_t>*, Abort*> *liveAbortTable = NULL; // Table live abort messages
- Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>*, TransactionPart*>*> *newTransactionParts = NULL; // transaction parts that are seen in this latest round of slots from the server
- Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>*, CommitPart*>*> * newCommitParts = NULL; // commit parts that are seen in this latest round of slots from the server
- Hashtable<int64_t, int64_t> *lastArbitratedTransactionNumberByArbitratorTable = NULL; // Last transaction sequence number that an arbitrator arbitrated on
- Hashtable<int64_t, Transaction*> *liveTransactionBySequenceNumberTable = NULL; // live transaction grouped by the sequence number
- Hashtable<Pair<int64_t, int64_t>*, Transaction*> *liveTransactionByTransactionIdTable = NULL; // live transaction grouped by the transaction ID
- Hashtable<int64_t, Hashtable<int64_t, Commit*>> *liveCommitsTable = NULL;
- Hashtable<IoTString*, Commit*> *liveCommitsByKeyTable = NULL;
+ Hashtable<IoTString *, KeyValue *> *committedKeyValueTable = NULL;// Table of committed key value pairs
+ Hashtable<IoTString *, KeyValue *> *speculatedKeyValueTable = NULL; // Table of speculated key value pairs, if there is a speculative value
+ Hashtable<IoTString *, KeyValue *> *pendingTransactionSpeculatedKeyValueTable = NULL; // Table of speculated key value pairs, if there is a speculative value from the pending transactions
+ Hashtable<IoTString *, NewKey *> *liveNewKeyTable = NULL; // Table of live new keys
+ Hashtable<int64_t, Pair<int64_t, Liveness *> *> *lastMessageTable = NULL; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
+ Hashtable<int64_t, HashSet<RejectedMessage *> *> *rejectedMessageWatchVectorTable = NULL; // Table of machine Ids and the set of rejected messages they have not seen yet
+ Hashtable<IoTString *, int64_t> *arbitratorTable = NULL;// Table of keys and their arbitrators
+ Hashtable<Pair<int64_t, int64_t> *, Abort *> *liveAbortTable = NULL;// Table live abort messages
+ Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *> *> *newTransactionParts = NULL; // transaction parts that are seen in this latest round of slots from the server
+ Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *> *> *newCommitParts = NULL; // commit parts that are seen in this latest round of slots from the server
+ Hashtable<int64_t, int64_t> *lastArbitratedTransactionNumberByArbitratorTable = NULL; // Last transaction sequence number that an arbitrator arbitrated on
+ Hashtable<int64_t, Transaction *> *liveTransactionBySequenceNumberTable = NULL; // live transaction grouped by the sequence number
+ Hashtable<Pair<int64_t, int64_t> *, Transaction *> *liveTransactionByTransactionIdTable = NULL; // live transaction grouped by the transaction ID
+ Hashtable<int64_t, Hashtable<int64_t, Commit *> > *liveCommitsTable = NULL;
+ Hashtable<IoTString *, Commit *> *liveCommitsByKeyTable = NULL;
Hashtable<int64_t, int64_t> *lastCommitSeenSequenceNumberByArbitratorTable = NULL;
- Vector<int64_t> * rejectedSlotVector = NULL; // Vector of rejected slots that have yet to be sent to the server
- Vector<Transaction*> *pendingTransactionQueue = NULL;
- Vector<ArbitrationRound*> *pendingSendArbitrationRounds = NULL;
- Vector<Entry*> *pendingSendArbitrationEntriesToDelete = NULL;
- Hashtable<Transaction*, Vector<int32_t*>*> *transactionPartsSent = NULL;
- Hashtable<int64_t, TransactionStatus*> *outstandingTransactionStatus = NULL;
- Hashtable<int64_t, Abort*> *liveAbortsGeneratedByLocal = NULL;
- Hashset<Pair<int64_t, int64_t>*> *offlineTransactionsCommittedAndAtServer = NULL;
- Hashtable<int64_t, Pair<String*, int32_t>> * localCommunicationTable = NULL;
- Hashtable<int64_t, int64_t> * lastTransactionSeenFromMachineFromServer = NULL;
- Hashtable<int64_t, int64_t> * lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = NULL;
+ Vector<int64_t> *rejectedSlotVector = NULL; // Vector of rejected slots that have yet to be sent to the server
+ Vector<Transaction *> *pendingTransactionQueue = NULL;
+ Vector<ArbitrationRound *> *pendingSendArbitrationRounds = NULL;
+ Vector<Entry *> *pendingSendArbitrationEntriesToDelete = NULL;
+ Hashtable<Transaction *, Vector<int32_t *> *> *transactionPartsSent = NULL;
+ Hashtable<int64_t, TransactionStatus *> *outstandingTransactionStatus = NULL;
+ Hashtable<int64_t, Abort *> *liveAbortsGeneratedByLocal = NULL;
+ Hashset<Pair<int64_t, int64_t> *> *offlineTransactionsCommittedAndAtServer = NULL;
+ Hashtable<int64_t, Pair<String *, int32_t> > *localCommunicationTable = NULL;
+ Hashtable<int64_t, int64_t> *lastTransactionSeenFromMachineFromServer = NULL;
+ Hashtable<int64_t, int64_t> *lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = NULL;
void init();
- /**
- * Recalculate the new resize threshold
- */
+ /**
+ * Recalculate the new resize threshold
+ */
void setResizeThreshold();
bool sendToServer(NewKey *newKey);
synchronized bool updateFromLocal(int64_t machineId);
- Pair<Boolean, Boolean> sendTransactionToLocal(Transaction *transaction);
- ThreeTuple<Boolean, Boolean, Array<Slot*> *> * sendSlotsToServer(Slot *slot, int newSize, bool isNewKey);
+ Pair<bool, bool> sendTransactionToLocal(Transaction *transaction);
+ ThreeTuple<bool, bool, Array<Slot *> *> *sendSlotsToServer(Slot *slot, int newSize, bool isNewKey);
/**
* Returns false if a resize was needed
*/
- ThreeTuple<Boolean, int32_t*, Boolean> * fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry);
+ ThreeTuple<bool, int32_t *, bool> *fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry);
void doRejectedMessages(Slot s);
-
- ThreeTuple<Boolean, Boolean, int64_t> doMandatoryResuce(Slot slot, bool resize);
-
- void doOptionalRescue(Slot s, bool seenliveslot, int64_t seqn, bool resize);
- /**
+
+ ThreeTuple<bool, bool, int64_t> doMandatoryResuce(Slot slot, bool resize);
+
+ void doOptionalRescue(Slot s, bool seenliveslot, int64_t seqn, bool resize);
+ /**
* Checks for malicious activity and updates the local copy of the block chain.
*/
- void validateAndUpdate(Slot[] newSlots, bool acceptUpdatesToLocal);
-
- void updateLiveStateFromServer();
-
- void updateLiveStateFromLocal();
-
- void initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots);
-
- void updateExpectedSize();
-
-
- /**
- * Check the size of the block chain to make sure there are enough slots sent back by the server.
- * This is only called when we have a gap between the slots that we have locally and the slots
- * sent by the server therefore in the slots sent by the server there will be at least 1 Table
- * status message
- */
- void checkNumSlots(int numberOfSlots);
-
- void updateCurrMaxSize(int newmaxsize) { currMaxSize = newmaxsize; }
-
-
- /**
+ void validateAndUpdate(Slot[] newSlots, bool acceptUpdatesToLocal);
+
+ void updateLiveStateFromServer();
+
+ void updateLiveStateFromLocal();
+
+ void initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots);
+
+ void updateExpectedSize();
+
+
+ /**
+ * Check the size of the block chain to make sure there are enough slots sent back by the server.
+ * This is only called when we have a gap between the slots that we have locally and the slots
+ * sent by the server therefore in the slots sent by the server there will be at least 1 Table
+ * status message
+ */
+ void checkNumSlots(int numberOfSlots);
+
+ void updateCurrMaxSize(int newmaxsize) { currMaxSize = newmaxsize; }
+
+
+ /**
* Update the size of of the local buffer if it is needed.
*/
- void commitNewMaxSize();
-
- /**
- * Process the new transaction parts from this latest round of slots received from the server
- */
- void processNewTransactionParts();
-
- int64_t lastSeqNumArbOn = 0;
-
- void arbitrateFromServer();
-
- Pair<Boolean, Boolean> arbitrateOnLocalTransaction(Transaction transaction);
-
- /**
- * Compacts the arbitration data my merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates
- */
- bool compactArbitrationData();
-
- /**
- * Update all the commits and the committed tables, sets dead the dead transactions
- */
- bool updateCommittedTable();
-
- /**
- * Create the speculative table from transactions that are still live and have come from the cloud
- */
- bool updateSpeculativeTable(bool didProcessNewCommits);
-
- /**
- * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer
- */
- void updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate);
-
- /**
- * Set dead and remove from the live transaction tables the transactions that are dead
- */
- void updateLiveTransactionsAndStatus();
-
- /**
- * Process this slot, entry by entry. Also update the latest message sent by slot
- */
- void processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLocal, HashSet<int64_t> machineSet);
-
- /**
- * Update the last message that was sent for a machine Id
- */
- void processEntry(LastMessage entry, HashSet<int64_t> machineSet);
-
- /**
- * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message)
- */
- void processEntry(NewKey entry);
-
- /**
- * Process new table status entries and set dead the old ones as new ones come in.
- * keeps track of the largest and smallest table status seen in this current round
- * of updating the local copy of the block chain
- */
- void processEntry(TableStatus entry, int64_t seq);
-
- /**
- * Check old messages to see if there is a block chain violation. Also
- */
- void processEntry(RejectedMessage entry, SlotIndexer indexer);
-
- /**
- * Check if this abort is live, if not then save it so we can kill it later.
- * update the last transaction number that was arbitrated on.
- */
- void processEntry(Abort entry);
-
- /**
- * Set dead the transaction part if that transaction is dead and keep track of all new parts
- */
- void processEntry(TransactionPart entry);
-
- /**
- * Process new commit entries and save them for future use. Delete duplicates
- */
- void processEntry(CommitPart entry);
-
- /**
- * Update the last message seen table. Update and set dead the appropriate RejectedMessages as clients see them.
- * Updates the live aborts, removes those that are dead and sets them dead.
- * Check that the last message seen is correct and that there is no mismatch of our own last message or that
- * other clients have not had a rollback on the last message.
- */
- void updateLastMessage(int64_t machineId, int64_t seqNum, Liveness liveness, bool acceptUpdatesToLocal, HashSet<int64_t> machineSet);
-
- /**
- * Add a rejected message entry to the watch set to keep track of which clients have seen that
- * rejected message entry and which have not.
- */
- void addWatchVector(int64_t machineId, RejectedMessage entry);
-
- /**
- * Check if the HMAC chain is not violated
- */
- void checkHMACChain(SlotIndexer indexer, Slot[] newSlots);
- bool lastInsertedNewKey = false;
-
- public:
- Table(String baseurl, String password, int64_t _localMachineId, int listeningPort);
- Table(CloudComm _cloud, int64_t _localMachineId);
-
- /**
- * Initialize the table by inserting a table status as the first entry into the table status
- * also initialize the crypto stuff.
- */
- void initTable();
-
- /**
- * Rebuild the table from scratch by pulling the latest block chain from the server.
- */
- void rebuild();
- void addLocalCommunication(int64_t arbitrator, String hostName, int portNumber);
- uint64_t getArbitrator(IoTString * key);
- void close();
- IoTString * getCommitted(IoTString * key);
- IoTString * getSpeculative(IoTString * key);
- IoTString * getCommittedAtomic(IoTString * key);
- bool createNewKey(IoTString * keyName, int64_t machineId);
- TransactionStatus * commitTransaction();
-
+ void commitNewMaxSize();
+
+ /**
+ * Process the new transaction parts from this latest round of slots received from the server
+ */
+ void processNewTransactionParts();
+
+ int64_t lastSeqNumArbOn = 0;
+
+ void arbitrateFromServer();
+
+ Pair<bool, bool> arbitrateOnLocalTransaction(Transaction transaction);
+
+ /**
+ * Compacts the arbitration data my merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates
+ */
+ bool compactArbitrationData();
+
+ /**
+ * Update all the commits and the committed tables, sets dead the dead transactions
+ */
+ bool updateCommittedTable();
+
+ /**
+ * Create the speculative table from transactions that are still live and have come from the cloud
+ */
+ bool updateSpeculativeTable(bool didProcessNewCommits);
+
+ /**
+ * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer
+ */
+ void updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate);
+
+ /**
+ * Set dead and remove from the live transaction tables the transactions that are dead
+ */
+ void updateLiveTransactionsAndStatus();
+
+ /**
+ * Process this slot, entry by entry. Also update the latest message sent by slot
+ */
+ void processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLocal, HashSet<int64_t> machineSet);
+
+ /**
+ * Update the last message that was sent for a machine Id
+ */
+ void processEntry(LastMessage entry, HashSet<int64_t> machineSet);
+
+ /**
+ * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message)
+ */
+ void processEntry(NewKey entry);
+
+ /**
+ * Process new table status entries and set dead the old ones as new ones come in.
+ * keeps track of the largest and smallest table status seen in this current round
+ * of updating the local copy of the block chain
+ */
+ void processEntry(TableStatus entry, int64_t seq);
+
+ /**
+ * Check old messages to see if there is a block chain violation. Also
+ */
+ void processEntry(RejectedMessage entry, SlotIndexer indexer);
+
+ /**
+ * Check if this abort is live, if not then save it so we can kill it later.
+ * update the last transaction number that was arbitrated on.
+ */
+ void processEntry(Abort entry);
+
+ /**
+ * Set dead the transaction part if that transaction is dead and keep track of all new parts
+ */
+ void processEntry(TransactionPart entry);
+
+ /**
+ * Process new commit entries and save them for future use. Delete duplicates
+ */
+ void processEntry(CommitPart entry);
+
+ /**
+ * Update the last message seen table. Update and set dead the appropriate RejectedMessages as clients see them.
+ * Updates the live aborts, removes those that are dead and sets them dead.
+ * Check that the last message seen is correct and that there is no mismatch of our own last message or that
+ * other clients have not had a rollback on the last message.
+ */
+ void updateLastMessage(int64_t machineId, int64_t seqNum, Liveness liveness, bool acceptUpdatesToLocal, HashSet<int64_t> machineSet);
+
+ /**
+ * Add a rejected message entry to the watch set to keep track of which clients have seen that
+ * rejected message entry and which have not.
+ */
+ void addWatchVector(int64_t machineId, RejectedMessage entry);
+
+ /**
+ * Check if the HMAC chain is not violated
+ */
+ void checkHMACChain(SlotIndexer indexer, Slot[] newSlots);
+ bool lastInsertedNewKey = false;
+
+public:
+ Table(String baseurl, String password, int64_t _localMachineId, int listeningPort);
+ Table(CloudComm _cloud, int64_t _localMachineId);
+
+ /**
+ * Initialize the table by inserting a table status as the first entry into the table status
+ * also initialize the crypto stuff.
+ */
+ void initTable();
+
+ /**
+ * Rebuild the table from scratch by pulling the latest block chain from the server.
+ */
+ void rebuild();
+ void addLocalCommunication(int64_t arbitrator, String hostName, int portNumber);
+ uint64_t getArbitrator(IoTString *key);
+ void close();
+ IoTString *getCommitted(IoTString *key);
+ IoTString *getSpeculative(IoTString *key);
+ IoTString *getCommittedAtomic(IoTString *key);
+ bool createNewKey(IoTString *keyName, int64_t machineId);
+ TransactionStatus *commitTransaction();
+
/**
* Get the machine ID for this client
*/
- int64_t getMachineId() { return localMachineId; }
-
- /**
- * Decrement the number of live slots that we currently have
- */
- void decrementLiveCount() { liveSlotCount--; }
- int64_t getLocalSequenceNumber();
- Array<char> * acceptDataFromLocal(Array<char> * data);
+ int64_t getMachineId() { return localMachineId; }
+
+ /**
+ * Decrement the number of live slots that we currently have
+ */
+ void decrementLiveCount() { liveSlotCount--; }
+ int64_t getLocalSequenceNumber();
+ Array<char> *acceptDataFromLocal(Array<char> *data);
};
#endif
#include "TableStatus.h"
#include "ByteBuffer.h"
-Entry * TableStatus_decode(Slot * slot, ByteBuffer * bb) {
- int maxslots=bb->getInt();
+Entry *TableStatus_decode(Slot *slot, ByteBuffer *bb) {
+ int maxslots = bb->getInt();
return new TableStatus(slot, maxslots);
}
-void TableStatus::encode(ByteBuffer * bb) {
+void TableStatus::encode(ByteBuffer *bb) {
bb->put(TypeTableStatus);
bb->putInt(maxslots);
}
*/
class TableStatus : public Entry {
- private:
+private:
int maxslots;
- public:
- TableStatus(Slot * slot, int _maxslots) : Entry(slot),
+public:
+ TableStatus(Slot *slot, int _maxslots) : Entry(slot),
maxslots(_maxslots) {
- }
+ }
int getMaxSlots() { return maxslots; }
void encode(ByteBuffer *bb);
- int getSize() { return sizeof(int32_t)+sizeof(char); }
-
+ int getSize() { return sizeof(int32_t) + sizeof(char); }
+
char getType() { return TypeTableStatus; }
-
- Entry * getCopy(Slot * s) { return new TableStatus(s, maxslots); }
+
+ Entry *getCopy(Slot *s) { return new TableStatus(s, maxslots); }
};
-Entry * TableStatus_decode(Slot * slot, ByteBuffer * bb);
+Entry *TableStatus_decode(Slot *slot, ByteBuffer *bb);
#endif
#define THREETUPLE_H
template<typename A, typename B, typename C>
- class ThreeTuple {
- private:
+class ThreeTuple {
+private:
A a;
B b;
C c;
- public:
- ThreeTuple(A _a, B _b, C _c) :
- a(_a),
+public:
+ ThreeTuple(A _a, B _b, C _c) :
+ a(_a),
b(_b),
c(_c) {
- }
+ }
A getFirst() {
return a;
}
#include <sys/time.h>
class TimingSingleton {
- private:
+private:
static TimingSingleton singleton = new TimingSingleton( );
int64_t startTime = 0;
int64_t totalTime = 0;
-
- TimingSingleton() : startTime(0),
+
+ TimingSingleton() : startTime(0),
totalTime(0) {
}
int64_t time;
struct timeval tv;
gettimeofday(&tv, NULL);
- return tv.tv_sec*1000000000+tv.tv_usec*1000;
+ return tv.tv_sec * 1000000000 + tv.tv_usec * 1000;
}
-
- public:
+
+public:
void startTime() {
startTime = nanoTime();
}
-
+
void endTime() {
totalTime += nanoTime() - startTime;
}
};
TimingSingleton t_singleton;
-TimingSingleton * TimingSingleton_getInstance() {
+TimingSingleton *TimingSingleton_getInstance() {
return &t_singleton;
}
#endif
class Transaction {
- Hashtable<int32_t, TransactionPart> parts = NULL;
- Set<int32_t> missingParts = NULL;
- Vector<int32_t> partsPendingSend = NULL;
- bool isComplete = false;
- bool hasLastPart = false;
- Set<KeyValue> keyValueGuardSet = NULL;
- Set<KeyValue> keyValueUpdateSet = NULL;
- bool isDead = false;
- int64_t sequenceNumber = -1;
- int64_t clientLocalSequenceNumber = -1;
- int64_t arbitratorId = -1;
- int64_t machineId = -1;
- Pair<int64_t, int64_t> transactionId = NULL;
-
- int nextPartToSend = 0;
- bool didSendAPartToServer = false;
-
- TransactionStatus transactionStatus = NULL;
-
- bool hadServerFailure = false;
-
- Transaction() {
- parts = new Hashtable<int32_t, TransactionPart>();
- keyValueGuardSet = new HashSet<KeyValue>();
- keyValueUpdateSet = new HashSet<KeyValue>();
- partsPendingSend = new Vector<int32_t>();
- }
-
- void addPartEncode(TransactionPart newPart) {
- parts.put(newPart.getPartNumber(), newPart);
- partsPendingSend.add(newPart.getPartNumber());
-
- sequenceNumber = newPart.getSequenceNumber();
- arbitratorId = newPart.getArbitratorId();
- transactionId = newPart.getTransactionId();
- clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
- machineId = newPart.getMachineId();
-
- isComplete = true;
- }
-
- void addPartDecode(TransactionPart newPart) {
-
- if (isDead) {
- // If dead then just kill this part and move on
- newPart.setDead();
- return;
- }
-
- sequenceNumber = newPart.getSequenceNumber();
- arbitratorId = newPart.getArbitratorId();
- transactionId = newPart.getTransactionId();
- clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
- machineId = newPart.getMachineId();
-
- TransactionPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart);
-
- if (previoslySeenPart != NULL) {
- // Set dead the old one since the new one is a rescued version of this part
- previoslySeenPart.setDead();
- } else if (newPart.isLastPart()) {
- missingParts = new HashSet<int32_t>();
- hasLastPart = true;
-
- for (int i = 0; i < newPart.getPartNumber(); i++) {
- if (parts.get(i) == NULL) {
- missingParts.add(i);
- }
- }
- }
-
- if (!isComplete && hasLastPart) {
-
- // We have seen this part so remove it from the set of missing parts
- missingParts.remove(newPart.getPartNumber());
-
- // Check if all the parts have been seen
- if (missingParts.size() == 0) {
-
- // We have all the parts
- isComplete = true;
-
- // Decode all the parts and create the key value guard and update sets
- decodeTransactionData();
- }
- }
- }
-
- void addUpdateKV(KeyValue kv) {
- keyValueUpdateSet.add(kv);
- }
-
- void addGuardKV(KeyValue kv) {
- keyValueGuardSet.add(kv);
- }
-
-
- int64_t getSequenceNumber() {
- return sequenceNumber;
- }
-
- void setSequenceNumber(int64_t _sequenceNumber) {
- sequenceNumber = _sequenceNumber;
-
- for (int32_t i : parts.keySet()) {
- parts.get(i).setSequenceNumber(sequenceNumber);
- }
- }
-
- int64_t getClientLocalSequenceNumber() {
- return clientLocalSequenceNumber;
- }
-
- Hashtable<int32_t, TransactionPart> getParts() {
- return parts;
- }
+ Hashtable<int32_t, TransactionPart> parts = NULL;
+ Set<int32_t> missingParts = NULL;
+ Vector<int32_t> partsPendingSend = NULL;
+ bool isComplete = false;
+ bool hasLastPart = false;
+ Set<KeyValue> keyValueGuardSet = NULL;
+ Set<KeyValue> keyValueUpdateSet = NULL;
+ bool isDead = false;
+ int64_t sequenceNumber = -1;
+ int64_t clientLocalSequenceNumber = -1;
+ int64_t arbitratorId = -1;
+ int64_t machineId = -1;
+ Pair<int64_t, int64_t> transactionId = NULL;
+
+ int nextPartToSend = 0;
+ bool didSendAPartToServer = false;
+
+ TransactionStatus transactionStatus = NULL;
+
+ bool hadServerFailure = false;
+
+ Transaction() {
+ parts = new Hashtable<int32_t, TransactionPart>();
+ keyValueGuardSet = new HashSet<KeyValue>();
+ keyValueUpdateSet = new HashSet<KeyValue>();
+ partsPendingSend = new Vector<int32_t>();
+ }
+
+ void addPartEncode(TransactionPart newPart) {
+ parts.put(newPart.getPartNumber(), newPart);
+ partsPendingSend.add(newPart.getPartNumber());
+
+ sequenceNumber = newPart.getSequenceNumber();
+ arbitratorId = newPart.getArbitratorId();
+ transactionId = newPart.getTransactionId();
+ clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
+ machineId = newPart.getMachineId();
+
+ isComplete = true;
+ }
+
+ void addPartDecode(TransactionPart newPart) {
+
+ if (isDead) {
+ // If dead then just kill this part and move on
+ newPart.setDead();
+ return;
+ }
+
+ sequenceNumber = newPart.getSequenceNumber();
+ arbitratorId = newPart.getArbitratorId();
+ transactionId = newPart.getTransactionId();
+ clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
+ machineId = newPart.getMachineId();
+
+ TransactionPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart);
+
+ if (previoslySeenPart != NULL) {
+ // Set dead the old one since the new one is a rescued version of this part
+ previoslySeenPart.setDead();
+ } else if (newPart.isLastPart()) {
+ missingParts = new HashSet<int32_t>();
+ hasLastPart = true;
+
+ for (int i = 0; i < newPart.getPartNumber(); i++) {
+ if (parts.get(i) == NULL) {
+ missingParts.add(i);
+ }
+ }
+ }
+
+ if (!isComplete && hasLastPart) {
+
+ // We have seen this part so remove it from the set of missing parts
+ missingParts.remove(newPart.getPartNumber());
+
+ // Check if all the parts have been seen
+ if (missingParts.size() == 0) {
+
+ // We have all the parts
+ isComplete = true;
+
+ // Decode all the parts and create the key value guard and update sets
+ decodeTransactionData();
+ }
+ }
+ }
+
+ void addUpdateKV(KeyValue kv) {
+ keyValueUpdateSet.add(kv);
+ }
+
+ void addGuardKV(KeyValue kv) {
+ keyValueGuardSet.add(kv);
+ }
+
+
+ int64_t getSequenceNumber() {
+ return sequenceNumber;
+ }
+
+ void setSequenceNumber(int64_t _sequenceNumber) {
+ sequenceNumber = _sequenceNumber;
+
+ for (int32_t i : parts.keySet()) {
+ parts.get(i).setSequenceNumber(sequenceNumber);
+ }
+ }
+
+ int64_t getClientLocalSequenceNumber() {
+ return clientLocalSequenceNumber;
+ }
+
+ Hashtable<int32_t, TransactionPart> getParts() {
+ return parts;
+ }
- bool didSendAPartToServer() {
- return didSendAPartToServer;
- }
-
- void resetNextPartToSend() {
- nextPartToSend = 0;
- }
-
- TransactionPart getNextPartToSend() {
- if ((partsPendingSend.size() == 0) || (partsPendingSend.size() == nextPartToSend)) {
- return NULL;
- }
- TransactionPart part = parts.get(partsPendingSend.get(nextPartToSend));
- nextPartToSend++;
- return part;
- }
-
-
- void setServerFailure() {
- hadServerFailure = true;
- }
-
- bool getServerFailure() {
- return hadServerFailure;
- }
-
-
- void resetServerFailure() {
- hadServerFailure = false;
- }
-
-
- void setTransactionStatus(TransactionStatus _transactionStatus) {
- transactionStatus = _transactionStatus;
- }
-
- TransactionStatus getTransactionStatus() {
- return transactionStatus;
- }
-
- void removeSentParts(Vector<int32_t> sentParts) {
- nextPartToSend = 0;
- if(partsPendingSend.removeAll(sentParts))
- {
- didSendAPartToServer = true;
- transactionStatus.setTransactionSequenceNumber(sequenceNumber);
- }
- }
-
- bool didSendAllParts() {
- return partsPendingSend.isEmpty();
- }
-
- Set<KeyValue> getKeyValueUpdateSet() {
- return keyValueUpdateSet;
- }
-
- int getNumberOfParts() {
- return parts.size();
- }
-
- int64_t getMachineId() {
- return machineId;
- }
-
- int64_t getArbitrator() {
- return arbitratorId;
- }
-
- bool isComplete() {
- return isComplete;
- }
-
- Pair<int64_t, int64_t> getId() {
- return transactionId;
- }
-
- void setDead() {
- if (isDead) {
- // Already dead
- return;
- }
-
- // Set dead
- isDead = true;
-
- // Make all the parts of this transaction dead
- for (int32_t partNumber : parts.keySet()) {
- TransactionPart part = parts.get(partNumber);
- part.setDead();
- }
- }
-
- TransactionPart getPart(int index) {
- return parts.get(index);
- }
-
- void decodeTransactionData() {
-
- // Calculate the size of the data section
- int dataSize = 0;
- for (int i = 0; i < parts.keySet().size(); i++) {
- TransactionPart tp = parts.get(i);
- dataSize += tp.getDataSize();
- }
-
- char[] combinedData = new char[dataSize];
- int currentPosition = 0;
-
- // Stitch all the data sections together
- for (int i = 0; i < parts.keySet().size(); i++) {
- TransactionPart tp = parts.get(i);
- System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize());
- currentPosition += tp.getDataSize();
- }
-
- // Decoder Object
- ByteBuffer bbDecode = ByteBuffer.wrap(combinedData);
-
- // Decode how many key value pairs need to be decoded
- int numberOfKVGuards = bbDecode.getInt();
- int numberOfKVUpdates = bbDecode.getInt();
-
- // Decode all the guard key values
- for (int i = 0; i < numberOfKVGuards; i++) {
- KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
- keyValueGuardSet.add(kv);
- }
-
- // Decode all the updates key values
- for (int i = 0; i < numberOfKVUpdates; i++) {
- KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
- keyValueUpdateSet.add(kv);
- }
- }
-
- bool evaluateGuard(Hashtable<IoTString, KeyValue> committedKeyValueTable, Hashtable<IoTString, KeyValue> speculatedKeyValueTable, Hashtable<IoTString, KeyValue> pendingTransactionSpeculatedKeyValueTable) {
- for (KeyValue kvGuard : keyValueGuardSet) {
-
- // First check if the key is in the speculative table, this is the value of the latest assumption
- KeyValue kv = NULL;
-
- // If we have a speculation table then use it first
- if (pendingTransactionSpeculatedKeyValueTable != NULL) {
- kv = pendingTransactionSpeculatedKeyValueTable.get(kvGuard.getKey());
- }
-
- // If we have a speculation table then use it first
- if ((kv == NULL) && (speculatedKeyValueTable != NULL)) {
- kv = speculatedKeyValueTable.get(kvGuard.getKey());
- }
-
- if (kv == NULL) {
- // if it is not in the speculative table then check the committed table and use that
- // value as our latest assumption
- kv = committedKeyValueTable.get(kvGuard.getKey());
- }
-
- if (kvGuard.getValue() != NULL) {
- if ((kv == NULL) || (!kvGuard.getValue().equals(kv.getValue()))) {
-
-
- if (kv != NULL) {
- System.out.println(kvGuard.getValue() + " " + kv.getValue());
- } else {
- System.out.println(kvGuard.getValue() + " " + kv);
- }
-
- return false;
- }
- } else {
- if (kv != NULL) {
- return false;
- }
- }
- }
- return true;
- }
+ bool didSendAPartToServer() {
+ return didSendAPartToServer;
+ }
+
+ void resetNextPartToSend() {
+ nextPartToSend = 0;
+ }
+
+ TransactionPart getNextPartToSend() {
+ if ((partsPendingSend.size() == 0) || (partsPendingSend.size() == nextPartToSend)) {
+ return NULL;
+ }
+ TransactionPart part = parts.get(partsPendingSend.get(nextPartToSend));
+ nextPartToSend++;
+ return part;
+ }
+
+
+ void setServerFailure() {
+ hadServerFailure = true;
+ }
+
+ bool getServerFailure() {
+ return hadServerFailure;
+ }
+
+
+ void resetServerFailure() {
+ hadServerFailure = false;
+ }
+
+
+ void setTransactionStatus(TransactionStatus _transactionStatus) {
+ transactionStatus = _transactionStatus;
+ }
+
+ TransactionStatus getTransactionStatus() {
+ return transactionStatus;
+ }
+
+ void removeSentParts(Vector<int32_t> sentParts) {
+ nextPartToSend = 0;
+ if (partsPendingSend.removeAll(sentParts))
+ {
+ didSendAPartToServer = true;
+ transactionStatus.setTransactionSequenceNumber(sequenceNumber);
+ }
+ }
+
+ bool didSendAllParts() {
+ return partsPendingSend.isEmpty();
+ }
+
+ Set<KeyValue> getKeyValueUpdateSet() {
+ return keyValueUpdateSet;
+ }
+
+ int getNumberOfParts() {
+ return parts.size();
+ }
+
+ int64_t getMachineId() {
+ return machineId;
+ }
+
+ int64_t getArbitrator() {
+ return arbitratorId;
+ }
+
+ bool isComplete() {
+ return isComplete;
+ }
+
+ Pair<int64_t, int64_t> getId() {
+ return transactionId;
+ }
+
+ void setDead() {
+ if (isDead) {
+ // Already dead
+ return;
+ }
+
+ // Set dead
+ isDead = true;
+
+ // Make all the parts of this transaction dead
+ for (int32_t partNumber : parts.keySet()) {
+ TransactionPart part = parts.get(partNumber);
+ part.setDead();
+ }
+ }
+
+ TransactionPart getPart(int index) {
+ return parts.get(index);
+ }
+
+ void decodeTransactionData() {
+
+ // Calculate the size of the data section
+ int dataSize = 0;
+ for (int i = 0; i < parts.keySet().size(); i++) {
+ TransactionPart tp = parts.get(i);
+ dataSize += tp.getDataSize();
+ }
+
+ char[] combinedData = new char[dataSize];
+ int currentPosition = 0;
+
+ // Stitch all the data sections together
+ for (int i = 0; i < parts.keySet().size(); i++) {
+ TransactionPart tp = parts.get(i);
+ System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize());
+ currentPosition += tp.getDataSize();
+ }
+
+ // Decoder Object
+ ByteBuffer bbDecode = ByteBuffer.wrap(combinedData);
+
+ // Decode how many key value pairs need to be decoded
+ int numberOfKVGuards = bbDecode.getInt();
+ int numberOfKVUpdates = bbDecode.getInt();
+
+ // Decode all the guard key values
+ for (int i = 0; i < numberOfKVGuards; i++) {
+ KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
+ keyValueGuardSet.add(kv);
+ }
+
+ // Decode all the updates key values
+ for (int i = 0; i < numberOfKVUpdates; i++) {
+ KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
+ keyValueUpdateSet.add(kv);
+ }
+ }
+
+ bool evaluateGuard(Hashtable<IoTString, KeyValue> committedKeyValueTable, Hashtable<IoTString, KeyValue> speculatedKeyValueTable, Hashtable<IoTString, KeyValue> pendingTransactionSpeculatedKeyValueTable) {
+ for (KeyValue kvGuard : keyValueGuardSet) {
+
+ // First check if the key is in the speculative table, this is the value of the latest assumption
+ KeyValue kv = NULL;
+
+ // If we have a speculation table then use it first
+ if (pendingTransactionSpeculatedKeyValueTable != NULL) {
+ kv = pendingTransactionSpeculatedKeyValueTable.get(kvGuard.getKey());
+ }
+
+ // If we have a speculation table then use it first
+ if ((kv == NULL) && (speculatedKeyValueTable != NULL)) {
+ kv = speculatedKeyValueTable.get(kvGuard.getKey());
+ }
+
+ if (kv == NULL) {
+ // if it is not in the speculative table then check the committed table and use that
+ // value as our latest assumption
+ kv = committedKeyValueTable.get(kvGuard.getKey());
+ }
+
+ if (kvGuard.getValue() != NULL) {
+ if ((kv == NULL) || (!kvGuard.getValue().equals(kv.getValue()))) {
+
+
+ if (kv != NULL) {
+ System.out.println(kvGuard.getValue() + " " + kv.getValue());
+ } else {
+ System.out.println(kvGuard.getValue() + " " + kv);
+ }
+
+ return false;
+ }
+ } else {
+ if (kv != NULL) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
}
#include "common.h"
class Transaction {
- private:
+private:
Hashtable<int32_t, TransactionPart *> parts = NULL;
Set<int32_t> missingParts = NULL;
Vector<int32_t> partsPendingSend = NULL;
- bool isComplete = false;
- bool hasLastPart = false;
- Hashset<KeyValue *> * keyValueGuardSet = NULL;
- Hashset<KeyValue *> * keyValueUpdateSet = NULL;
+ bool isComplete = false;
+ bool hasLastPart = false;
+ Hashset<KeyValue *> *keyValueGuardSet = NULL;
+ Hashset<KeyValue *> *keyValueUpdateSet = NULL;
bool isDead = false;
- int64_t sequenceNumber = -1;
+ int64_t sequenceNumber = -1;
int64_t clientLocalSequenceNumber = -1;
int64_t arbitratorId = -1;
int64_t machineId = -1;
Pair<uint64_t, uint64_t> transactionId = NULL;
-
- int nextPartToSend = 0;
+
+ int nextPartToSend = 0;
bool didSendAPartToServer = false;
-
- TransactionStatus transactionStatus = NULL;
- bool hadServerFailure = false;
+ TransactionStatus transactionStatus = NULL;
+
+ bool hadServerFailure = false;
+
+ public Transaction() {
+ parts = new Hashtable<int32_t, TransactionPart>();
+ keyValueGuardSet = new HashSet<KeyValue>();
+ keyValueUpdateSet = new HashSet<KeyValue>();
+ partsPendingSend = new Vector<int32_t>();
+ }
+
+ public void addPartEncode(TransactionPart newPart) {
+ parts.put(newPart.getPartNumber(), newPart);
+ partsPendingSend.add(newPart.getPartNumber());
+
+ sequenceNumber = newPart.getSequenceNumber();
+ arbitratorId = newPart.getArbitratorId();
+ transactionId = newPart.getTransactionId();
+ clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
+ machineId = newPart.getMachineId();
+
+ isComplete = true;
+ }
+
+ public void addPartDecode(TransactionPart newPart) {
+
+ if (isDead) {
+ // If dead then just kill this part and move on
+ newPart.setDead();
+ return;
+ }
+
+ sequenceNumber = newPart.getSequenceNumber();
+ arbitratorId = newPart.getArbitratorId();
+ transactionId = newPart.getTransactionId();
+ clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
+ machineId = newPart.getMachineId();
+
+ TransactionPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart);
+
+ if (previoslySeenPart != NULL) {
+ // Set dead the old one since the new one is a rescued version of this part
+ previoslySeenPart.setDead();
+ } else if (newPart.isLastPart()) {
+ missingParts = new HashSet<int32_t>();
+ hasLastPart = true;
+
+ for (int i = 0; i < newPart.getPartNumber(); i++) {
+ if (parts.get(i) == NULL) {
+ missingParts.add(i);
+ }
+ }
+ }
+
+ if (!isComplete && hasLastPart) {
+
+ // We have seen this part so remove it from the set of missing parts
+ missingParts.remove(newPart.getPartNumber());
+
+ // Check if all the parts have been seen
+ if (missingParts.size() == 0) {
+
+ // We have all the parts
+ isComplete = true;
- public Transaction() {
- parts = new Hashtable<int32_t, TransactionPart>();
- keyValueGuardSet = new HashSet<KeyValue>();
- keyValueUpdateSet = new HashSet<KeyValue>();
- partsPendingSend = new Vector<int32_t>();
- }
-
- public void addPartEncode(TransactionPart newPart) {
- parts.put(newPart.getPartNumber(), newPart);
- partsPendingSend.add(newPart.getPartNumber());
-
- sequenceNumber = newPart.getSequenceNumber();
- arbitratorId = newPart.getArbitratorId();
- transactionId = newPart.getTransactionId();
- clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
- machineId = newPart.getMachineId();
-
- isComplete = true;
- }
-
- public void addPartDecode(TransactionPart newPart) {
+ // Decode all the parts and create the key value guard and update sets
+ decodeTransactionData();
+ }
+ }
+ }
- if (isDead) {
- // If dead then just kill this part and move on
- newPart.setDead();
- return;
- }
+ public void addUpdateKV(KeyValue kv) {
+ keyValueUpdateSet.add(kv);
+ }
- sequenceNumber = newPart.getSequenceNumber();
- arbitratorId = newPart.getArbitratorId();
- transactionId = newPart.getTransactionId();
- clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
- machineId = newPart.getMachineId();
+ public void addGuardKV(KeyValue kv) {
+ keyValueGuardSet.add(kv);
+ }
- TransactionPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart);
- if (previoslySeenPart != NULL) {
- // Set dead the old one since the new one is a rescued version of this part
- previoslySeenPart.setDead();
- } else if (newPart.isLastPart()) {
- missingParts = new HashSet<int32_t>();
- hasLastPart = true;
-
- for (int i = 0; i < newPart.getPartNumber(); i++) {
- if (parts.get(i) == NULL) {
- missingParts.add(i);
- }
- }
- }
+ public int64_t getSequenceNumber() {
+ return sequenceNumber;
+ }
- if (!isComplete && hasLastPart) {
-
- // We have seen this part so remove it from the set of missing parts
- missingParts.remove(newPart.getPartNumber());
-
- // Check if all the parts have been seen
- if (missingParts.size() == 0) {
+ public void setSequenceNumber(int64_t _sequenceNumber) {
+ sequenceNumber = _sequenceNumber;
- // We have all the parts
- isComplete = true;
+ for (int32_t i : parts.keySet()) {
+ parts.get(i).setSequenceNumber(sequenceNumber);
+ }
+ }
+
+ public int64_t getClientLocalSequenceNumber() {
+ return clientLocalSequenceNumber;
+ }
+
+ public Hashtable<int32_t, TransactionPart> getParts() {
+ return parts;
+ }
- // Decode all the parts and create the key value guard and update sets
- decodeTransactionData();
- }
- }
- }
-
- public void addUpdateKV(KeyValue kv) {
- keyValueUpdateSet.add(kv);
- }
-
- public void addGuardKV(KeyValue kv) {
- keyValueGuardSet.add(kv);
- }
-
-
- public int64_t getSequenceNumber() {
- return sequenceNumber;
- }
-
- public void setSequenceNumber(int64_t _sequenceNumber) {
- sequenceNumber = _sequenceNumber;
-
- for (int32_t i : parts.keySet()) {
- parts.get(i).setSequenceNumber(sequenceNumber);
- }
- }
-
- public int64_t getClientLocalSequenceNumber() {
- return clientLocalSequenceNumber;
- }
-
- public Hashtable<int32_t, TransactionPart> getParts() {
- return parts;
- }
-
- public bool didSendAPartToServer() {
- return didSendAPartToServer;
- }
-
- public void resetNextPartToSend() {
- nextPartToSend = 0;
- }
-
- public TransactionPart getNextPartToSend() {
- if ((partsPendingSend.size() == 0) || (partsPendingSend.size() == nextPartToSend)) {
- return NULL;
- }
- TransactionPart part = parts.get(partsPendingSend.get(nextPartToSend));
- nextPartToSend++;
- return part;
- }
-
-
- public void setServerFailure() {
- hadServerFailure = true;
- }
-
- public bool getServerFailure() {
- return hadServerFailure;
- }
-
-
- public void resetServerFailure() {
- hadServerFailure = false;
- }
-
-
- public void setTransactionStatus(TransactionStatus _transactionStatus) {
- transactionStatus = _transactionStatus;
- }
-
- public TransactionStatus getTransactionStatus() {
- return transactionStatus;
- }
-
- public void removeSentParts(Vector<int32_t> sentParts) {
- nextPartToSend = 0;
- if(partsPendingSend.removeAll(sentParts))
- {
- didSendAPartToServer = true;
- transactionStatus.setTransactionSequenceNumber(sequenceNumber);
- }
- }
-
- public bool didSendAllParts() {
- return partsPendingSend.isEmpty();
- }
-
- public Set<KeyValue> getKeyValueUpdateSet() {
- return keyValueUpdateSet;
- }
-
- public int getNumberOfParts() {
- return parts.size();
- }
-
- public int64_t getMachineId() {
- return machineId;
- }
-
- public int64_t getArbitrator() {
- return arbitratorId;
- }
-
- public bool isComplete() {
- return isComplete;
- }
-
- public Pair<int64_t, int64_t> getId() {
- return transactionId;
- }
-
- public void setDead() {
- if (isDead) {
- // Already dead
- return;
- }
-
- // Set dead
- isDead = true;
-
- // Make all the parts of this transaction dead
- for (int32_t partNumber : parts.keySet()) {
- TransactionPart part = parts.get(partNumber);
- part.setDead();
- }
- }
-
- public TransactionPart getPart(int index) {
- return parts.get(index);
- }
-
- private void decodeTransactionData() {
-
- // Calculate the size of the data section
- int dataSize = 0;
- for (int i = 0; i < parts.keySet().size(); i++) {
- TransactionPart tp = parts.get(i);
- dataSize += tp.getDataSize();
- }
-
- char[] combinedData = new char[dataSize];
- int currentPosition = 0;
-
- // Stitch all the data sections together
- for (int i = 0; i < parts.keySet().size(); i++) {
- TransactionPart tp = parts.get(i);
- System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize());
- currentPosition += tp.getDataSize();
- }
-
- // Decoder Object
- ByteBuffer bbDecode = ByteBuffer.wrap(combinedData);
-
- // Decode how many key value pairs need to be decoded
- int numberOfKVGuards = bbDecode.getInt();
- int numberOfKVUpdates = bbDecode.getInt();
-
- // Decode all the guard key values
- for (int i = 0; i < numberOfKVGuards; i++) {
- KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
- keyValueGuardSet.add(kv);
- }
-
- // Decode all the updates key values
- for (int i = 0; i < numberOfKVUpdates; i++) {
- KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
- keyValueUpdateSet.add(kv);
- }
- }
-
- public bool evaluateGuard(Hashtable<IoTString, KeyValue> committedKeyValueTable, Hashtable<IoTString, KeyValue> speculatedKeyValueTable, Hashtable<IoTString, KeyValue> pendingTransactionSpeculatedKeyValueTable) {
- for (KeyValue kvGuard : keyValueGuardSet) {
-
- // First check if the key is in the speculative table, this is the value of the latest assumption
- KeyValue kv = NULL;
-
- // If we have a speculation table then use it first
- if (pendingTransactionSpeculatedKeyValueTable != NULL) {
- kv = pendingTransactionSpeculatedKeyValueTable.get(kvGuard.getKey());
- }
-
- // If we have a speculation table then use it first
- if ((kv == NULL) && (speculatedKeyValueTable != NULL)) {
- kv = speculatedKeyValueTable.get(kvGuard.getKey());
- }
-
- if (kv == NULL) {
- // if it is not in the speculative table then check the committed table and use that
- // value as our latest assumption
- kv = committedKeyValueTable.get(kvGuard.getKey());
- }
-
- if (kvGuard.getValue() != NULL) {
- if ((kv == NULL) || (!kvGuard.getValue().equals(kv.getValue()))) {
-
-
- if (kv != NULL) {
- System.out.println(kvGuard.getValue() + " " + kv.getValue());
- } else {
- System.out.println(kvGuard.getValue() + " " + kv);
- }
-
- return false;
- }
- } else {
- if (kv != NULL) {
- return false;
- }
- }
- }
- return true;
- }
+ public bool didSendAPartToServer() {
+ return didSendAPartToServer;
+ }
+
+ public void resetNextPartToSend() {
+ nextPartToSend = 0;
+ }
+
+ public TransactionPart getNextPartToSend() {
+ if ((partsPendingSend.size() == 0) || (partsPendingSend.size() == nextPartToSend)) {
+ return NULL;
+ }
+ TransactionPart part = parts.get(partsPendingSend.get(nextPartToSend));
+ nextPartToSend++;
+ return part;
+ }
+
+
+ public void setServerFailure() {
+ hadServerFailure = true;
+ }
+
+ public bool getServerFailure() {
+ return hadServerFailure;
+ }
+
+
+ public void resetServerFailure() {
+ hadServerFailure = false;
+ }
+
+
+ public void setTransactionStatus(TransactionStatus _transactionStatus) {
+ transactionStatus = _transactionStatus;
+ }
+
+ public TransactionStatus getTransactionStatus() {
+ return transactionStatus;
+ }
+
+ public void removeSentParts(Vector<int32_t> sentParts) {
+ nextPartToSend = 0;
+ if (partsPendingSend.removeAll(sentParts))
+ {
+ didSendAPartToServer = true;
+ transactionStatus.setTransactionSequenceNumber(sequenceNumber);
+ }
+ }
+
+ public bool didSendAllParts() {
+ return partsPendingSend.isEmpty();
+ }
+
+ public Set<KeyValue> getKeyValueUpdateSet() {
+ return keyValueUpdateSet;
+ }
+
+ public int getNumberOfParts() {
+ return parts.size();
+ }
+
+ public int64_t getMachineId() {
+ return machineId;
+ }
+
+ public int64_t getArbitrator() {
+ return arbitratorId;
+ }
+
+ public bool isComplete() {
+ return isComplete;
+ }
+
+ public Pair<int64_t, int64_t> getId() {
+ return transactionId;
+ }
+
+ public void setDead() {
+ if (isDead) {
+ // Already dead
+ return;
+ }
+
+ // Set dead
+ isDead = true;
+
+ // Make all the parts of this transaction dead
+ for (int32_t partNumber : parts.keySet()) {
+ TransactionPart part = parts.get(partNumber);
+ part.setDead();
+ }
+ }
+
+ public TransactionPart getPart(int index) {
+ return parts.get(index);
+ }
+
+ private void decodeTransactionData() {
+
+ // Calculate the size of the data section
+ int dataSize = 0;
+ for (int i = 0; i < parts.keySet().size(); i++) {
+ TransactionPart tp = parts.get(i);
+ dataSize += tp.getDataSize();
+ }
+
+ char[] combinedData = new char[dataSize];
+ int currentPosition = 0;
+
+ // Stitch all the data sections together
+ for (int i = 0; i < parts.keySet().size(); i++) {
+ TransactionPart tp = parts.get(i);
+ System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize());
+ currentPosition += tp.getDataSize();
+ }
+
+ // Decoder Object
+ ByteBuffer bbDecode = ByteBuffer.wrap(combinedData);
+
+ // Decode how many key value pairs need to be decoded
+ int numberOfKVGuards = bbDecode.getInt();
+ int numberOfKVUpdates = bbDecode.getInt();
+
+ // Decode all the guard key values
+ for (int i = 0; i < numberOfKVGuards; i++) {
+ KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
+ keyValueGuardSet.add(kv);
+ }
+
+ // Decode all the updates key values
+ for (int i = 0; i < numberOfKVUpdates; i++) {
+ KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
+ keyValueUpdateSet.add(kv);
+ }
+ }
+
+ public bool evaluateGuard(Hashtable<IoTString, KeyValue> committedKeyValueTable, Hashtable<IoTString, KeyValue> speculatedKeyValueTable, Hashtable<IoTString, KeyValue> pendingTransactionSpeculatedKeyValueTable) {
+ for (KeyValue kvGuard : keyValueGuardSet) {
+
+ // First check if the key is in the speculative table, this is the value of the latest assumption
+ KeyValue kv = NULL;
+
+ // If we have a speculation table then use it first
+ if (pendingTransactionSpeculatedKeyValueTable != NULL) {
+ kv = pendingTransactionSpeculatedKeyValueTable.get(kvGuard.getKey());
+ }
+
+ // If we have a speculation table then use it first
+ if ((kv == NULL) && (speculatedKeyValueTable != NULL)) {
+ kv = speculatedKeyValueTable.get(kvGuard.getKey());
+ }
+
+ if (kv == NULL) {
+ // if it is not in the speculative table then check the committed table and use that
+ // value as our latest assumption
+ kv = committedKeyValueTable.get(kvGuard.getKey());
+ }
+
+ if (kvGuard.getValue() != NULL) {
+ if ((kv == NULL) || (!kvGuard.getValue().equals(kv.getValue()))) {
+
+
+ if (kv != NULL) {
+ System.out.println(kvGuard.getValue() + " " + kv.getValue());
+ } else {
+ System.out.println(kvGuard.getValue() + " " + kv);
+ }
+
+ return false;
+ }
+ } else {
+ if (kv != NULL) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
};
#endif
class TransactionPart extends Entry {
- // Max size of the part excluding the fixed size header
- static final int MAX_NON_HEADER_SIZE = 512;
-
- int64_t sequenceNumber = -1;
- int64_t machineId = -1;
- int64_t arbitratorId = -1;
- int64_t clientLocalSequenceNumber = -1; // Sequence number of the transaction that this is a part of
- int partNumber = -1; // Parts position in the
- Boolean isLastPart = false;
-
- Pair<int64_t, int64_t> transactionId = NULL;
- Pair<int64_t int32_t> partId = NULL;
-
- char[] data = NULL;
-
- TransactionPart(Slot s, int64_t _machineId, int64_t _arbitratorId, int64_t _clientLocalSequenceNumber, int _partNumber, char[] _data, Boolean _isLastPart) {
- super(s);
- machineId = _machineId;
- arbitratorId = _arbitratorId;
- clientLocalSequenceNumber = _clientLocalSequenceNumber;
- partNumber = _partNumber;
- data = _data;
- isLastPart = _isLastPart;
-
- transactionId = new Pair<int64_t, int64_t>(machineId, clientLocalSequenceNumber);
- partId = new Pair<int64_t int32_t>(clientLocalSequenceNumber, partNumber);
-
- }
-
- int getSize() {
- if (data == NULL) {
- return (4 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char));
- }
- return (4 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char)) + data.length;
- }
-
- void setSlot(Slot s) {
- parentslot = s;
- }
-
- Pair<int64_t, int64_t> getTransactionId() {
- return transactionId;
- }
-
- int64_t getArbitratorId() {
- return arbitratorId;
- }
-
- Pair<int64_t int32_t> getPartId() {
- return partId;
- }
-
- int getPartNumber() {
- return partNumber;
- }
-
- int getDataSize() {
- return data.length;
- }
-
- char[] getData() {
- return data;
- }
-
- Boolean isLastPart() {
- return isLastPart;
- }
-
- int64_t getMachineId() {
- return machineId;
- }
-
- int64_t getClientLocalSequenceNumber() {
- return clientLocalSequenceNumber;
- }
-
-
- int64_t getSequenceNumber() {
- return sequenceNumber;
- }
-
- void setSequenceNumber(int64_t _sequenceNumber) {
- sequenceNumber = _sequenceNumber;
- }
-
- static Entry decode(Slot s, ByteBuffer bb) {
- int64_t sequenceNumber = bb->getLong();
- int64_t machineId = bb->getLong();
- int64_t arbitratorId = bb->getLong();
- int64_t clientLocalSequenceNumber = bb->getLong();
- int partNumber = bb->getInt();
- int dataSize = bb->getInt();
- Boolean isLastPart = (bb->get() == 1);
- // Get the data
- char[] data = new char[dataSize];
- bb->get(data);
-
- TransactionPart returnTransactionPart = new TransactionPart(s, machineId, arbitratorId, clientLocalSequenceNumber, partNumber, data, isLastPart);
- returnTransactionPart.setSequenceNumber(sequenceNumber);
-
- return returnTransactionPart;
- }
-
- void encode(ByteBuffer bb) {
- bb->put(Entry.TypeTransactionPart);
- bb->putLong(sequenceNumber);
- bb->putLong(machineId);
- bb->putLong(arbitratorId);
- bb->putLong(clientLocalSequenceNumber);
- bb->putInt(partNumber);
- bb->putInt(data.length);
-
- if (isLastPart) {
- bb->put((char)1);
- } else {
- bb->put((char)0);
- }
-
- bb->put(data);
- }
-
- char getType() {
- return Entry.TypeTransactionPart;
- }
-
- Entry getCopy(Slot s) {
-
- TransactionPart copyTransaction = new TransactionPart(s, machineId, arbitratorId, clientLocalSequenceNumber, partNumber, data, isLastPart);
- copyTransaction.setSequenceNumber(sequenceNumber);
-
- return copyTransaction;
- }
+ // Max size of the part excluding the fixed size header
+ static final int MAX_NON_HEADER_SIZE = 512;
+
+ int64_t sequenceNumber = -1;
+ int64_t machineId = -1;
+ int64_t arbitratorId = -1;
+ int64_t clientLocalSequenceNumber = -1; // Sequence number of the transaction that this is a part of
+ int partNumber = -1; // Parts position in the
+ bool isLastPart = false;
+
+ Pair<int64_t, int64_t> transactionId = NULL;
+ Pair<int64_t int32_t> partId = NULL;
+
+ char[] data = NULL;
+
+ TransactionPart(Slot s, int64_t _machineId, int64_t _arbitratorId, int64_t _clientLocalSequenceNumber, int _partNumber, char[] _data, bool _isLastPart) {
+ super(s);
+ machineId = _machineId;
+ arbitratorId = _arbitratorId;
+ clientLocalSequenceNumber = _clientLocalSequenceNumber;
+ partNumber = _partNumber;
+ data = _data;
+ isLastPart = _isLastPart;
+
+ transactionId = new Pair<int64_t, int64_t>(machineId, clientLocalSequenceNumber);
+ partId = new Pair<int64_t int32_t>(clientLocalSequenceNumber, partNumber);
+
+ }
+
+ int getSize() {
+ if (data == NULL) {
+ return (4 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char));
+ }
+ return (4 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char)) + data.length;
+ }
+
+ void setSlot(Slot s) {
+ parentslot = s;
+ }
+
+ Pair<int64_t, int64_t> getTransactionId() {
+ return transactionId;
+ }
+
+ int64_t getArbitratorId() {
+ return arbitratorId;
+ }
+
+ Pair<int64_t int32_t> getPartId() {
+ return partId;
+ }
+
+ int getPartNumber() {
+ return partNumber;
+ }
+
+ int getDataSize() {
+ return data.length;
+ }
+
+ char[] getData() {
+ return data;
+ }
+
+ bool isLastPart() {
+ return isLastPart;
+ }
+
+ int64_t getMachineId() {
+ return machineId;
+ }
+
+ int64_t getClientLocalSequenceNumber() {
+ return clientLocalSequenceNumber;
+ }
+
+
+ int64_t getSequenceNumber() {
+ return sequenceNumber;
+ }
+
+ void setSequenceNumber(int64_t _sequenceNumber) {
+ sequenceNumber = _sequenceNumber;
+ }
+
+ static Entry decode(Slot s, ByteBuffer bb) {
+ int64_t sequenceNumber = bb->getLong();
+ int64_t machineId = bb->getLong();
+ int64_t arbitratorId = bb->getLong();
+ int64_t clientLocalSequenceNumber = bb->getLong();
+ int partNumber = bb->getInt();
+ int dataSize = bb->getInt();
+ bool isLastPart = (bb->get() == 1);
+ // Get the data
+ char[] data = new char[dataSize];
+ bb->get(data);
+
+ TransactionPart returnTransactionPart = new TransactionPart(s, machineId, arbitratorId, clientLocalSequenceNumber, partNumber, data, isLastPart);
+ returnTransactionPart.setSequenceNumber(sequenceNumber);
+
+ return returnTransactionPart;
+ }
+
+ void encode(ByteBuffer bb) {
+ bb->put(Entry.TypeTransactionPart);
+ bb->putLong(sequenceNumber);
+ bb->putLong(machineId);
+ bb->putLong(arbitratorId);
+ bb->putLong(clientLocalSequenceNumber);
+ bb->putInt(partNumber);
+ bb->putInt(data.length);
+
+ if (isLastPart) {
+ bb->put((char)1);
+ } else {
+ bb->put((char)0);
+ }
+
+ bb->put(data);
+ }
+
+ char getType() {
+ return Entry.TypeTransactionPart;
+ }
+
+ Entry getCopy(Slot s) {
+
+ TransactionPart copyTransaction = new TransactionPart(s, machineId, arbitratorId, clientLocalSequenceNumber, partNumber, data, isLastPart);
+ copyTransaction.setSequenceNumber(sequenceNumber);
+
+ return copyTransaction;
+ }
}
class TransactionPart extends Entry {
- // Max size of the part excluding the fixed size header
- public static final int MAX_NON_HEADER_SIZE = 512;
-
- private int64_t sequenceNumber = -1;
- private int64_t machineId = -1;
- private int64_t arbitratorId = -1;
- private int64_t clientLocalSequenceNumber = -1; // Sequence number of the transaction that this is a part of
- private int partNumber = -1; // Parts position in the
- private Boolean isLastPart = false;
-
- private Pair<int64_t, int64_t> transactionId = NULL;
- private Pair<int64_t int32_t> partId = NULL;
-
- private char[] data = NULL;
-
- public TransactionPart(Slot s, int64_t _machineId, int64_t _arbitratorId, int64_t _clientLocalSequenceNumber, int _partNumber, char[] _data, Boolean _isLastPart) {
- super(s);
- machineId = _machineId;
- arbitratorId = _arbitratorId;
- clientLocalSequenceNumber = _clientLocalSequenceNumber;
- partNumber = _partNumber;
- data = _data;
- isLastPart = _isLastPart;
-
- transactionId = new Pair<int64_t, int64_t>(machineId, clientLocalSequenceNumber);
- partId = new Pair<int64_t int32_t>(clientLocalSequenceNumber, partNumber);
-
- }
-
- public int getSize() {
- if (data == NULL) {
- return (4 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char));
- }
- return (4 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char)) + data.length;
- }
-
- public void setSlot(Slot s) {
- parentslot = s;
- }
-
- public Pair<int64_t, int64_t> getTransactionId() {
- return transactionId;
- }
-
- public int64_t getArbitratorId() {
- return arbitratorId;
- }
-
- public Pair<int64_t int32_t> getPartId() {
- return partId;
- }
-
- public int getPartNumber() {
- return partNumber;
- }
-
- public int getDataSize() {
- return data.length;
- }
-
- public char[] getData() {
- return data;
- }
-
- public Boolean isLastPart() {
- return isLastPart;
- }
-
- public int64_t getMachineId() {
- return machineId;
- }
-
- public int64_t getClientLocalSequenceNumber() {
- return clientLocalSequenceNumber;
- }
-
-
- public int64_t getSequenceNumber() {
- return sequenceNumber;
- }
-
- public void setSequenceNumber(int64_t _sequenceNumber) {
- sequenceNumber = _sequenceNumber;
- }
-
- static Entry decode(Slot s, ByteBuffer bb) {
- int64_t sequenceNumber = bb->getLong();
- int64_t machineId = bb->getLong();
- int64_t arbitratorId = bb->getLong();
- int64_t clientLocalSequenceNumber = bb->getLong();
- int partNumber = bb->getInt();
- int dataSize = bb->getInt();
- Boolean isLastPart = (bb->get() == 1);
- // Get the data
- char[] data = new char[dataSize];
- bb->get(data);
-
- TransactionPart returnTransactionPart = new TransactionPart(s, machineId, arbitratorId, clientLocalSequenceNumber, partNumber, data, isLastPart);
- returnTransactionPart.setSequenceNumber(sequenceNumber);
-
- return returnTransactionPart;
- }
-
- public void encode(ByteBuffer bb) {
- bb->put(Entry.TypeTransactionPart);
- bb->putLong(sequenceNumber);
- bb->putLong(machineId);
- bb->putLong(arbitratorId);
- bb->putLong(clientLocalSequenceNumber);
- bb->putInt(partNumber);
- bb->putInt(data.length);
-
- if (isLastPart) {
- bb->put((char)1);
- } else {
- bb->put((char)0);
- }
-
- bb->put(data);
- }
-
- public char getType() {
- return Entry.TypeTransactionPart;
- }
-
- public Entry getCopy(Slot s) {
-
- TransactionPart copyTransaction = new TransactionPart(s, machineId, arbitratorId, clientLocalSequenceNumber, partNumber, data, isLastPart);
- copyTransaction.setSequenceNumber(sequenceNumber);
-
- return copyTransaction;
- }
+ // Max size of the part excluding the fixed size header
+ public static final int MAX_NON_HEADER_SIZE = 512;
+
+ private int64_t sequenceNumber = -1;
+ private int64_t machineId = -1;
+ private int64_t arbitratorId = -1;
+ private int64_t clientLocalSequenceNumber = -1; // Sequence number of the transaction that this is a part of
+ private int partNumber = -1; // Parts position in the
+ private bool isLastPart = false;
+
+ private Pair<int64_t, int64_t> transactionId = NULL;
+ private Pair<int64_t int32_t> partId = NULL;
+
+ private char[] data = NULL;
+
+ public TransactionPart(Slot s, int64_t _machineId, int64_t _arbitratorId, int64_t _clientLocalSequenceNumber, int _partNumber, char[] _data, bool _isLastPart) {
+ super(s);
+ machineId = _machineId;
+ arbitratorId = _arbitratorId;
+ clientLocalSequenceNumber = _clientLocalSequenceNumber;
+ partNumber = _partNumber;
+ data = _data;
+ isLastPart = _isLastPart;
+
+ transactionId = new Pair<int64_t, int64_t>(machineId, clientLocalSequenceNumber);
+ partId = new Pair<int64_t int32_t>(clientLocalSequenceNumber, partNumber);
+
+ }
+
+ public int getSize() {
+ if (data == NULL) {
+ return (4 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char));
+ }
+ return (4 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char)) + data.length;
+ }
+
+ public void setSlot(Slot s) {
+ parentslot = s;
+ }
+
+ public Pair<int64_t, int64_t> getTransactionId() {
+ return transactionId;
+ }
+
+ public int64_t getArbitratorId() {
+ return arbitratorId;
+ }
+
+ public Pair<int64_t int32_t> getPartId() {
+ return partId;
+ }
+
+ public int getPartNumber() {
+ return partNumber;
+ }
+
+ public int getDataSize() {
+ return data.length;
+ }
+
+ public char[] getData() {
+ return data;
+ }
+
+ public bool isLastPart() {
+ return isLastPart;
+ }
+
+ public int64_t getMachineId() {
+ return machineId;
+ }
+
+ public int64_t getClientLocalSequenceNumber() {
+ return clientLocalSequenceNumber;
+ }
+
+
+ public int64_t getSequenceNumber() {
+ return sequenceNumber;
+ }
+
+ public void setSequenceNumber(int64_t _sequenceNumber) {
+ sequenceNumber = _sequenceNumber;
+ }
+
+ static Entry decode(Slot s, ByteBuffer bb) {
+ int64_t sequenceNumber = bb->getLong();
+ int64_t machineId = bb->getLong();
+ int64_t arbitratorId = bb->getLong();
+ int64_t clientLocalSequenceNumber = bb->getLong();
+ int partNumber = bb->getInt();
+ int dataSize = bb->getInt();
+ bool isLastPart = (bb->get() == 1);
+ // Get the data
+ char[] data = new char[dataSize];
+ bb->get(data);
+
+ TransactionPart returnTransactionPart = new TransactionPart(s, machineId, arbitratorId, clientLocalSequenceNumber, partNumber, data, isLastPart);
+ returnTransactionPart.setSequenceNumber(sequenceNumber);
+
+ return returnTransactionPart;
+ }
+
+ public void encode(ByteBuffer bb) {
+ bb->put(Entry.TypeTransactionPart);
+ bb->putLong(sequenceNumber);
+ bb->putLong(machineId);
+ bb->putLong(arbitratorId);
+ bb->putLong(clientLocalSequenceNumber);
+ bb->putInt(partNumber);
+ bb->putInt(data.length);
+
+ if (isLastPart) {
+ bb->put((char)1);
+ } else {
+ bb->put((char)0);
+ }
+
+ bb->put(data);
+ }
+
+ public char getType() {
+ return Entry.TypeTransactionPart;
+ }
+
+ public Entry getCopy(Slot s) {
+
+ TransactionPart copyTransaction = new TransactionPart(s, machineId, arbitratorId, clientLocalSequenceNumber, partNumber, data, isLastPart);
+ copyTransaction.setSequenceNumber(sequenceNumber);
+
+ return copyTransaction;
+ }
}
class TransactionStatus {
- static final char StatusAborted = 1;
- static final char StatusPending = 2;
- static final char StatusCommitted = 3;
- // static final char StatusRetrying = 4;
- static final char StatusSentPartial = 5;
- static final char StatusSentFully = 6;
- static final char StatusNoEffect = 10;
-
- char status = 0;
- bool applicationReleased = false;
- bool wasSentInChain = false;
- int64_t transactionSequenceNumber = 0;
- int64_t arbitrator = -1;
-
-
- TransactionStatus(char _status, int64_t _arbitrator) {
- status = _status;
- arbitrator = _arbitrator;
- }
-
- char getStatus() {
- return status;
- }
-
- void setStatus(char _status) {
- status = _status;
- }
-
- int64_t getTransactionSequenceNumber() {
- return transactionSequenceNumber;
- }
-
- void setTransactionSequenceNumber(int64_t _transactionSequenceNumber) {
- transactionSequenceNumber = _transactionSequenceNumber;
- }
-
- int64_t getTransactionArbitrator() {
- return arbitrator;
- }
-
- void release() {
- applicationReleased = true;
- }
-
- bool getReleased() {
- return applicationReleased;
- }
+ static final char StatusAborted = 1;
+ static final char StatusPending = 2;
+ static final char StatusCommitted = 3;
+ // static final char StatusRetrying = 4;
+ static final char StatusSentPartial = 5;
+ static final char StatusSentFully = 6;
+ static final char StatusNoEffect = 10;
+
+ char status = 0;
+ bool applicationReleased = false;
+ bool wasSentInChain = false;
+ int64_t transactionSequenceNumber = 0;
+ int64_t arbitrator = -1;
+
+
+ TransactionStatus(char _status, int64_t _arbitrator) {
+ status = _status;
+ arbitrator = _arbitrator;
+ }
+
+ char getStatus() {
+ return status;
+ }
+
+ void setStatus(char _status) {
+ status = _status;
+ }
+
+ int64_t getTransactionSequenceNumber() {
+ return transactionSequenceNumber;
+ }
+
+ void setTransactionSequenceNumber(int64_t _transactionSequenceNumber) {
+ transactionSequenceNumber = _transactionSequenceNumber;
+ }
+
+ int64_t getTransactionArbitrator() {
+ return arbitrator;
+ }
+
+ void release() {
+ applicationReleased = true;
+ }
+
+ bool getReleased() {
+ return applicationReleased;
+ }
}
class TransactionStatus {
- static final char StatusAborted = 1;
- static final char StatusPending = 2;
- static final char StatusCommitted = 3;
- // static final char StatusRetrying = 4;
- static final char StatusSentPartial = 5;
- static final char StatusSentFully = 6;
- static final char StatusNoEffect = 10;
-
- private char status = 0;
- private bool applicationReleased = false;
- private bool wasSentInChain = false;
- private int64_t transactionSequenceNumber = 0;
- private int64_t arbitrator = -1;
-
-
- public TransactionStatus(char _status, int64_t _arbitrator) {
- status = _status;
- arbitrator = _arbitrator;
- }
-
- public char getStatus() {
- return status;
- }
-
- public void setStatus(char _status) {
- status = _status;
- }
-
- public int64_t getTransactionSequenceNumber() {
- return transactionSequenceNumber;
- }
-
- public void setTransactionSequenceNumber(int64_t _transactionSequenceNumber) {
- transactionSequenceNumber = _transactionSequenceNumber;
- }
-
- public int64_t getTransactionArbitrator() {
- return arbitrator;
- }
-
- public void release() {
- applicationReleased = true;
- }
-
- public bool getReleased() {
- return applicationReleased;
- }
+ static final char StatusAborted = 1;
+ static final char StatusPending = 2;
+ static final char StatusCommitted = 3;
+ // static final char StatusRetrying = 4;
+ static final char StatusSentPartial = 5;
+ static final char StatusSentFully = 6;
+ static final char StatusNoEffect = 10;
+
+ private char status = 0;
+ private bool applicationReleased = false;
+ private bool wasSentInChain = false;
+ private int64_t transactionSequenceNumber = 0;
+ private int64_t arbitrator = -1;
+
+
+ public TransactionStatus(char _status, int64_t _arbitrator) {
+ status = _status;
+ arbitrator = _arbitrator;
+ }
+
+ public char getStatus() {
+ return status;
+ }
+
+ public void setStatus(char _status) {
+ status = _status;
+ }
+
+ public int64_t getTransactionSequenceNumber() {
+ return transactionSequenceNumber;
+ }
+
+ public void setTransactionSequenceNumber(int64_t _transactionSequenceNumber) {
+ transactionSequenceNumber = _transactionSequenceNumber;
+ }
+
+ public int64_t getTransactionArbitrator() {
+ return arbitrator;
+ }
+
+ public void release() {
+ applicationReleased = true;
+ }
+
+ public bool getReleased() {
+ return applicationReleased;
+ }
}
template<typename type>
class Array {
- public:
- Array() :
- array(NULL),
- size(0) {
- }
-
- Array(uint32_t _size) :
- array((type *) ourcalloc(1, sizeof(type) * _size)),
+public:
+ Array() :
+ array(NULL),
+ size(0) {
+ }
+
+ Array(uint32_t _size) :
+ array((type *) ourcalloc(1, sizeof(type) * _size)),
size(_size)
- {
- }
-
- Array(type *_array, uint _size) :
- array((type *) ourmalloc(sizeof(type) * _size)),
+ {
+ }
+
+ Array(type *_array, uint _size) :
+ array((type *) ourmalloc(sizeof(type) * _size)),
size(_size) {
- memcpy(array, _array, _size * sizeof(type));
- }
+ memcpy(array, _array, _size * sizeof(type));
+ }
- Array(Array<type> *_array) :
- array((type *) ourmalloc(sizeof(type) * _array->size)),
+ Array(Array<type> *_array) :
+ array((type *) ourmalloc(sizeof(type) * _array->size)),
size(_array->size) {
memcpy(array, _array->array, size * sizeof(type));
}
-
+
void init(uint _size) {
array = (type *) ourcalloc(1, sizeof(type) * _size);
size = _size;
size = _size;
memcpy(array, _array, _size * sizeof(type));
}
-
+
void init(Array<type> *_array) {
array = (type *) ourmalloc(sizeof(type) * _array->size);
size = _array->size;
memcpy(array, _array->array, size * sizeof(type));
}
-
+
~Array() {
if (array)
ourfree(array);
}
-
+
type get(uint index) const {
return array[index];
}
-
+
void set(uint index, type item) {
array[index] = item;
}
-
+
uint length() const {
return size;
}
type *internalArray() {
return array;
}
-
- private:
+
+private:
type *array;
uint size;
};
Linknode<_Key> *next;
};
-template<typename _Key, typename _KeyInt, int _Shift, unsigned int (*hash_function) (_Key), bool (*equals) (_Key, _Key)>
+template<typename _Key, typename _KeyInt, int _Shift, unsigned int (*hash_function)(_Key), bool (*equals)(_Key, _Key)>
class Hashset;
-template<typename _Key, typename _KeyInt, int _Shift, unsigned int (*hash_function) (_Key) = defaultHashFunction<_Key, _Shift, _KeyInt>, bool (*equals) (_Key, _Key) = defaultEquals<_Key> >
+template<typename _Key, typename _KeyInt, int _Shift, unsigned int (*hash_function)(_Key) = defaultHashFunction<_Key, _Shift, _KeyInt>, bool (*equals)(_Key, _Key) = defaultEquals<_Key> >
class SetIterator {
public:
SetIterator(Linknode<_Key> *_curr, Hashset <_Key, _KeyInt, _Shift, hash_function, equals> *_set) :
Hashset <_Key, _KeyInt, _Shift, hash_function, equals> *set;
};
-template<typename _Key, typename _KeyInt = uintptr_t, int _Shift = 0, unsigned int (*hash_function) (_Key) = defaultHashFunction<_Key, _Shift, _KeyInt>, bool (*equals) (_Key, _Key) = defaultEquals<_Key> >
+template<typename _Key, typename _KeyInt = uintptr_t, int _Shift = 0, unsigned int (*hash_function)(_Key) = defaultHashFunction<_Key, _Shift, _KeyInt>, bool (*equals)(_Key, _Key) = defaultEquals<_Key> >
class Hashset {
public:
Hashset(unsigned int initialcapacity = 16, double factor = 0.5) :
* manipulation and storage.
* @tparam _Shift Logical shift to apply to all keys. Default 0.
*/
-template<typename _Key, typename _Val, typename _KeyInt, int _Shift = 0, unsigned int (*hash_function) (_Key) = defaultHashFunction<_Key, _Shift, _KeyInt>, bool (*equals) (_Key, _Key) = defaultEquals<_Key> >
+template<typename _Key, typename _Val, typename _KeyInt = uintptr_t, int _Shift = 0, unsigned int (*hash_function)(_Key) = defaultHashFunction<_Key, _Shift, _KeyInt>, bool (*equals)(_Key, _Key) = defaultEquals<_Key> >
class Hashtable {
public:
/**
memcpy(array, _array, capacity * sizeof(type));
}
+ Vector(Vector<type> *v) :
+ size(v->size),
+ capacity(v->capacity),
+ array((type *) ourmalloc(sizeof(type) * v->capacity)) {
+ memcpy(array, v->array, capacity * sizeof(type));
+ }
+
void pop() {
size--;
}
size = _size;
}
- void push(type item) {
+ void addAll(Vector<type> *v) {
+ int oldsize = size;
+ setSize(size + v->size);
+ memcpy(&array[size], v->array, v->size * sizeof(type));
+ }
+
+ void add(type item) {
if (size >= capacity) {
uint newcap = capacity << 1;
array = (type *)ourrealloc(array, newcap * sizeof(type));