Initial Working version of IoTCloudv2, needs more testing
authorAli Younis <ayounis@uci.edu>
Fri, 2 Dec 2016 01:37:07 +0000 (17:37 -0800)
committerAli Younis <ayounis@uci.edu>
Fri, 2 Dec 2016 01:37:07 +0000 (17:37 -0800)
src2/java/iotcloud/Abort.java
src2/java/iotcloud/CloudComm.java
src2/java/iotcloud/Entry.java
src2/java/iotcloud/Guard.java
src2/java/iotcloud/NewKey.java
src2/java/iotcloud/PendingTransaction.java
src2/java/iotcloud/Table.java
src2/java/iotcloud/Test.java
src2/java/iotcloud/Transaction.java

index c2b57b7..4d0c59b 100644 (file)
@@ -40,7 +40,7 @@ class Abort extends Entry {
        }
 
        int getSize() {
-               return 2*Long.BYTES+Byte.BYTES;
+               return (2 * Long.BYTES) + Byte.BYTES;
        }
 
        byte getType() {
index ac906b1..f38756d 100644 (file)
@@ -24,7 +24,7 @@ class CloudComm {
        static final int SALT_SIZE = 8;
        byte salt[];
        Table table;
-       
+
        /**
         * Empty Constructor needed for child class.
         */
@@ -37,8 +37,8 @@ class CloudComm {
         */
 
        CloudComm(Table _table, String _baseurl, String _password) {
-               this.table=_table;
-               this.baseurl=_baseurl;
+               this.table = _table;
+               this.baseurl = _baseurl;
                this.password = _password;
                this.random = new SecureRandom();
        }
@@ -64,13 +64,13 @@ class CloudComm {
 
        private void initCrypt() {
                try {
-                       SecretKeySpec key=initKey();
+                       SecretKeySpec key = initKey();
                        password = null; // drop password
                        mac = Mac.getInstance("HmacSHA256");
                        mac.init(key);
-                       encryptCipher =Cipher.getInstance("AES/ECB/PKCS5Padding");
+                       encryptCipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
                        encryptCipher.init(Cipher.ENCRYPT_MODE, key);
-                       decryptCipher =Cipher.getInstance("AES/ECB/PKCS5Padding");
+                       decryptCipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
                        decryptCipher.init(Cipher.DECRYPT_MODE, key);
                } catch (Exception e) {
                        e.printStackTrace();
@@ -83,29 +83,32 @@ class CloudComm {
         */
 
        private URL buildRequest(boolean isput, long sequencenumber, long maxentries) throws IOException {
-               String reqstring=isput?"req=putslot":"req=getslot";
-               String urlstr=baseurl+"?"+reqstring+"&seq="+sequencenumber;
+               String reqstring = isput ? "req=putslot" : "req=getslot";
+               String urlstr = baseurl + "?" + reqstring + "&seq=" + sequencenumber;
                if (maxentries != 0)
-                       urlstr += "&max="+maxentries;
+                       urlstr += "&max=" + maxentries;
                return new URL(urlstr);
        }
-       
+
        public void setSalt() {
                try {
                        salt = new byte[SALT_SIZE];
                        random.nextBytes(salt);
-                       URL url=new URL(baseurl+"?req=setsalt");
-                       URLConnection con=url.openConnection();
+                       URL url = new URL(baseurl + "?req=setsalt");
+                       URLConnection con = url.openConnection();
                        HttpURLConnection http = (HttpURLConnection) con;
                        http.setRequestMethod("POST");
                        http.setFixedLengthStreamingMode(salt.length);
                        http.setDoOutput(true);
                        http.connect();
-                       OutputStream os=http.getOutputStream();
+                       OutputStream os = http.getOutputStream();
                        os.write(salt);
-                       int responsecode=http.getResponseCode();
-                       if (responsecode != HttpURLConnection.HTTP_OK)
+                       int responsecode = http.getResponseCode();
+                       if (responsecode != HttpURLConnection.HTTP_OK) {
+                               // TODO: Remove this print
+                               System.out.println(responsecode);
                                throw new Error("Invalid response");
+                       }
                } catch (Exception e) {
                        e.printStackTrace();
                        throw new Error("Failed setting salt");
@@ -114,20 +117,20 @@ class CloudComm {
        }
 
        private void getSalt() throws Exception {
-               URL url=new URL(baseurl+"?req=getsalt");
-               URLConnection con=url.openConnection();
+               URL url = new URL(baseurl + "?req=getsalt");
+               URLConnection con = url.openConnection();
                HttpURLConnection http = (HttpURLConnection) con;
                http.setRequestMethod("POST");
                http.connect();
-               
-               InputStream is=http.getInputStream();
-               DataInputStream dis=new DataInputStream(is);
-               int salt_length=dis.readInt();
-               byte [] tmp=new byte[salt_length];
+
+               InputStream is = http.getInputStream();
+               DataInputStream dis = new DataInputStream(is);
+               int salt_length = dis.readInt();
+               byte [] tmp = new byte[salt_length];
                dis.readFully(tmp);
-               salt=tmp;
+               salt = tmp;
        }
-       
+
        /*
         * API for putting a slot into the queue.  Returns null on success.
         * On failure, the server will send slots with newer sequence
@@ -140,13 +143,13 @@ class CloudComm {
                                getSalt();
                                initCrypt();
                        }
-                       
-                       long sequencenumber=slot.getSequenceNumber();
-                       byte[] bytes=slot.encode(mac);
+
+                       long sequencenumber = slot.getSequenceNumber();
+                       byte[] bytes = slot.encode(mac);
                        bytes = encryptCipher.doFinal(bytes);
 
-                       URL url=buildRequest(true, sequencenumber, max);
-                       URLConnection con=url.openConnection();
+                       URL url = buildRequest(true, sequencenumber, max);
+                       URLConnection con = url.openConnection();
                        HttpURLConnection http = (HttpURLConnection) con;
 
                        http.setRequestMethod("POST");
@@ -154,12 +157,12 @@ class CloudComm {
                        http.setDoOutput(true);
                        http.connect();
 
-                       OutputStream os=http.getOutputStream();
+                       OutputStream os = http.getOutputStream();
                        os.write(bytes);
 
-                       InputStream is=http.getInputStream();
-                       DataInputStream dis=new DataInputStream(is);
-                       byte[] resptype=new byte[7];
+                       InputStream is = http.getInputStream();
+                       DataInputStream dis = new DataInputStream(is);
+                       byte[] resptype = new byte[7];
                        dis.readFully(resptype);
                        if (Arrays.equals(resptype, "getslot".getBytes()))
                                return processSlots(dis);
@@ -184,20 +187,20 @@ class CloudComm {
                                getSalt();
                                initCrypt();
                        }
-                       
-                       URL url=buildRequest(false, sequencenumber, 0);
-                       URLConnection con=url.openConnection();
+
+                       URL url = buildRequest(false, sequencenumber, 0);
+                       URLConnection con = url.openConnection();
                        HttpURLConnection http = (HttpURLConnection) con;
                        http.setRequestMethod("POST");
                        http.connect();
-                       InputStream is=http.getInputStream();
+                       InputStream is = http.getInputStream();
+
+                       DataInputStream dis = new DataInputStream(is);
 
-                       DataInputStream dis=new DataInputStream(is);
-                       
-                       byte[] resptype=new byte[7];
+                       byte[] resptype = new byte[7];
                        dis.readFully(resptype);
                        if (!Arrays.equals(resptype, "getslot".getBytes()))
-                               throw new Error("Bad Response: "+new String(resptype));
+                               throw new Error("Bad Response: " + new String(resptype));
                        else
                                return processSlots(dis);
                } catch (Exception e) {
@@ -212,19 +215,19 @@ class CloudComm {
         */
 
        private Slot[] processSlots(DataInputStream dis) throws Exception {
-               int numberofslots=dis.readInt();
-               int[] sizesofslots=new int[numberofslots];
-               Slot[] slots=new Slot[numberofslots];
-               for(int i=0; i<numberofslots; i++)
-                       sizesofslots[i]=dis.readInt();
-
-               for(int i=0; i<numberofslots; i++) {
-                       byte[] data=new byte[sizesofslots[i]];
+               int numberofslots = dis.readInt();
+               int[] sizesofslots = new int[numberofslots];
+               Slot[] slots = new Slot[numberofslots];
+               for (int i = 0; i < numberofslots; i++)
+                       sizesofslots[i] = dis.readInt();
+
+               for (int i = 0; i < numberofslots; i++) {
+                       byte[] data = new byte[sizesofslots[i]];
                        dis.readFully(data);
 
                        data = decryptCipher.doFinal(data);
 
-                       slots[i]=Slot.decode(table, data, mac);
+                       slots[i] = Slot.decode(table, data, mac);
                }
                dis.close();
                return slots;
index 8734bb1..c5a6807 100644 (file)
@@ -40,6 +40,18 @@ abstract class Entry implements Liveness {
                byte type = bb.get();
                switch (type) {
 
+               case TypeCommit:
+                       return Commit.decode(slot, bb);
+
+               case TypeAbort:
+                       return Abort.decode(slot, bb);
+
+               case TypeTransaction:
+                       return Transaction.decode(slot, bb);
+
+               case TypeNewKey:
+                       return NewKey.decode(slot, bb);
+
                case TypeLastMessage:
                        return LastMessage.decode(slot, bb);
 
index 2f755cf..c08ed28 100644 (file)
@@ -84,22 +84,40 @@ class Guard {
      *
      */
     public int getSize() {
+
+        if (booleanExpression == null) {
+            return Integer.BYTES;
+        }
+
         return Integer.BYTES + booleanExpression.length();
     }
 
     public void encode(ByteBuffer bb) {
-        bb.putInt(booleanExpression.length());
-        bb.put(booleanExpression.internalBytes());
+        if (booleanExpression == null) {
+            bb.putInt(0);
+        } else {
+            bb.putInt(booleanExpression.length());
+            bb.put(booleanExpression.internalBytes());
+        }
     }
 
     static Guard decode(ByteBuffer bb) {
         int exprLength = bb.getInt();
-        byte[] expr = new byte[exprLength];
-        bb.get(expr);
-        return new Guard(IoTString.shallow(expr));
+
+        if (exprLength != 0) {
+            byte[] expr = new byte[exprLength];
+            bb.get(expr);
+            return new Guard(IoTString.shallow(expr));
+        }
+        return new Guard(null);
     }
 
     public Guard getCopy() {
+
+        if (booleanExpression == null) {
+            return new Guard(null);
+        }
+
         return new Guard(IoTString.shallow(booleanExpression.internalBytes()));
     }
 
index c101c0b..0970016 100644 (file)
@@ -37,14 +37,14 @@ class NewKey extends Entry {
        }
 
        public void encode(ByteBuffer bb) {
-               bb.put(Entry.TypeAbort);
+               bb.put(Entry.TypeNewKey);
                bb.putInt(key.length());
                bb.put(key.internalBytes());
                bb.putLong(machineid);
        }
 
        public int getSize() {
-               return Long.BYTES + Byte.BYTES + key.length();
+               return Long.BYTES + Byte.BYTES + Integer.BYTES + key.length();
        }
 
        public byte getType() {
index 3d39e12..c3c41c5 100644 (file)
@@ -27,16 +27,23 @@ class PendingTransaction {
      */
     public void addKV(KeyValue newKV) {
 
+        KeyValue rmKV = null;
+
         // Make sure there are no duplicates
         for (KeyValue kv : keyValueUpdateSet) {
             if (kv.getKey().equals(newKV.getKey())) {
 
                 // Remove key if we are adding a newer version of the same key
-                keyValueUpdateSet.remove(kv);
+                rmKV = kv;
                 break;
             }
         }
 
+        // Remove key if we are adding a newer version of the same key
+        if (rmKV != null) {
+            keyValueUpdateSet.remove(rmKV);
+        }
+
         // Add the key to the hash set
         keyValueUpdateSet.add(newKV);
     }
index 00d48b0..cf21f48 100644 (file)
@@ -689,45 +689,48 @@ final public class Table {
                }
 
 
-               // Arbitrate
-               Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
-               for (Transaction ut : uncommittedTransactionsList) {
-
-                       KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0];
-                       // Check if this machine arbitrates for this transaction
-                       if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) {
-                               continue;
-                       }
-
-                       Entry newEntry = null;
-
-                       try {
-                               if ( ut.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
-                                       // Guard evaluated as true
-
-                                       // update the local tmp current key set
-                                       for (KeyValue kv : ut.getkeyValueUpdateSet()) {
-                                               speculativeTableTmp.put(kv.getKey(), kv);
-                                       }
-
-                                       // create the commit
-                                       newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet());
-                               } else {
-                                       // Guard was false
-
-                                       // create the abort
-                                       newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID());
-                               }
-                       } catch (Exception e) {
-                               e.printStackTrace();
-                       }
-
-                       if ((newEntry != null) && s.hasSpace(newEntry)) {
-                               s.addEntry(newEntry);
-                       } else {
-                               break;
-                       }
-               }
+               // // Arbitrate
+               // Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
+               // for (Transaction ut : uncommittedTransactionsList) {
+
+               //      KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0];
+               //      // Check if this machine arbitrates for this transaction
+               //      if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) {
+               //              continue;
+               //      }
+
+               //      Entry newEntry = null;
+
+               //      try {
+               //              if ( ut.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
+               //                      // Guard evaluated as true
+
+               //                      // update the local tmp current key set
+               //                      for (KeyValue kv : ut.getkeyValueUpdateSet()) {
+               //                              speculativeTableTmp.put(kv.getKey(), kv);
+               //                      }
+
+               //                      // create the commit
+               //                      newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet());
+               //              } else {
+               //                      // Guard was false
+
+               //                      // create the abort
+               //                      newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID());
+               //              }
+               //      } catch (Exception e) {
+               //              e.printStackTrace();
+               //      }
+
+               //      if ((newEntry != null) && s.hasSpace(newEntry)) {
+
+               //              // TODO: Remove print
+               //              System.out.println("Arbitrating...");
+               //              s.addEntry(newEntry);
+               //      } else {
+               //              break;
+               //      }
+               // }
 
 
                NewKey newKey = new NewKey(s, keyName, arbMachineid);
@@ -960,7 +963,8 @@ final public class Table {
                        prevcommit.updateLiveKeys(entry.getkeyValueUpdateSet());
 
                        if (!prevcommit.isLive()) {
-                               commitList.remove(prevcommit);
+                               //commitList.remove(prevcommit);
+                               i.remove();
                        }
                }
 
@@ -979,7 +983,8 @@ final public class Table {
                        Transaction prevtrans = i.next();
 
                        if (prevtrans.getSequenceNumber() <= committedTransSeq) {
-                               uncommittedTransactionsList.remove(prevtrans);
+                               // uncommittedTransactionsList.remove(prevtrans);
+                               i.remove();
                                prevtrans.setDead();
                        }
                }
index 6ac5b06..00d4c7c 100644 (file)
@@ -19,37 +19,57 @@ public class Test {
                Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
                t2.update();
 
-               for (int i = 0; i < 600; i++) {
-                       String a = "STR" + i;
-                       String b = "ABR" + i;
+
+
+               final int NUMBER_OF_TESTS = 200;
+
+               for (int i = 0; i < NUMBER_OF_TESTS; i++) {
+
+                       System.out.println("Doing: " + i);
+
+                       String a = "a" + i;
+                       String b = "b" + i;
                        IoTString ia = new IoTString(a);
                        IoTString ib = new IoTString(b);
 
 
-                       t1.createNewKey(ia, 321);
-                       t2.createNewKey(ib, 351);
+                       t1.createNewKey(ia, 351);
+                       t2.createNewKey(ib, 321);
 
                        t1.startTransaction();
                        t1.addKV(ia, ia);
                        t1.commitTransaction();
+               }
+
+               for (int i = 0; i < NUMBER_OF_TESTS; i++) {
+
+                       System.out.println("Doing: " + i);
+
+                       String a = "a" + i;
+                       String b = "b" + i;
+                       IoTString ia = new IoTString(a);
+                       IoTString ib = new IoTString(b);
 
                        t2.startTransaction();
                        t2.addKV(ib, ib);
                        t2.commitTransaction();
                }
 
-               t1.update();
-               t2.update();
 
 
-               for (int i = 0; i < 600; i++) {
-                       String a = "STR" + i;
-                       String b = "ABR" + i;
+               t1.update();
+               // t2.update();
+               // t1.update();
+
+               for (int i = 0; i < NUMBER_OF_TESTS; i++) {
+                       String a = "a" + i;
+                       String b = "b" + i;
                        IoTString ia = new IoTString(a);
                        IoTString ib = new IoTString(b);
 
-                       System.out.println(ib + "->" + t1.getCommitted(ib));
-                       System.out.println(ia + "->" + t2.getCommitted(ia));
+                       System.out.println(ib + " -> " + t1.getCommitted(ib));
+                       System.out.println(ia + " -> " + t2.getCommitted(ia));
+                       System.out.println();
                }
        }
 }
index 18b8721..5acd5c2 100644 (file)
@@ -16,6 +16,8 @@ class Transaction extends Entry {
         seqnum = _seqnum;
         machineid = _machineid;
 
+        keyValueUpdateSet = new HashSet<KeyValue>();
+
         for (KeyValue kv : _keyValueUpdateSet) {
             KeyValue kvCopy = kv.getCopy();
             keyValueUpdateSet.add(kvCopy);