Fixed Rejected Messages, Calculating correct size
[iotcloud.git] / version2 / src / java / iotcloud / Commit.java
index 72249225b777c2ca4875b9df7d23cf0375a5b4ca..7084d0e7100abeba58281eb32d172f511fce0966 100644 (file)
 package iotcloud;
 
-import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.HashMap;
 import java.util.Set;
 import java.util.HashSet;
-import java.util.Iterator;
+import java.nio.ByteBuffer;
+
+class Commit {
+
+    private Map<Integer, CommitPart> parts = null;
+    private Set<Integer> missingParts = null;
+    private boolean isComplete = false;
+    private boolean hasLastPart = false;
+    private Set<KeyValue> keyValueUpdateSet = null;
+    private boolean isDead = false;
+    private long sequenceNumber = -1;
+    private long machineId = -1;
+    private long transactionSequenceNumber = -1;
+
+    private Set<IoTString> liveKeys = null;
+
+    public Commit() {
+        parts = new HashMap<Integer, CommitPart>();
+        keyValueUpdateSet = new HashSet<KeyValue>();
+
+        liveKeys = new HashSet<IoTString>();
+    }
+
+    public Commit(long _sequenceNumber, long _machineId, long _transactionSequenceNumber) {
+        parts = new HashMap<Integer, CommitPart>();
+        keyValueUpdateSet = new HashSet<KeyValue>();
+
+        liveKeys = new HashSet<IoTString>();
+
+        sequenceNumber = _sequenceNumber;
+        machineId = _machineId;
+        transactionSequenceNumber = _transactionSequenceNumber;
+        isComplete = true;
+    }
+
+
+    public void addPartDecode(CommitPart newPart) {
+
+        if (isDead) {
+            // If dead then just kill this part and move on
+            newPart.setDead();
+            return;
+        }
+
+        CommitPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart);
+
+        if (previoslySeenPart != null) {
+            // Set dead the old one since the new one is a rescued version of this part
+            previoslySeenPart.setDead();
+        } else if (newPart.isLastPart()) {
+            missingParts = new HashSet<Integer>();
+            hasLastPart = true;
+
+            for (int i = 0; i < newPart.getPartNumber(); i++) {
+                if (parts.get(i) == null) {
+                    missingParts.add(i);
+                }
+            }
+        }
+
+        if (!isComplete && hasLastPart) {
+
+            // We have seen this part so remove it from the set of missing parts
+            missingParts.remove(newPart.getPartNumber());
+
+            // Check if all the parts have been seen
+            if (missingParts.size() == 0) {
+
+                // We have all the parts
+                isComplete = true;
+
+                // Decode all the parts and create the key value guard and update sets
+                decodeCommitData();
+
+                // Get the sequence number and arbitrator of this transaction
+                sequenceNumber = parts.get(0).getSequenceNumber();
+                machineId = parts.get(0).getMachineId();
+                transactionSequenceNumber = parts.get(0).getTransactionSequenceNumber();
+            }
+        }
+    }
+
+    public long getSequenceNumber() {
+        return sequenceNumber;
+    }
+
+    public long getTransactionSequenceNumber() {
+        return transactionSequenceNumber;
+    }
+
+    public Map<Integer, CommitPart> getParts() {
+        return parts;
+    }
+
+    public void addKV(KeyValue kv) {
+        keyValueUpdateSet.add(kv);
+        liveKeys.add(kv.getKey());
+    }
+
+    public void invalidateKey(IoTString key) {
+        liveKeys.remove(key);
+
+        if (liveKeys.size() == 0) {
+            setDead();
+        }
+    }
+
+    public Set<KeyValue> getKeyValueUpdateSet() {
+        return keyValueUpdateSet;
+    }
+
+    public int getNumberOfParts() {
+        return parts.size();
+    }
+
+    public long getMachineId() {
+        return machineId;
+    }
+
+    public boolean isComplete() {
+        return isComplete;
+    }
+
+    public boolean isLive() {
+        return !isDead;
+    }
+
+    public void setDead() {
+        if (isDead) {
+            // Already dead
+            return;
+        }
+
+        // Set dead
+        isDead = true;
+
+        // Make all the parts of this transaction dead
+        for (Integer partNumber : parts.keySet()) {
+            CommitPart part = parts.get(partNumber);
+            part.setDead();
+        }
+    }
+
+    public CommitPart getPart(int index) {
+        return parts.get(index);
+    }
+
+    public void createCommitParts() {
+
+        parts.clear();
+
+        // Convert to bytes
+        byte[] byteData = convertDataToBytes();
+
+
+        int commitPartCount = 0;
+        int currentPosition = 0;
+        int remaining = byteData.length;
+
+        while (remaining > 0) {
+
+            Boolean isLastPart = false;
+            // determine how much to copy
+            int copySize = CommitPart.MAX_NON_HEADER_SIZE;
+            if (remaining <= CommitPart.MAX_NON_HEADER_SIZE) {
+                copySize = remaining;
+                isLastPart = true; // last bit of data so last part
+            }
 
-/**
- * This Entry records the commit of a transaction.
- * @author Ali Younis <ayounis@uci.edu>
- * @version 1.0
- */
+            // Copy to a smaller version
+            byte[] partData = new byte[copySize];
+            System.arraycopy(byteData, currentPosition, partData, 0, copySize);
 
+            CommitPart part = new CommitPart(null, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
+            parts.put(part.getPartNumber(), part);
 
-class Commit extends Entry {
-       private long seqnumtrans;
-       private long transarbitrator;
+            // Update position, count and remaining
+            currentPosition += copySize;
+            commitPartCount++;
+            remaining -= copySize;
+        }
+    }
 
-       private Set<KeyValue> keyValueUpdateSet = null;
+    private void decodeCommitData() {
 
+        // Calculate the size of the data section
+        int dataSize = 0;
+        for (int i = 0; i < parts.keySet().size(); i++) {
+            CommitPart tp = parts.get(i);
+            dataSize += tp.getDataSize();
+        }
 
-       public Commit(Slot slot, long _seqnumtrans, long _transarbitrator, Set<KeyValue> _keyValueUpdateSet) {
-               super(slot);
-               seqnumtrans = _seqnumtrans;
-               transarbitrator = _transarbitrator;
+        byte[] combinedData = new byte[dataSize];
+        int currentPosition = 0;
 
-               keyValueUpdateSet = new HashSet<KeyValue>();
+        // Stitch all the data sections together
+        for (int i = 0; i < parts.keySet().size(); i++) {
+            CommitPart tp = parts.get(i);
+            System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize());
+            currentPosition += tp.getDataSize();
+        }
 
-               for (KeyValue kv : _keyValueUpdateSet) {
-                       KeyValue kvCopy = kv.getCopy();
-                       keyValueUpdateSet.add(kvCopy);
-               }
-       }
+        // Decoder Object
+        ByteBuffer bbDecode = ByteBuffer.wrap(combinedData);
 
-       public long getTransSequenceNumber() {
-               return seqnumtrans;
-       }
+        // Decode how many key value pairs need to be decoded
+        int numberOfKVUpdates = bbDecode.getInt();
 
-       public long getTransArbitrator() {
-               return transarbitrator;
-       }
+        // Decode all the updates key values
+        for (int i = 0; i < numberOfKVUpdates; i++) {
+            KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
+            keyValueUpdateSet.add(kv);
+            liveKeys.add(kv.getKey());
+        }
+    }
 
-       public Set<KeyValue> getkeyValueUpdateSet() {
-               return keyValueUpdateSet;
-       }
+    private byte[] convertDataToBytes() {
 
-       public byte getType() {
-               return Entry.TypeCommit;
-       }
+        // Calculate the size of the data
+        int sizeOfData = Integer.BYTES; // Number of Update KV's
+        for (KeyValue kv : keyValueUpdateSet) {
+            sizeOfData += kv.getSize();
+        }
 
-       public int getSize() {
-               int size = 2 * Long.BYTES + Byte.BYTES; // seq id, entry type
-               size += Integer.BYTES; // number of KV's
+        // Data handlers and storage
+        byte[] dataArray = new byte[sizeOfData];
+        ByteBuffer bbEncode = ByteBuffer.wrap(dataArray);
 
-               // Size of each KV
-               for (KeyValue kv : keyValueUpdateSet) {
-                       size += kv.getSize();
-               }
+        // Encode the size of the updates and guard sets
+        bbEncode.putInt(keyValueUpdateSet.size());
 
-               return size;
-       }
+        // Encode all the updates
+        for (KeyValue kv : keyValueUpdateSet) {
+            kv.encode(bbEncode);
+        }
 
-       static Entry decode(Slot slot, ByteBuffer bb) {
-               long seqnumtrans = bb.getLong();
-               long transarbitrator = bb.getLong();
-               int numberOfKeys = bb.getInt();
+        return bbEncode.array();
+    }
 
-               Set<KeyValue> kvSet = new HashSet<KeyValue>();
-               for (int i = 0; i < numberOfKeys; i++) {
-                       KeyValue kv = KeyValue.decode(bb);
-                       kvSet.add(kv);
-               }
+    private void setKVsMap(Map<IoTString, KeyValue> newKVs) {
+        keyValueUpdateSet.clear();
+        liveKeys.clear();
 
-               return new Commit(slot, seqnumtrans, transarbitrator, kvSet);
-       }
+        keyValueUpdateSet.addAll(newKVs.values());
+        liveKeys.addAll(newKVs.keySet());
 
-       public void encode(ByteBuffer bb) {
-               bb.put(Entry.TypeCommit);
-               bb.putLong(seqnumtrans);
-               bb.putLong(transarbitrator);
+    }
 
-               bb.putInt(keyValueUpdateSet.size());
 
-               for (KeyValue kv : keyValueUpdateSet) {
-                       kv.encode(bb);
-               }
-       }
+    public static Commit merge(Commit newer, Commit older, long newSequenceNumber) {
 
-       public Entry getCopy(Slot s) {
-               // System.out.println("Commit Rescued:  " + this);  // TODO remove
-               return new Commit(s, seqnumtrans, transarbitrator, keyValueUpdateSet);
-       }
+        if (older == null) {
+            return newer;
+        } else if (newer == null) {
+            return older;
+        }
 
-       public Set<KeyValue> updateLiveKeys(Set<KeyValue> kvSet) {
+        Map<IoTString, KeyValue> kvSet = new HashMap<IoTString, KeyValue>();
+        for (KeyValue kv : older.getKeyValueUpdateSet()) {
+            kvSet.put(kv.getKey(), kv);
+        }
 
-               if (!this.isLive())
-                       return new HashSet<KeyValue>();
+        for (KeyValue kv : newer.getKeyValueUpdateSet()) {
+            kvSet.put(kv.getKey(), kv);
+        }
 
-               Set<KeyValue> toDelete = new HashSet<KeyValue>();
+        long transactionSequenceNumber = newer.getTransactionSequenceNumber();
 
-               for (KeyValue kv1 : kvSet) {
-                       for (Iterator<KeyValue> i = keyValueUpdateSet.iterator(); i.hasNext();) {
-                               KeyValue kv2 = i.next();
+        if (transactionSequenceNumber == -1) {
+            transactionSequenceNumber = older.getTransactionSequenceNumber();
+        }
 
-                               if (kv1.getKey().equals(kv2.getKey())) {
-                                       // keyValueUpdateSet.remove(kv2);
-                                       toDelete.add(kv2);
-                                       i.remove();
-                                       break;
-                               }
-                       }
-               }
+        Commit newCommit = new Commit(newSequenceNumber, newer.getMachineId(), transactionSequenceNumber);
 
-               if (keyValueUpdateSet.size() == 0) {
-                       // System.out.println("Killed Commit:  " + this); // TODO remove
-                       this.setDead();
-               }
+        newCommit.setKVsMap(kvSet);
 
-               return toDelete;
-       }
+        return newCommit;
+    }
 }
\ No newline at end of file