Tex typo issues, Code
authorAli Younis <ayounis@uci.edu>
Tue, 22 Nov 2016 23:52:01 +0000 (15:52 -0800)
committerAli Younis <ayounis@uci.edu>
Tue, 22 Nov 2016 23:52:01 +0000 (15:52 -0800)
doc2/iotcloud.tex
src2/java/iotcloud/Abort.java
src2/java/iotcloud/Commit.java
src2/java/iotcloud/Guard.java
src2/java/iotcloud/KeyValue.java
src2/java/iotcloud/Table.java
src2/java/iotcloud/Transaction.java

index 31612024664b7701ebde6f18c2243d3bf40a1e6d..e3ae679335db5eca99b30253d73daf8d68a3c54b 100644 (file)
@@ -1499,12 +1499,12 @@ This rescue is not mandatory.  This is trying to fill the remaining portion of t
         \EndIf\\\r
     \r
         \If{$\lnot$\Call{EvaluateGuard}{$Guard_t, CurrKV$}}\r
-            \State $abortde \gets $\Call{CreateAbort}{$seq_t, id_t$}\r
+            \State $abort_{de} \gets $\Call{CreateAbort}{$seq_t, id_t$}\r
             \LeftComment{No more space so we cant arbitrate any further}\r
-            \If($lnot$\Call{DeHasSpace}{$DE_a, abortde$})\r
+            \If($\lnot$\Call{DeHasSpace}{$DE_a, abort_{de}$})\r
                 \State \Return{$DE_a$}\r
             \EndIf\r
