Block Chain Transactions, Commits multiple parts version
[iotcloud.git] / version2 / src / java / iotcloud / Transaction.java
index 90ef4d4fc385b269308cbc6d0716da9ece60ad96..82280c0fb38daab2c7340fb9bcb392ff62ed5879 100644 (file)
 package iotcloud;
 
-import java.nio.ByteBuffer;
+import java.util.Map;
 import java.util.Set;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Map;
+import java.nio.ByteBuffer;
 
-class Transaction extends Entry {
+class Transaction {
 
-    private long seqnum;
-    private long machineid;
-    private Set<KeyValue> keyValueUpdateSet = null;
+    private Map<Integer, TransactionPart> parts = null;
+    private Set<Integer> missingParts = null;
+    private List<Integer> partsPendingSend = null;
+    private boolean isComplete = false;
     private Set<KeyValue> keyValueGuardSet = null;
-    private Long arbitrator;
+    private Set<KeyValue> keyValueUpdateSet = null;
+    private boolean isDead = false;
+    private long sequenceNumber = -1;
+    private long clientLocalSequenceNumber = -1;
+    private long arbitratorId = -1;
+    private long machineId = -1;
+    private Pair<Long, Long> transactionId = null;
+
+    private int nextPartToSend = 0;
+    private boolean didSendAPartToServer = false;
+
+    private TransactionStatus transactionStatus = null;
+
+    public Transaction() {
+        parts = new HashMap<Integer, TransactionPart>();
+        keyValueGuardSet = new HashSet<KeyValue>();
+        keyValueUpdateSet = new HashSet<KeyValue>();
+        partsPendingSend = new ArrayList<Integer>();
+    }
+
+    public void addPartEncode(TransactionPart newPart) {
+        parts.put(newPart.getPartNumber(), newPart);
+        partsPendingSend.add(newPart.getPartNumber());
+
+        // Get the sequence number and other important information
+        sequenceNumber = newPart.getSequenceNumber();
+        arbitratorId = newPart.getArbitratorId();
+        transactionId = newPart.getTransactionId();
+        clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
+        machineId = newPart.getMachineId();
+
+        isComplete = true;
+    }
+
+    public void addPartDecode(TransactionPart newPart) {
+
+        if (isDead) {
+            // If dead then just kill this part and move on
+            newPart.setDead();
+            return;
+        }
+
+        // Get the sequence number and other important information
+        sequenceNumber = newPart.getSequenceNumber();
+        arbitratorId = newPart.getArbitratorId();
+        transactionId = newPart.getTransactionId();
+        clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
+        machineId = newPart.getMachineId();
+
+        TransactionPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart);
+
+        if (previoslySeenPart != null) {
+            // Set dead the old one since the new one is a rescued version of this part
+            previoslySeenPart.setDead();
+        } else if (newPart.isLastPart()) {
+            missingParts = new HashSet<Integer>();
 
-    public Transaction(Slot slot, long _seqnum, long _machineid, Long _arbitrator, Set<KeyValue> _keyValueUpdateSet, Set<KeyValue> _keyValueGuardSet) {
-        super(slot);
-        seqnum = _seqnum;
-        machineid = _machineid;
-        arbitrator = _arbitrator;
-        // keyValueUpdateSet = new HashSet<KeyValue>();
-        // keyValueGuardSet = new HashSet<KeyValue>();
+            for (int i = 0; i < newPart.getPartNumber(); i++) {
+                if (parts.get(i) == null) {
+                    missingParts.add(i);
+                }
+            }
+        }
 
-        // for (KeyValue kv : _keyValueUpdateSet) {
-        //     KeyValue kvCopy = kv.getCopy();
-        //     keyValueUpdateSet.add(kvCopy);
-        // }
+        if (!isComplete) {
 
-        // for (KeyValue kv : _keyValueGuardSet) {
-        //     KeyValue kvCopy = kv.getCopy();
-        //     keyValueGuardSet.add(kvCopy);
-        // }
+            // We have seen this part so remove it from the set of missing parts
+            missingParts.remove(newPart.getPartNumber());
 
-        keyValueUpdateSet = _keyValueUpdateSet;
-        keyValueGuardSet = _keyValueGuardSet;
+            // Check if all the parts have been seen
+            if (missingParts.size() == 0) {
+
+                // We have all the parts
+                isComplete = true;
+
+                // Decode all the parts and create the key value guard and update sets
+                decodeTransactionData();
+            }
+        }
     }
 
-    public long getMachineID() {
-        return machineid;
+    public void addUpdateKV(KeyValue kv) {
+        keyValueUpdateSet.add(kv);
     }
 
-    public long getArbitrator() {
-        return arbitrator;
+    public void addGuardKV(KeyValue kv) {
+        keyValueGuardSet.add(kv);
     }
 
+
     public long getSequenceNumber() {
-        return seqnum;
+        return sequenceNumber;
     }
 
+    public void setSequenceNumber(long _sequenceNumber) {
+        sequenceNumber = _sequenceNumber;
 
-    public Set<KeyValue> getkeyValueUpdateSet() {
-        return keyValueUpdateSet;
+        for (Integer i : parts.keySet()) {
+            parts.get(i).setSequenceNumber(sequenceNumber);
+        }
     }
 
-    public Set<KeyValue> getkeyValueGuardSet() {
-        return keyValueGuardSet;
+    public long getClientLocalSequenceNumber() {
+        return clientLocalSequenceNumber;
     }
 
-    public boolean evaluateGuard(Map<IoTString, KeyValue> keyValTableCommitted, Map<IoTString, KeyValue> keyValTableSpeculative) {
-        for (KeyValue kvGuard : keyValueGuardSet) {
+    public Map<Integer, TransactionPart> getParts() {
+        return parts;
+    }
 
-            // First check if the key is in the speculative table, this is the value of the latest assumption
-            KeyValue kv = null;
 
-            // If we have a speculation table then use it first
-            if (keyValTableSpeculative != null) {
-                kv = keyValTableSpeculative.get(kvGuard.getKey());
-            }
-
-            if (kv == null) {
+    public boolean didSendAPartToServer() {
+        return didSendAPartToServer;
+    }
 
-                // if it is not in the speculative table then check the committed table and use that
-                // value as our latest assumption
-                kv = keyValTableCommitted.get(kvGuard.getKey());
-            }
+    public void resetNextPartToSend() {
+        nextPartToSend = 0;
+    }
 
-            if (kvGuard.getValue() != null) {
-                if ((kv == null) || (!kvGuard.getValue().equals(kv.getValue()))) {
-                    return false;
-                }
-            } else {
-                if (kv != null) {
-                    return false;
-                }
-            }
+    public TransactionPart getNextPartToSend() {
+        if ((partsPendingSend.size() == 0) || (partsPendingSend.size() == nextPartToSend)) {
+            return null;
         }
-        return true;
+        TransactionPart part = parts.get(partsPendingSend.get(nextPartToSend));
+        nextPartToSend++;
+        return part;
+    }
+
+    public void setTransactionStatus(TransactionStatus _transactionStatus) {
+        transactionStatus = _transactionStatus;
     }
 
-    public byte getType() {
-        return Entry.TypeTransaction;
+    public TransactionStatus getTransactionStatus() {
+        return transactionStatus;
     }
 
-    public int getSize() {
-        int size = 3 * Long.BYTES + Byte.BYTES; // seq, machine id, entry type
-        size += Integer.BYTES; // number of KV's
-        size += Integer.BYTES; // number of Guard KV's
+    public void removeSentParts(List<Integer> sentParts) {
+        nextPartToSend = 0;
+        partsPendingSend.removeAll(sentParts);
+        didSendAPartToServer = true;
+        transactionStatus.setTransactionSequenceNumber(sequenceNumber);
+    }
 
-        // Size of each KV
-        for (KeyValue kv : keyValueUpdateSet) {
-            size += kv.getSize();
-        }
+    public boolean didSendAllParts() {
+        return partsPendingSend.isEmpty();
+    }
 
-        // Size of each Guard KV
-        for (KeyValue kv : keyValueGuardSet) {
-            size += kv.getSize();
-        }
 
-        return size;
+    public Set<KeyValue> getKeyValueUpdateSet() {
+        return keyValueUpdateSet;
+    }
+
+    public int getNumberOfParts() {
+        return parts.size();
+    }
+
+    public long getMachineId() {
+        return machineId;
+    }
+
+    public long getArbitrator() {
+        return arbitratorId;
+    }
+
+    public boolean isComplete() {
+        return isComplete;
     }
 
-    public void encode(ByteBuffer bb) {
-        bb.put(Entry.TypeTransaction);
-        bb.putLong(seqnum);
-        bb.putLong(machineid);
-        bb.putLong(arbitrator);
+    public Pair<Long, Long> getId() {
+        return transactionId;
+    }
 
-        bb.putInt(keyValueUpdateSet.size());
-        for (KeyValue kv : keyValueUpdateSet) {
-            kv.encode(bb);
+    public void setDead() {
+        if (isDead) {
+            // Already dead
+            return;
         }
 
-        bb.putInt(keyValueGuardSet.size());
-        for (KeyValue kv : keyValueGuardSet) {
-            kv.encode(bb);
+        // Set dead
+        isDead = true;
+
+        // Make all the parts of this transaction dead
+        for (Integer partNumber : parts.keySet()) {
+            TransactionPart part = parts.get(partNumber);
+            part.setDead();
         }
     }
 
-    static Entry decode(Slot slot, ByteBuffer bb) {
-        long seqnum = bb.getLong();
-        long machineid = bb.getLong();
-        long arbitrator = bb.getLong();
-        int numberOfKeys = bb.getInt();
+    public TransactionPart getPart(int index) {
+        return parts.get(index);
+    }
+
+    private void decodeTransactionData() {
 
-        Set<KeyValue> kvSetUpdates = new HashSet<KeyValue>();
-        for (int i = 0; i < numberOfKeys; i++) {
-            KeyValue kv = KeyValue.decode(bb);
-            kvSetUpdates.add(kv);
+        // Calculate the size of the data section
+        int dataSize = 0;
+        for (int i = 0; i < parts.keySet().size(); i++) {
+            TransactionPart tp = parts.get(i);
+            dataSize += tp.getDataSize();
         }
 
-        int numberOfGuards = bb.getInt();
-        Set<KeyValue> kvSetGuards = new HashSet<KeyValue>();
-        for (int i = 0; i < numberOfGuards; i++) {
-            KeyValue kv = KeyValue.decode(bb);
-            kvSetGuards.add(kv);
+        byte[] combinedData = new byte[dataSize];
+        int currentPosition = 0;
+
+        // Stitch all the data sections together
+        for (int i = 0; i < parts.keySet().size(); i++) {
+            TransactionPart tp = parts.get(i);
+            System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize());
+            currentPosition += tp.getDataSize();
         }
 
-        return new Transaction(slot, seqnum, machineid, arbitrator, kvSetUpdates, kvSetGuards);
+        // Decoder Object
+        ByteBuffer bbDecode = ByteBuffer.wrap(combinedData);
+
+        // Decode how many key value pairs need to be decoded
+        int numberOfKVGuards = bbDecode.getInt();
+        int numberOfKVUpdates = bbDecode.getInt();
+
+        // Decode all the guard key values
+        for (int i = 0; i < numberOfKVGuards; i++) {
+            KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
+            keyValueGuardSet.add(kv);
+        }
+
+        // Decode all the updates key values
+        for (int i = 0; i < numberOfKVUpdates; i++) {
+            KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
+            keyValueUpdateSet.add(kv);
+        }
     }
 
-    public Entry getCopy(Slot s) {
-        return new Transaction(s, seqnum, machineid, arbitrator, keyValueUpdateSet, keyValueGuardSet);
+    public boolean evaluateGuard(Map<IoTString, KeyValue> committedKeyValueTable, Map<IoTString, KeyValue> speculatedKeyValueTable, Map<IoTString, KeyValue> pendingTransactionSpeculatedKeyValueTable) {
+        for (KeyValue kvGuard : keyValueGuardSet) {
+
+            // First check if the key is in the speculative table, this is the value of the latest assumption
+            KeyValue kv = null;
+
+            // If we have a speculation table then use it first
+            if (pendingTransactionSpeculatedKeyValueTable != null) {
+                kv = pendingTransactionSpeculatedKeyValueTable.get(kvGuard.getKey());
+            }
+
+            // If we have a speculation table then use it first
+            if ((kv == null) && (speculatedKeyValueTable != null)) {
+                kv = speculatedKeyValueTable.get(kvGuard.getKey());
+            }
+
+            if (kv == null) {
+                // if it is not in the speculative table then check the committed table and use that
+                // value as our latest assumption
+                kv = committedKeyValueTable.get(kvGuard.getKey());
+            }
+
+            if (kvGuard.getValue() != null) {
+                if ((kv == null) || (!kvGuard.getValue().equals(kv.getValue()))) {
+                    return false;
+                }
+            } else {
+                if (kv != null) {
+                    return false;
+                }
+            }
+        }
+        return true;
     }
 }
\ No newline at end of file