+#include "PendingTransaction.h"
+
+PendingTransaction::PendingTransaction(int64_t _machineId) :
+ keyValueUpdateSet(new Hashset<KeyValue*>()),
+ keyValueGuardSet(new HashSet<KeyValue*>()),
+ arbitrator(-1),
+ clientLocalSequenceNumber(-1),
+ machineId(_machineId),
+ currentDataSize(0) {
+}
+/**
+ * Add a new key value to the updates
+ *
+ */
+void PendingTransaction::addKV(KeyValue * newKV) {
+
+ KeyValue rmKV = NULL;
+
+ // Make sure there are no duplicates
+ for (KeyValue kv : keyValueUpdateSet) {
+ if (kv.getKey().equals(newKV.getKey())) {
+
+ // Remove key if we are adding a newer version of the same key
+ rmKV = kv;
+ break;
+ }
+ }
+
+ // Remove key if we are adding a newer version of the same key
+ if (rmKV != NULL) {
+ keyValueUpdateSet.remove(rmKV);
+ currentDataSize -= rmKV.getSize();
+ }
+
+ // Add the key to the hash set
+ keyValueUpdateSet.add(newKV);
+ currentDataSize += newKV.getSize();
+}
+/**
+ * Add a new key value to the guard set
+ *
+ */
+void PendingTransaction::addKVGuard(KeyValue * newKV) {
+ // Add the key to the hash set
+ keyValueGuardSet.add(newKV);
+ currentDataSize += newKV.getSize();
+}
+/**
+ * Checks if the arbitrator is the same
+ */
+bool PendingTransaction::checkArbitrator(int64_t arb) {
+ if (arbitrator == -1) {
+ arbitrator = arb;
+ return true;
+ }
+
+ return arb == arbitrator;
+}
-class PendingTransaction {
-
- Set<KeyValue> keyValueUpdateSet = NULL;
- Set<KeyValue> keyValueGuardSet = NULL;
- int64_t arbitrator = -1;
- int64_t clientLocalSequenceNumber = -1;
- int64_t machineId = -1;
-
- int currentDataSize = 0;
-
- PendingTransaction(int64_t _machineId) {
- machineId = _machineId;
- keyValueUpdateSet = new HashSet<KeyValue>();
- keyValueGuardSet = new HashSet<KeyValue>();
- }
-
- /**
- * Add a new key value to the updates
- *
- */
- void addKV(KeyValue newKV) {
-
- KeyValue rmKV = NULL;
-
- // Make sure there are no duplicates
- for (KeyValue kv : keyValueUpdateSet) {
- if (kv.getKey().equals(newKV.getKey())) {
-
- // Remove key if we are adding a newer version of the same key
- rmKV = kv;
- break;
- }
- }
-
- // Remove key if we are adding a newer version of the same key
- if (rmKV != NULL) {
- keyValueUpdateSet.remove(rmKV);
- currentDataSize -= rmKV.getSize();
- }
-
- // Add the key to the hash set
- keyValueUpdateSet.add(newKV);
- currentDataSize += newKV.getSize();
- }
-
- /**
- * Add a new key value to the guard set
- *
- */
- void addKVGuard(KeyValue newKV) {
- // Add the key to the hash set
- keyValueGuardSet.add(newKV);
- currentDataSize += newKV.getSize();
- }
-
- /**
- * Checks if the arbitrator is the same
- */
- bool checkArbitrator(int64_t arb) {
- if (arbitrator == -1) {
- arbitrator = arb;
- return true;
- }
-
- return arb == arbitrator;
- }
-
- /**
- * Get the transaction arbitrator
- */
- int64_t getArbitrator() {
- return arbitrator;
- }
-
- /**
- * Get the key value update set
- */
- Set<KeyValue> getKVUpdates() {
- return keyValueUpdateSet;
- }
-
- /**
- * Get the key value update set
- */
- Set<KeyValue> getKVGuard() {
- return keyValueGuardSet;
- }
-
- void setClientLocalSequenceNumber(int64_t _clientLocalSequenceNumber) {
- clientLocalSequenceNumber = _clientLocalSequenceNumber;
- }
-
- int64_t getClientLocalSequenceNumber() {
- return clientLocalSequenceNumber;
- }
-
- int64_t getMachineId() {
- return machineId;
- }
-
- bool evaluateGuard(Map<IoTString, KeyValue> keyValTableCommitted, Map<IoTString, KeyValue> keyValTableSpeculative, Map<IoTString, KeyValue> keyValTablePendingTransSpeculative) {
- for (KeyValue kvGuard : keyValueGuardSet) {
-
- // First check if the key is in the speculative table, this is the value of the latest assumption
- KeyValue kv = keyValTablePendingTransSpeculative.get(kvGuard.getKey());
-
-
- if (kv == NULL) {
- // if it is not in the pending trans table then check the speculative table and use that
- // value as our latest assumption
- kv = keyValTableSpeculative.get(kvGuard.getKey());
- }
-
-
- if (kv == NULL) {
- // if it is not in the speculative table then check the committed table and use that
- // value as our latest assumption
- kv = keyValTableCommitted.get(kvGuard.getKey());
- }
-
- if (kvGuard.getValue() != NULL) {
- if ((kv == NULL) || (!kvGuard.getValue().equals(kv.getValue()))) {
- return false;
- }
- } else {
- if (kv != NULL) {
- return false;
- }
- }
+bool PendingTransaction::evaluateGuard(Hashtable<IoTString *, KeyValue *> keyValTableCommitted, Hashtable<IoTString *, KeyValue *> keyValTableSpeculative, Hashtable<IoTString *, KeyValue *> keyValTablePendingTransSpeculative) {
+ for (KeyValue kvGuard : keyValueGuardSet) {
+
+ // First check if the key is in the speculative table, this is the value of the latest assumption
+ KeyValue kv = keyValTablePendingTransSpeculative.get(kvGuard.getKey());
+
+
+ if (kv == NULL) {
+ // if it is not in the pending trans table then check the speculative table and use that
+ // value as our latest assumption
+ kv = keyValTableSpeculative.get(kvGuard.getKey());
+ }
+
+
+ if (kv == NULL) {
+ // if it is not in the speculative table then check the committed table and use that
+ // value as our latest assumption
+ kv = keyValTableCommitted.get(kvGuard.getKey());
+ }
+
+ if (kvGuard.getValue() != NULL) {
+ if ((kv == NULL) || (!kvGuard.getValue().equals(kv.getValue()))) {
+ return false;
+ }
+ } else {
+ if (kv != NULL) {
+ return false;
+ }
+ }
}
- return true;
- }
-
- Transaction createTransaction() {
-
- Transaction newTransaction = new Transaction();
- int transactionPartCount = 0;
-
- // Convert all the data into a char array so we can start partitioning
- char[] charData = convertDataToBytes();
-
- int currentPosition = 0;
- int remaining = charData.length;
-
- while (remaining > 0) {
-
- Boolean isLastPart = false;
- // determine how much to copy
- int copySize = TransactionPart.MAX_NON_HEADER_SIZE;
- if (remaining <= TransactionPart.MAX_NON_HEADER_SIZE) {
- copySize = remaining;
- isLastPart = true; // last bit of data so last part
- }
-
- // Copy to a smaller version
- char[] partData = new char[copySize];
- System.arraycopy(charData, currentPosition, partData, 0, copySize);
-
- TransactionPart part = new TransactionPart(NULL, machineId, arbitrator, clientLocalSequenceNumber, transactionPartCount, partData, isLastPart);
- newTransaction.addPartEncode(part);
+ return true;
+}
+Transaction * PendingTransaction::createTransaction() {
+ Transaction * newTransaction = new Transaction();
+ int transactionPartCount = 0;
+
+ // Convert all the data into a char array so we can start partitioning
+ Array<char> * charData = convertDataToBytes();
+
+ int currentPosition = 0;
+ int remaining = charData.length;
+
+ while (remaining > 0) {
+
+ Boolean isLastPart = false;
+ // determine how much to copy
+ int copySize = TransactionPart.MAX_NON_HEADER_SIZE;
+ if (remaining <= TransactionPart.MAX_NON_HEADER_SIZE) {
+ copySize = remaining;
+ isLastPart = true; // last bit of data so last part
+ }
+
+ // Copy to a smaller version
+ char[] partData = new char[copySize];
+ System.arraycopy(charData, currentPosition, partData, 0, copySize);
+
+ TransactionPart part = new TransactionPart(NULL, machineId, arbitrator, clientLocalSequenceNumber, transactionPartCount, partData, isLastPart);
+ newTransaction.addPartEncode(part);
+
// Update position, count and remaining
- currentPosition += copySize;
- transactionPartCount++;
- remaining -= copySize;
- }
-
- // Add the Guard Conditions
- for (KeyValue kv : keyValueGuardSet) {
- newTransaction.addGuardKV(kv);
- }
-
- // Add the updates
- for (KeyValue kv : keyValueUpdateSet) {
- newTransaction.addUpdateKV(kv);
- }
-
- return newTransaction;
- }
-
- char[] convertDataToBytes() {
-
- // Calculate the size of the data
- int sizeOfData = 2 * sizeof(int32_t); // Number of Update KV's and Guard KV's
- sizeOfData += currentDataSize;
-
- // Data handlers and storage
- char[] dataArray = new char[sizeOfData];
- ByteBuffer bbEncode = ByteBuffer.wrap(dataArray);
-
- // Encode the size of the updates and guard sets
- bbEncode.putInt(keyValueGuardSet.size());
- bbEncode.putInt(keyValueUpdateSet.size());
-
- // Encode all the guard conditions
- for (KeyValue kv : keyValueGuardSet) {
- kv.encode(bbEncode);
- }
-
- // Encode all the updates
- for (KeyValue kv : keyValueUpdateSet) {
- kv.encode(bbEncode);
- }
+ currentPosition += copySize;
+ transactionPartCount++;
+ remaining -= copySize;
+ }
+
+ // Add the Guard Conditions
+ for (KeyValue kv : keyValueGuardSet) {
+ newTransaction.addGuardKV(kv);
+ }
+
+ // Add the updates
+ for (KeyValue kv : keyValueUpdateSet) {
+ newTransaction.addUpdateKV(kv);
+ }
+
+ return newTransaction;
+}
- return bbEncode.array();
- }
+Arrar<char> * PendingTransaction::convertDataToBytes() {
+ // Calculate the size of the data
+ int sizeOfData = 2 * sizeof(int32_t); // Number of Update KV's and Guard KV's
+ sizeOfData += currentDataSize;
+
+ // Data handlers and storage
+ Array<char> * dataArray = new Array<char>(sizeOfData);
+ ByteBuffer * bbEncode = ByteBuffer_wrap(dataArray);
+
+ // Encode the size of the updates and guard sets
+ bbEncode->putInt(keyValueGuardSet.size());
+ bbEncode->putInt(keyValueUpdateSet.size());
+
+ // Encode all the guard conditions
+ for (KeyValue kv : keyValueGuardSet) {
+ kv->encode(bbEncode);
+ }
+
+ // Encode all the updates
+ for (KeyValue kv : keyValueUpdateSet) {
+ kv->encode(bbEncode);
+ }
+
+ return bbEncode->array();
}
+