-            \State $DE_a \gets DE_a \cup abortde$\r
+            \State $DE_a \gets DE_a \cup abort_{de}$\r
         \Else\r
             \State $DKV \gets \{\tuple{k,v}| \tuple{k,v} \in KV \land \tuple{k',v'}\in KV_t \land k'=k\}$\r
             \State $KVTmp \gets (KV \setminus DKV) \cup KV'$\r
index 8ec1871c2694ba9ffa3b4b193adbf7d09fde2ca3..c2b57b7c04654f762046ac470428d53c80c7cd39 100644 (file)
@@ -10,12 +10,12 @@ import java.nio.ByteBuffer;
 
 
 class Abort extends Entry {
-       private long seqnum;
+       private long seqnumtrans;
        private long machineid;
 
-       Abort(Slot slot, long _seqnum, long _machineid) {
+       Abort(Slot slot, long _seqnumtrans, long _machineid) {
                super(slot);
-               seqnum=_seqnum;
+               seqnumtrans=_seqnumtrans;
                machineid=_machineid;
        }
 
@@ -23,19 +23,19 @@ class Abort extends Entry {
                return machineid;
        }
 
-       long getSequenceNumber() {
-               return seqnum;
+       long getTransSequenceNumber() {
+               return seqnumtrans;
        }
 
        static Entry decode(Slot slot, ByteBuffer bb) {
-               long seqnum=bb.getLong();
+               long seqnumtrans=bb.getLong();
                long machineid=bb.getLong();
-               return new Abort(slot, seqnum, machineid);
+               return new Abort(slot, seqnumtrans, machineid);
        }
 
        void encode(ByteBuffer bb) {
                bb.put(Entry.TypeAbort);
-               bb.putLong(seqnum);
+               bb.putLong(seqnumtrans);
                bb.putLong(machineid);
        }
 
@@ -48,6 +48,6 @@ class Abort extends Entry {
        }
 
        Entry getCopy(Slot s) {
-               return new Abort(s, seqnum, machineid);
+               return new Abort(s, seqnumtrans, machineid);
        }
 }
\ No newline at end of file
index 05834f0770d7e95b4c620f16b1c13079c0c3a4e8..c533c36bda67edd4cfc37044fa379f32943c7476 100644 (file)
@@ -3,54 +3,103 @@ package iotcloud;
 import java.nio.ByteBuffer;
 import java.util.Set;
 import java.util.HashSet;
+import java.util.Iterator;
 
 /**
- * This Entry records the abort sent by a given machine.
+ * This Entry records the commit of a transaction.
  * @author Ali Younis <ayounis@uci.edu>
  * @version 1.0
  */
 
 
 class Commit extends Entry {
-       private long seqnum;
-       private Set<KeyValue> keyValueUpdateSet;
+       private long seqnumtrans;
+       private Set<KeyValue> keyValueUpdateSet = null;
+       private Set<KeyValue> liveValues = null;
 
 
-       public Commit(Slot slot, long _seqnum, long _machineid) {
+       public Commit(Slot slot, long _seqnumtrans, Set<KeyValue> _keyValueUpdateSet) {
                super(slot);
-               seqnum=_seqnum;
-               machineid=_machineid;
+               seqnumtrans = _seqnumtrans;
+
+               keyValueUpdateSet = new HashSet<KeyValue>();
+               liveValues = new HashSet<KeyValue>();
+
+               for (KeyValue kv : _keyValueUpdateSet) {
+                       KeyValue kvCopy = kv.getCopy();
+                       keyValueUpdateSet.add(kvCopy);
+                       liveValues.add(kvCopy);
+               }
        }
 
-       public long getSequenceNumber() {
-               return seqnum;
+       public long getTransSequenceNumber() {
+               return seqnumtrans;
        }
 
+       public Set<KeyValue> getkeyValueUpdateSet() {
+               return keyValueUpdateSet;
+       }
+
+       public byte getType() {
+               return Entry.TypeCommit;
+       }
 
+       public int getSize() {
+               int size = Long.BYTES + Byte.BYTES; // seq id, entry type
+               size += Integer.BYTES; // number of KV's
 
+               // Size of each KV
+               for (KeyValue kv : keyValueUpdateSet) {
+                       size += kv.getSize();
+               }
 
+               return size;
+       }
 
        static Entry decode(Slot slot, ByteBuffer bb) {
-               long seqnum=bb.getLong();
-               long machineid=bb.getLong();
-               return new Abort(slot, seqnum, machineid);
+               long seqnumtrans = bb.getLong();
+               int numberOfKeys = bb.getInt();
+
+               Set<KeyValue> kvSet = new HashSet<KeyValue>();
+               for (int i = 0; i < numberOfKeys; i++) {
+                       KeyValue kv = KeyValue.decode(bb);
+                       kvSet.add(kv);
+               }
+
+               return new Commit(slot, seqnumtrans, kvSet);
        }
 
        public void encode(ByteBuffer bb) {
-               bb.put(Entry.TypeAbort);
-               bb.putLong(seqnum);
-               bb.putLong(machineid);
-       }
+               bb.put(Entry.TypeCommit);
+               bb.putLong(seqnumtrans);
+               bb.putInt(keyValueUpdateSet.size());
 
-       public int getSize() {
-               return 2*Long.BYTES+Byte.BYTES;
+               for (KeyValue kv : keyValueUpdateSet) {
+                       kv.encode(bb);
+               }
        }
 
-       public byte getType() {
-               return Entry.TypeAbort;
+       public Entry getCopy(Slot s) {
+               return new Commit(s, seqnumtrans, keyValueUpdateSet);
        }
 
-       public Entry getCopy(Slot s) {
-               return new Abort(s, seqnum, machineid);
+       public void updateLiveKeys(Set<KeyValue> kvSet) {
+
+               if (!this.isLive())
+                       return;
+
+               for (KeyValue kv1 : kvSet) {
+                       for (Iterator<KeyValue> i = liveValues.iterator(); i.hasNext();) {
+                               KeyValue kv2 = i.next();
+
+                               if (kv1.getKey() == kv2.getKey()) {
+                                       liveValues.remove(kv2);
+                                       break;
+                               }
+                       }
+               }
+
+               if (liveValues.size() == 0)
+                       this.setDead();
        }
 }
\ No newline at end of file
index aaf312b9757ca060f9fa7d28b43031b273c87208..2f755cf04158bd1dc11631ee188009dc06b88c3a 100644 (file)
@@ -98,4 +98,9 @@ class Guard {
         bb.get(expr);
         return new Guard(IoTString.shallow(expr));
     }
+
+    public Guard getCopy() {
+        return new Guard(IoTString.shallow(booleanExpression.internalBytes()));
+    }
+
 }
\ No newline at end of file
index 9fbaf90babf9f20a485417a33f69aa45f0244352..79b9e60bd84c62a93d628a341407290b0cd1c162 100644 (file)
@@ -7,13 +7,13 @@ import java.nio.ByteBuffer;
  * @version 1.0
  */
 
-class KeyValue /*extends Entry */ {
+class KeyValue { /*extends Entry */
        private IoTString key;
        private IoTString value;
 
        public KeyValue(IoTString _key, IoTString _value) {
-               key=_key;
-               value=_value;
+               key = _key;
+               value = _value;
        }
 
        public IoTString getKey() {
@@ -25,10 +25,10 @@ class KeyValue /*extends Entry */ {
        }
 
        static KeyValue decode(ByteBuffer bb) {
-               int keylength=bb.getInt();
-               int valuelength=bb.getInt();
-               byte[] key=new byte[keylength];
-               byte[] value=new byte[valuelength];
+               int keylength = bb.getInt();
+               int valuelength = bb.getInt();
+               byte[] key = new byte[keylength];
+               byte[] value = new byte[valuelength];
                bb.get(key);
                bb.get(value);
                return new KeyValue(IoTString.shallow(key), IoTString.shallow(value));
@@ -42,10 +42,14 @@ class KeyValue /*extends Entry */ {
        }
 
        public int getSize() {
-               return 2*Integer.BYTES+key.length()+value.length();
+               return 2 * Integer.BYTES + key.length() + value.length();
        }
 
        public String toString() {
                return value.toString();
        }
+
+       public KeyValue getCopy() {
+               return new KeyValue(key, value);
+       }
 }
index 7009ed9307aa4cd8a5a2f2756acb34d17a29ff9b..732867fc4cd7bc0f49286f202cd5b43f9b31f659 100644 (file)
@@ -8,6 +8,8 @@ import java.util.Vector;
 import java.util.Random;
 import java.util.Queue;
 import java.util.LinkedList;
+import java.util.List;
+
 /**
  * IoTTable data structure.  Provides client inferface.
  * @author Brian Demsky
@@ -43,6 +45,11 @@ final public class Table {
 
        private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building
        private Queue<PendingTransaction> pendingTransQueue = null; // Queue of pending transactions
+       private List<Commit> commitList = null; // List of all the most recent live commits
+       private Map<IoTString, KeyValue> commitedTable = null; // Table of committed KV
+       private List<Transaction> uncommittedTransactionsList = null; //
+       private Map<IoTString, Long> arbitratorTable = null; // Table of arbitrators
+       private Set<Abort> arbitratorTable = null; // Table of arbitrators
 
 
        public Table(String baseurl, String password, long _localmachineid) {
@@ -55,6 +62,10 @@ final public class Table {
                lastliveslotseqn = 1;
 
                pendingTransQueue = new LinkedList<PendingTransaction>();
+               commitList = new LinkedList<Commit>();
+               commitedTable = new HashMap<IoTString, KeyValue>();
+               uncommittedTransactionsList = new LinkedList<Transaction>();
+               arbitratorTable = new HashMap<IoTString, Long>();
        }
 
        public Table(CloudComm _cloud, long _localmachineid) {
@@ -66,6 +77,10 @@ final public class Table {
                cloud = _cloud;
 
                pendingTransQueue = new LinkedList<PendingTransaction>();
+               commitList = new LinkedList<Commit>();
+               commitedTable = new HashMap<IoTString, KeyValue>();
+               uncommittedTransactionsList = new LinkedList<Transaction>();
+               arbitratorTable = new HashMap<IoTString, Long>();
        }
 
        public void rebuild() {
@@ -133,11 +148,6 @@ final public class Table {
                pendingTransBuild.addKV(kv);
        }
 
-
-
-
-
-
        void decrementLiveCount() {
                liveslotcount--;
        }
@@ -239,7 +249,7 @@ final public class Table {
                boolean insertedTrans = false;
                if (s.hasSpace(trans)) {
                        s.addEntry(trans);
-                       insertedTrans=true;
+                       insertedTrans = true;
                }
 
                /* now go through live entries from least to greatest sequence number until
@@ -356,16 +366,7 @@ final public class Table {
                setResizeThreshold();
        }
 
-       // private void processEntry(KeyValue entry, SlotIndexer indexer) {
-       //      IoTString key=entry.getKey();
-       //      KeyValue oldvalue=table.get(key);
-       //      if (oldvalue != null) {
-       //              oldvalue.setDead();
-       //      }
-       //      table.put(key, entry);
-       // }
-
-       private void processEntry(LastMessage entry, SlotIndexer indexer, HashSet<Long> machineSet) {
+       private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
                updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
        }
 
@@ -404,6 +405,57 @@ final public class Table {
                        entry.setWatchSet(watchset);
        }
 
+       private void processEntry(NewKey entry) {
+               arbitratorTable.put(entry.getKey(), entry.getMachineID());
+       }
+
+       private void processEntry(Transaction entry) {
+               uncommittedTransactionsList.add(entry);
+       }
+
+       private void processEntry(Abort entry) {
+               for (Iterator<Transaction> i = uncommittedTransactionsList.iterator(); i.hasNext();) {
+                       Transaction prevtrans = i.next();
+                       if (prevtrans.getSequenceNumber() == entry.getTransSequenceNumber()) {
+                               uncommittedTransactionsList.remove(prevtrans);
+                               prevtrans.setDead();
+                               return;
+                       }
+               }
+       }
+
+       private void processEntry(Commit entry) {
+
+               for (Iterator<Commit> i = commitList.iterator(); i.hasNext();) {
+                       Commit prevcommit = i.next();
+                       prevcommit.updateLiveKeys(entry.getkeyValueUpdateSet());
+
+                       if (!prevcommit.isLive()) {
+                               commitList.remove(prevcommit);
+                       }
+               }
+
+               commitList.add(entry);
+
+               // Update the committed table list
+               for (KeyValue kv : entry.getkeyValueUpdateSet()) {
+                       IoTString key = kv.getKey();
+                       commitedTable.put(key, kv);
+               }
+
+               long committedTransSeq = entry.getTransSequenceNumber();
+
+               // Make dead the transactions
+               for (Iterator<Transaction> i = uncommittedTransactionsList.iterator(); i.hasNext();) {
+                       Transaction prevtrans = i.next();
+
+                       if (prevtrans.getSequenceNumber() <= committedTransSeq) {
+                               uncommittedTransactionsList.remove(prevtrans);
+                               prevtrans.setDead();
+                       }
+               }
+       }
+
        private void addWatchList(long machineid, RejectedMessage entry) {
                HashSet<RejectedMessage> entries = watchlist.get(machineid);
                if (entries == null)
@@ -411,7 +463,7 @@ final public class Table {
                entries.add(entry);
        }
 
-       private void processEntry(TableStatus entry, SlotIndexer indexer) {
+       private void processEntry(TableStatus entry) {
                int newnumslots = entry.getMaxSlots();
                updateCurrMaxSize(newnumslots);
                if (lastTableStatus != null)
@@ -476,12 +528,25 @@ final public class Table {
                updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
                for (Entry entry : slot.getEntries()) {
                        switch (entry.getType()) {
-                       // case Entry.TypeKeyValue:
-                       //      processEntry((KeyValue)entry, indexer);
-                       //      break;
+
+                       case Entry.TypeNewKey:
+                               processEntry((NewKey)entry);
+                               break;
+
+                       case Entry.TypeCommit:
+                               processEntry((Commit)entry);
+                               break;
+
+                       case Entry.TypeAbort:
+                               processEntry((Abort)entry);
+                               break;
+
+                       case Entry.TypeTransaction:
+                               processEntry((Transaction)entry);
+                               break;
 
                        case Entry.TypeLastMessage:
-                               processEntry((LastMessage)entry, indexer, machineSet);
+                               processEntry((LastMessage)entry, machineSet);
                                break;
 
                        case Entry.TypeRejectedMessage:
@@ -489,7 +554,7 @@ final public class Table {
                                break;
 
                        case Entry.TypeTableStatus:
-                               processEntry((TableStatus)entry, indexer);
+                               processEntry((TableStatus)entry);
                                break;
 
                        default:
index d1a993389bf6ac4ef63f2079ff084bb1d7e0f25b..d25403efab5a8b1935433cb6010b322f0267c0fa 100644 (file)
@@ -8,15 +8,20 @@ class Transaction extends Entry {
 
     private long seqnum;
     private long machineid;
-    private Set<KeyValue> keyValueUpdateSet;
+    private Set<KeyValue> keyValueUpdateSet = null;
     private Guard guard;
 
     public Transaction(Slot slot, long _seqnum, long _machineid, Set<KeyValue> _keyValueUpdateSet, Guard _guard) {
         super(slot);
         seqnum = _seqnum;
         machineid = _machineid;
-        keyValueUpdateSet = _keyValueUpdateSet;
-        guard = _guard;
+
+        for (KeyValue kv : _keyValueUpdateSet) {
+            KeyValue kvCopy = kv.getCopy();
+            keyValueUpdateSet.add(kvCopy);
+        }
+
+        guard = _guard.getCopy();
     }
 
     public long getMachineID() {
@@ -27,8 +32,13 @@ class Transaction extends Entry {
         return seqnum;
     }
 
+    public Set<KeyValue> getkeyValueUpdateSet() {
+        return keyValueUpdateSet;
+    }
+
+
     public byte getType() {
-        return Entry.TypeLastMessage;
+        return Entry.TypeTransaction;
     }
 
     public int getSize() {
@@ -52,6 +62,7 @@ class Transaction extends Entry {
         bb.putLong(seqnum);
         bb.putLong(machineid);
 
+        bb.putInt(keyValueUpdateSet.size());
         for (KeyValue kv : keyValueUpdateSet) {
             kv.encode(bb);
         }
@@ -65,7 +76,6 @@ class Transaction extends Entry {
         int numberOfKeys = bb.getInt();
 
         Set<KeyValue> kvSet = new HashSet<KeyValue>();
-
         for (int i = 0; i < numberOfKeys; i++) {
             KeyValue kv = KeyValue.decode(bb);
             kvSet.add(kv);
@@ -76,8 +86,6 @@ class Transaction extends Entry {
         return new Transaction(slot, seqnum, machineid, kvSet, guard);
     }
 
-
-
     public Entry getCopy(Slot s) {
         return new Transaction(s, seqnum, machineid, keyValueUpdateSet, guard);
     }