Added mutex for thread safety
authorAli Younis <ayounis@uci.edu>
Wed, 4 Jan 2017 21:39:36 +0000 (13:39 -0800)
committerAli Younis <ayounis@uci.edu>
Wed, 4 Jan 2017 21:39:36 +0000 (13:39 -0800)
version2/src/java/iotcloud/LocalComm.java
version2/src/java/iotcloud/Table.java
version2/src/java/iotcloud/Test.java

index d00c473..17e3c05 100644 (file)
@@ -9,7 +9,7 @@ class LocalComm {
         t2 = _t2;
     }
 
-    public byte[] sendDataToLocalDevice(Long deviceId, byte[] data) {
+    public byte[] sendDataToLocalDevice(Long deviceId, byte[] data) throws InterruptedException{
         System.out.println("Passing Locally");
 
         if (deviceId == t1.getId()) {
index 1d097c7..f6d699b 100644 (file)
@@ -14,6 +14,7 @@ import java.util.Set;
 import java.util.Collection;
 import java.util.Collections;
 import java.nio.ByteBuffer;
+import java.util.concurrent.Semaphore;
 
 
 /**
@@ -74,6 +75,9 @@ final public class Table {
        private Map<Long, TransactionStatus> transactionStatusMap = null;
        private Map<Long, TransactionStatus> transactionStatusNotSentMap = null;
 
+       private Semaphore mutex = null;
+
+
 
        public Table(String hostname, String baseurl, String password, long _localmachineid) {
                localmachineid = _localmachineid;
@@ -118,6 +122,7 @@ final public class Table {
                localCommunicationChannels = new HashMap<Long, LocalComm>();
                transactionStatusMap = new HashMap<Long, TransactionStatus>();
                transactionStatusNotSentMap = new HashMap<Long, TransactionStatus>();
+               mutex = new Semaphore(1);
        }
 
        public void initTable() throws ServerException {
@@ -135,9 +140,11 @@ final public class Table {
                }
        }
 
-       public void rebuild() throws ServerException {
+       public void rebuild() throws ServerException, InterruptedException {
+               mutex.acquire();
                Slot[] newslots = cloud.getSlots(sequencenumber + 1);
                validateandupdate(newslots, true);
+               mutex.release();
        }
 
        // TODO: delete method
@@ -243,12 +250,22 @@ final public class Table {
        public void addLocalComm(long machineId, LocalComm lc) {
                localCommunicationChannels.put(machineId, lc);
        }
-       public Long getArbitrator(IoTString key) {
-               return arbitratorTable.get(key);
+       public Long getArbitrator(IoTString key) throws InterruptedException {
+
+               mutex.acquire();
+               Long arb = arbitratorTable.get(key);
+               mutex.release();
+
+               return arb;
        }
 
-       public IoTString getCommitted(IoTString key) {
+       public IoTString getCommitted(IoTString key) throws InterruptedException {
+
+               mutex.acquire();
                KeyValue kv = commitedTable.get(key);
+               mutex.release();
+
+
                if (kv != null) {
                        return kv.getValue();
                } else {
@@ -256,7 +273,10 @@ final public class Table {
                }
        }
 
-       public IoTString getSpeculative(IoTString key) {
+       public IoTString getSpeculative(IoTString key) throws InterruptedException {
+
+               mutex.acquire();
+
                KeyValue kv = pendingTransSpeculativeTable.get(key);
 
                if (kv == null) {
@@ -266,6 +286,8 @@ final public class Table {
                if (kv == null) {
                        kv = commitedTable.get(key);
                }
+               mutex.release();
+
 
                if (kv != null) {
                        return kv.getValue();
@@ -274,7 +296,10 @@ final public class Table {
                }
        }
 
-       public IoTString getCommittedAtomic(IoTString key) {
+       public IoTString getCommittedAtomic(IoTString key) throws InterruptedException {
+
+               mutex.acquire();
+
                KeyValue kv = commitedTable.get(key);
 
                if (arbitratorTable.get(key) == null) {
@@ -287,6 +312,8 @@ final public class Table {
                        throw new Error("Not all Key Values Match Arbitrator.");
                }
 
+               mutex.release();
+
                if (kv != null) {
                        pendingTransBuild.addKVGuard(new KeyValue(key, kv.getValue()));
                        return kv.getValue();
@@ -296,7 +323,9 @@ final public class Table {
                }
        }
 
-       public IoTString getSpeculativeAtomic(IoTString key) {
+       public IoTString getSpeculativeAtomic(IoTString key) throws InterruptedException {
+
+               mutex.acquire();
 
                if (arbitratorTable.get(key) == null) {
                        throw new Error("Key not Found.");
@@ -318,6 +347,8 @@ final public class Table {
                        kv = commitedTable.get(key);
                }
 
+               mutex.release();
+
                if (kv != null) {
                        pendingTransBuild.addKVGuard(new KeyValue(key, kv.getValue()));
                        return kv.getValue();
@@ -327,7 +358,9 @@ final public class Table {
                }
        }
 
-       public Pair<Boolean, Boolean> update() {
+       public Pair<Boolean, Boolean> update() throws InterruptedException {
+
+               mutex.acquire();
 
                boolean gotLatestFromServer = false;
                boolean didSendLocal = false;
@@ -349,14 +382,17 @@ final public class Table {
 
                        didSendLocal = true;
 
+
                } catch (Exception e) {
                        // could not update so do nothing
                }
 
+
+               mutex.release();
                return new Pair<Boolean, Boolean>(gotLatestFromServer, didSendLocal);
        }
 
-       public Boolean updateFromLocal(long arb) {
+       public Boolean updateFromLocal(long arb) throws InterruptedException {
                LocalComm lc = localCommunicationChannels.get(arb);
                if (lc == null) {
                        // Cant talk directly to arbitrator so cant do anything
@@ -372,6 +408,7 @@ final public class Table {
                        bbEncode.putLong(0);
                }
 
+               mutex.acquire();
                byte[] data = lc.sendDataToLocalDevice(arb, bbEncode.array());
 
                // Decode the data
@@ -386,7 +423,6 @@ final public class Table {
                        newCommits.add(com);
                }
 
-
                for (Commit commit : newCommits) {
                        // Prepare to process the commit
                        processEntry(commit);
@@ -401,10 +437,13 @@ final public class Table {
                didCommitOrSpeculate |= createSpeculativeTable();
                createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
 
+
+               mutex.release();
+
                return true;
        }
 
-       public void startTransaction() {
+       public void startTransaction() throws InterruptedException {
                // Create a new transaction, invalidates any old pending transactions.
                pendingTransBuild = new PendingTransaction();
        }
@@ -425,7 +464,7 @@ final public class Table {
                pendingTransBuild.addKV(kv);
        }
 
-       public TransactionStatus commitTransaction() {
+       public TransactionStatus commitTransaction() throws InterruptedException {
 
                if (pendingTransBuild.getKVUpdates().size() == 0) {
 
@@ -433,6 +472,8 @@ final public class Table {
                        return new TransactionStatus(TransactionStatus.StatusNoEffect, -1);
                }
 
+               mutex.acquire();
+
                TransactionStatus transStatus = null;
 
                if (pendingTransBuild.getArbitrator() != localmachineid) {
@@ -494,25 +535,35 @@ final public class Table {
                // reset it so next time is fresh
                pendingTransBuild = new PendingTransaction();
 
+
+               mutex.release();
                return transStatus;
        }
 
-       public boolean createNewKey(IoTString keyName, long machineId) throws ServerException {
+       public boolean createNewKey(IoTString keyName, long machineId) throws ServerException, InterruptedException {
+               try {
+                       mutex.acquire();
 
-               while (true) {
-                       if (arbitratorTable.get(keyName) != null) {
-                               // There is already an arbitrator
-                               return false;
-                       }
+                       while (true) {
+                               if (arbitratorTable.get(keyName) != null) {
+                                       // There is already an arbitrator
+                                       mutex.release();
+                                       return false;
+                               }
 
-                       if (tryput(keyName, machineId, false)) {
-                               // If successfully inserted
-                               return true;
+                               if (tryput(keyName, machineId, false)) {
+                                       // If successfully inserted
+                                       mutex.release();
+                                       return true;
+                               }
                        }
+               } catch (ServerException e) {
+                       mutex.release();
+                       throw e;
                }
        }
 
-       private void processPendingTrans() {
+       private void processPendingTrans() throws InterruptedException {
 
                boolean sentAllPending = false;
                try {
@@ -581,8 +632,7 @@ final public class Table {
                }
        }
 
-       private void updateWithNotPendingTrans() throws ServerException {
-
+       private void updateWithNotPendingTrans() throws ServerException, InterruptedException {
                boolean doEnd = false;
                boolean needResize = false;
                while (!doEnd && ((uncommittedTransactionsMap.keySet().size() > 0)  || (pendingCommitsList.size() > 0))   ) {
@@ -645,7 +695,7 @@ final public class Table {
                }
        }
 
-       private Pair<Boolean, List<Commit>> sendTransactionToLocal(Transaction ut, LocalComm lc) {
+       private Pair<Boolean, List<Commit>> sendTransactionToLocal(Transaction ut, LocalComm lc) throws InterruptedException {
 
                // encode the request
                byte[] array = new byte[Long.BYTES + ut.getSize()];
@@ -675,7 +725,9 @@ final public class Table {
                return new Pair<Boolean, List<Commit>>(didCommit, newCommits);
        }
 
-       public byte[] localCommInput(byte[] data) {
+       public byte[] localCommInput(byte[] data) throws InterruptedException {
+
+
 
                // Decode the data
                ByteBuffer bbDecode = ByteBuffer.wrap(data);
@@ -686,8 +738,12 @@ final public class Table {
                        bbDecode.get();
                        ut = (Transaction)Transaction.decode(null, bbDecode);
                }
+
+               mutex.acquire();
                // Do the local update and arbitrate
                Pair<Boolean, List<Commit>> returnData = doLocalUpdateAndArbitrate(ut, lastSeenCommit);
+               mutex.release();
+
 
                // Calculate the size of the response
                int size = Byte.BYTES + Integer.BYTES;
index 6b0c4ae..0de2e5c 100644 (file)
@@ -13,7 +13,7 @@ public class Test {
 
        public static final  int NUMBER_OF_TESTS = 100;
 
-       public static void main(String[] args)  throws ServerException {
+       public static void main(String[] args)  throws ServerException, InterruptedException {
                if (args[0].equals("2")) {
                        test2();
                } else if (args[0].equals("3")) {
@@ -38,7 +38,7 @@ public class Test {
        }
 
 
-       static void test11()  {
+       static void test11() throws InterruptedException {
 
                boolean foundError = false;
 
@@ -146,7 +146,7 @@ public class Test {
                }
        }
 
-       static void test10()  {
+       static void test10() throws InterruptedException {
 
                boolean foundError = false;
 
@@ -252,7 +252,7 @@ public class Test {
                }
        }
 
-       static void test9()  {
+       static void test9() throws InterruptedException {
 
                boolean foundError = false;
                List<TransactionStatus> transStatusList = new ArrayList<TransactionStatus>();
@@ -509,7 +509,7 @@ public class Test {
                }
        }
 
-       static void test8() {
+       static void test8() throws InterruptedException {
 
                boolean foundError = false;
                List<TransactionStatus> transStatusList = new ArrayList<TransactionStatus>();
@@ -754,7 +754,7 @@ public class Test {
                }
        }
 
-       static void test7() throws ServerException {
+       static void test7() throws ServerException, InterruptedException {
 
                boolean foundError = false;
                List<TransactionStatus> transStatusList = new ArrayList<TransactionStatus>();
@@ -970,7 +970,7 @@ public class Test {
                }
        }
 
-       static void test6() throws ServerException {
+       static void test6() throws ServerException, InterruptedException {
 
                boolean foundError = false;
                List<TransactionStatus> transStatusList = new ArrayList<TransactionStatus>();
@@ -1131,7 +1131,7 @@ public class Test {
                }
        }
 
-       static void test5() throws ServerException {
+       static void test5() throws ServerException, InterruptedException {
 
                boolean foundError = false;
                List<TransactionStatus> transStatusList = new ArrayList<TransactionStatus>();
@@ -1346,7 +1346,7 @@ public class Test {
                }
        }
 
-       static void test4() throws ServerException {
+       static void test4() throws ServerException, InterruptedException {
 
                boolean foundError = false;
                long startTime = 0;
@@ -1520,7 +1520,7 @@ public class Test {
                }
        }
 
-       static void test3() throws ServerException {
+       static void test3() throws ServerException, InterruptedException {
 
                long startTime = 0;
                long endTime = 0;
@@ -1702,7 +1702,7 @@ public class Test {
                }
        }
 
-       static void test2() throws ServerException {
+       static void test2() throws ServerException, InterruptedException {
 
                boolean foundError = false;
                long startTime = 0;
@@ -1712,9 +1712,11 @@ public class Test {
                // Setup the 2 clients
                Table t1 = new Table("127.0.0.1", "http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
                t1.initTable();
+               System.out.println("T1 Ready");
+
                Table t2 = new Table("127.0.0.1", "http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
                t2.update();
-
+               System.out.println("T2 Ready");
 
                // Make the Keys
                System.out.println("Setting up keys");