--- /dev/null
+package iotcloud;
+
+import java.util.Set;
+import java.util.Map;
+import java.util.HashSet;
+
+import javax.script.ScriptException;
+import java.lang.NullPointerException;
+import java.nio.ByteBuffer;
+
+
+class PendingTransaction {
+
+ private Set<KeyValue> keyValueUpdateSet = null;
+ private Set<KeyValue> keyValueGuardSet = null;
+ private long arbitrator = -1;
+ private long clientLocalSequenceNumber = -1;
+ private long machineId = -1;
+
+ private int currentDataSize = 0;
+
+ public PendingTransaction(long _machineId) {
+ machineId = _machineId;
+ keyValueUpdateSet = new HashSet<KeyValue>();
+ keyValueGuardSet = new HashSet<KeyValue>();
+ }
+
+ /**
+ * Add a new key value to the updates
+ *
+ */
+ public void addKV(KeyValue newKV) {
+
+ KeyValue rmKV = null;
+
+ // Make sure there are no duplicates
+ for (KeyValue kv : keyValueUpdateSet) {
+ if (kv.getKey().equals(newKV.getKey())) {
+
+ // Remove key if we are adding a newer version of the same key
+ rmKV = kv;
+ break;
+ }
+ }
+
+ // Remove key if we are adding a newer version of the same key
+ if (rmKV != null) {
+ keyValueUpdateSet.remove(rmKV);
+ currentDataSize -= rmKV.getSize();
+ }
+
+ // Add the key to the hash set
+ keyValueUpdateSet.add(newKV);
+ currentDataSize += newKV.getSize();
+ }
+
+ /**
+ * Add a new key value to the guard set
+ *
+ */
+ public void addKVGuard(KeyValue newKV) {
+ // Add the key to the hash set
+ keyValueGuardSet.add(newKV);
+ currentDataSize += newKV.getSize();
+ }
+
+ /**
+ * Checks if the arbitrator is the same
+ */
+ public boolean checkArbitrator(long arb) {
+ if (arbitrator == -1) {
+ arbitrator = arb;
+ return true;
+ }
+
+ return arb == arbitrator;
+ }
+
+ /**
+ * Get the transaction arbitrator
+ */
+ public long getArbitrator() {
+ return arbitrator;
+ }
+
+ /**
+ * Get the key value update set
+ */
+ public Set<KeyValue> getKVUpdates() {
+ return keyValueUpdateSet;
+ }
+
+ /**
+ * Get the key value update set
+ */
+ public Set<KeyValue> getKVGuard() {
+ return keyValueGuardSet;
+ }
+
+ public void setClientLocalSequenceNumber(long _clientLocalSequenceNumber) {
+ clientLocalSequenceNumber = _clientLocalSequenceNumber;
+ }
+
+ public long getClientLocalSequenceNumber() {
+ return clientLocalSequenceNumber;
+ }
+
+ public long getMachineId() {
+ return machineId;
+ }
+
+ public boolean evaluateGuard(Map<IoTString, KeyValue> keyValTableCommitted, Map<IoTString, KeyValue> keyValTableSpeculative, Map<IoTString, KeyValue> keyValTablePendingTransSpeculative) {
+ for (KeyValue kvGuard : keyValueGuardSet) {
+
+ // First check if the key is in the speculative table, this is the value of the latest assumption
+ KeyValue kv = keyValTablePendingTransSpeculative.get(kvGuard.getKey());
+
+
+ if (kv == null) {
+ // if it is not in the pending trans table then check the speculative table and use that
+ // value as our latest assumption
+ kv = keyValTableSpeculative.get(kvGuard.getKey());
+ }
+
+
+ if (kv == null) {
+ // if it is not in the speculative table then check the committed table and use that
+ // value as our latest assumption
+ kv = keyValTableCommitted.get(kvGuard.getKey());
+ }
+
+ if (kvGuard.getValue() != null) {
+ if ((kv == null) || (!kvGuard.getValue().equals(kv.getValue()))) {
+ return false;
+ }
+ } else {
+ if (kv != null) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ public Transaction createTransaction() {
+
+ Transaction newTransaction = new Transaction();
+ int transactionPartCount = 0;
+
+ // Convert all the data into a byte array so we can start partitioning
+ byte[] byteData = convertDataToBytes();
+
+ int currentPosition = 0;
+ int remaining = byteData.length;
+
+ while (remaining > 0) {
+
+ Boolean isLastPart = false;
+ // determine how much to copy
+ int copySize = TransactionPart.MAX_NON_HEADER_SIZE;
+ if (remaining <= TransactionPart.MAX_NON_HEADER_SIZE) {
+ copySize = remaining;
+ isLastPart = true; // last bit of data so last part
+ }
+
+ // Copy to a smaller version
+ byte[] partData = new byte[copySize];
+ System.arraycopy(byteData, currentPosition, partData, 0, copySize);
+
+ TransactionPart part = new TransactionPart(null, machineId, arbitrator, clientLocalSequenceNumber, transactionPartCount, partData, isLastPart);
+ newTransaction.addPartEncode(part);
+
+ // Update position, count and remaining
+ currentPosition += copySize;
+ transactionPartCount++;
+ remaining -= copySize;
+ }
+
+ // Add the Guard Conditions
+ for (KeyValue kv : keyValueGuardSet) {
+ newTransaction.addGuardKV(kv);
+ }
+
+ // Add the updates
+ for (KeyValue kv : keyValueUpdateSet) {
+ newTransaction.addUpdateKV(kv);
+ }
+
+ return newTransaction;
+ }
+
+ private byte[] convertDataToBytes() {
+
+ // Calculate the size of the data
+ int sizeOfData = 2 * Integer.BYTES; // Number of Update KV's and Guard KV's
+ sizeOfData += currentDataSize;
+
+ // Data handlers and storage
+ byte[] dataArray = new byte[sizeOfData];
+ ByteBuffer bbEncode = ByteBuffer.wrap(dataArray);
+
+ // Encode the size of the updates and guard sets
+ bbEncode.putInt(keyValueGuardSet.size());
+ bbEncode.putInt(keyValueUpdateSet.size());
+
+ // Encode all the guard conditions
+ for (KeyValue kv : keyValueGuardSet) {
+ kv.encode(bbEncode);
+ }
+
+ // Encode all the updates
+ for (KeyValue kv : keyValueUpdateSet) {
+ kv.encode(bbEncode);
+ }
+
+ return bbEncode.array();
+ }
+}
\ No newline at end of file