From a1fd28d4b49b14e9ee970cb6acaeeba3e069d541 Mon Sep 17 00:00:00 2001 From: Ali Younis Date: Wed, 4 Jan 2017 13:39:36 -0800 Subject: [PATCH] Added mutex for thread safety --- version2/src/java/iotcloud/LocalComm.java | 2 +- version2/src/java/iotcloud/Table.java | 108 ++++++++++++++++------ version2/src/java/iotcloud/Test.java | 26 +++--- 3 files changed, 97 insertions(+), 39 deletions(-) diff --git a/version2/src/java/iotcloud/LocalComm.java b/version2/src/java/iotcloud/LocalComm.java index d00c473..17e3c05 100644 --- a/version2/src/java/iotcloud/LocalComm.java +++ b/version2/src/java/iotcloud/LocalComm.java @@ -9,7 +9,7 @@ class LocalComm { t2 = _t2; } - public byte[] sendDataToLocalDevice(Long deviceId, byte[] data) { + public byte[] sendDataToLocalDevice(Long deviceId, byte[] data) throws InterruptedException{ System.out.println("Passing Locally"); if (deviceId == t1.getId()) { diff --git a/version2/src/java/iotcloud/Table.java b/version2/src/java/iotcloud/Table.java index 1d097c7..f6d699b 100644 --- a/version2/src/java/iotcloud/Table.java +++ b/version2/src/java/iotcloud/Table.java @@ -14,6 +14,7 @@ import java.util.Set; import java.util.Collection; import java.util.Collections; import java.nio.ByteBuffer; +import java.util.concurrent.Semaphore; /** @@ -74,6 +75,9 @@ final public class Table { private Map transactionStatusMap = null; private Map transactionStatusNotSentMap = null; + private Semaphore mutex = null; + + public Table(String hostname, String baseurl, String password, long _localmachineid) { localmachineid = _localmachineid; @@ -118,6 +122,7 @@ final public class Table { localCommunicationChannels = new HashMap(); transactionStatusMap = new HashMap(); transactionStatusNotSentMap = new HashMap(); + mutex = new Semaphore(1); } public void initTable() throws ServerException { @@ -135,9 +140,11 @@ final public class Table { } } - public void rebuild() throws ServerException { + public void rebuild() throws ServerException, InterruptedException { + mutex.acquire(); Slot[] newslots = cloud.getSlots(sequencenumber + 1); validateandupdate(newslots, true); + mutex.release(); } // TODO: delete method @@ -243,12 +250,22 @@ final public class Table { public void addLocalComm(long machineId, LocalComm lc) { localCommunicationChannels.put(machineId, lc); } - public Long getArbitrator(IoTString key) { - return arbitratorTable.get(key); + public Long getArbitrator(IoTString key) throws InterruptedException { + + mutex.acquire(); + Long arb = arbitratorTable.get(key); + mutex.release(); + + return arb; } - public IoTString getCommitted(IoTString key) { + public IoTString getCommitted(IoTString key) throws InterruptedException { + + mutex.acquire(); KeyValue kv = commitedTable.get(key); + mutex.release(); + + if (kv != null) { return kv.getValue(); } else { @@ -256,7 +273,10 @@ final public class Table { } } - public IoTString getSpeculative(IoTString key) { + public IoTString getSpeculative(IoTString key) throws InterruptedException { + + mutex.acquire(); + KeyValue kv = pendingTransSpeculativeTable.get(key); if (kv == null) { @@ -266,6 +286,8 @@ final public class Table { if (kv == null) { kv = commitedTable.get(key); } + mutex.release(); + if (kv != null) { return kv.getValue(); @@ -274,7 +296,10 @@ final public class Table { } } - public IoTString getCommittedAtomic(IoTString key) { + public IoTString getCommittedAtomic(IoTString key) throws InterruptedException { + + mutex.acquire(); + KeyValue kv = commitedTable.get(key); if (arbitratorTable.get(key) == null) { @@ -287,6 +312,8 @@ final public class Table { throw new Error("Not all Key Values Match Arbitrator."); } + mutex.release(); + if (kv != null) { pendingTransBuild.addKVGuard(new KeyValue(key, kv.getValue())); return kv.getValue(); @@ -296,7 +323,9 @@ final public class Table { } } - public IoTString getSpeculativeAtomic(IoTString key) { + public IoTString getSpeculativeAtomic(IoTString key) throws InterruptedException { + + mutex.acquire(); if (arbitratorTable.get(key) == null) { throw new Error("Key not Found."); @@ -318,6 +347,8 @@ final public class Table { kv = commitedTable.get(key); } + mutex.release(); + if (kv != null) { pendingTransBuild.addKVGuard(new KeyValue(key, kv.getValue())); return kv.getValue(); @@ -327,7 +358,9 @@ final public class Table { } } - public Pair update() { + public Pair update() throws InterruptedException { + + mutex.acquire(); boolean gotLatestFromServer = false; boolean didSendLocal = false; @@ -349,14 +382,17 @@ final public class Table { didSendLocal = true; + } catch (Exception e) { // could not update so do nothing } + + mutex.release(); return new Pair(gotLatestFromServer, didSendLocal); } - public Boolean updateFromLocal(long arb) { + public Boolean updateFromLocal(long arb) throws InterruptedException { LocalComm lc = localCommunicationChannels.get(arb); if (lc == null) { // Cant talk directly to arbitrator so cant do anything @@ -372,6 +408,7 @@ final public class Table { bbEncode.putLong(0); } + mutex.acquire(); byte[] data = lc.sendDataToLocalDevice(arb, bbEncode.array()); // Decode the data @@ -386,7 +423,6 @@ final public class Table { newCommits.add(com); } - for (Commit commit : newCommits) { // Prepare to process the commit processEntry(commit); @@ -401,10 +437,13 @@ final public class Table { didCommitOrSpeculate |= createSpeculativeTable(); createPendingTransactionSpeculativeTable(didCommitOrSpeculate); + + mutex.release(); + return true; } - public void startTransaction() { + public void startTransaction() throws InterruptedException { // Create a new transaction, invalidates any old pending transactions. pendingTransBuild = new PendingTransaction(); } @@ -425,7 +464,7 @@ final public class Table { pendingTransBuild.addKV(kv); } - public TransactionStatus commitTransaction() { + public TransactionStatus commitTransaction() throws InterruptedException { if (pendingTransBuild.getKVUpdates().size() == 0) { @@ -433,6 +472,8 @@ final public class Table { return new TransactionStatus(TransactionStatus.StatusNoEffect, -1); } + mutex.acquire(); + TransactionStatus transStatus = null; if (pendingTransBuild.getArbitrator() != localmachineid) { @@ -494,25 +535,35 @@ final public class Table { // reset it so next time is fresh pendingTransBuild = new PendingTransaction(); + + mutex.release(); return transStatus; } - public boolean createNewKey(IoTString keyName, long machineId) throws ServerException { + public boolean createNewKey(IoTString keyName, long machineId) throws ServerException, InterruptedException { + try { + mutex.acquire(); - while (true) { - if (arbitratorTable.get(keyName) != null) { - // There is already an arbitrator - return false; - } + while (true) { + if (arbitratorTable.get(keyName) != null) { + // There is already an arbitrator + mutex.release(); + return false; + } - if (tryput(keyName, machineId, false)) { - // If successfully inserted - return true; + if (tryput(keyName, machineId, false)) { + // If successfully inserted + mutex.release(); + return true; + } } + } catch (ServerException e) { + mutex.release(); + throw e; } } - private void processPendingTrans() { + private void processPendingTrans() throws InterruptedException { boolean sentAllPending = false; try { @@ -581,8 +632,7 @@ final public class Table { } } - private void updateWithNotPendingTrans() throws ServerException { - + private void updateWithNotPendingTrans() throws ServerException, InterruptedException { boolean doEnd = false; boolean needResize = false; while (!doEnd && ((uncommittedTransactionsMap.keySet().size() > 0) || (pendingCommitsList.size() > 0)) ) { @@ -645,7 +695,7 @@ final public class Table { } } - private Pair> sendTransactionToLocal(Transaction ut, LocalComm lc) { + private Pair> sendTransactionToLocal(Transaction ut, LocalComm lc) throws InterruptedException { // encode the request byte[] array = new byte[Long.BYTES + ut.getSize()]; @@ -675,7 +725,9 @@ final public class Table { return new Pair>(didCommit, newCommits); } - public byte[] localCommInput(byte[] data) { + public byte[] localCommInput(byte[] data) throws InterruptedException { + + // Decode the data ByteBuffer bbDecode = ByteBuffer.wrap(data); @@ -686,8 +738,12 @@ final public class Table { bbDecode.get(); ut = (Transaction)Transaction.decode(null, bbDecode); } + + mutex.acquire(); // Do the local update and arbitrate Pair> returnData = doLocalUpdateAndArbitrate(ut, lastSeenCommit); + mutex.release(); + // Calculate the size of the response int size = Byte.BYTES + Integer.BYTES; diff --git a/version2/src/java/iotcloud/Test.java b/version2/src/java/iotcloud/Test.java index 6b0c4ae..0de2e5c 100644 --- a/version2/src/java/iotcloud/Test.java +++ b/version2/src/java/iotcloud/Test.java @@ -13,7 +13,7 @@ public class Test { public static final int NUMBER_OF_TESTS = 100; - public static void main(String[] args) throws ServerException { + public static void main(String[] args) throws ServerException, InterruptedException { if (args[0].equals("2")) { test2(); } else if (args[0].equals("3")) { @@ -38,7 +38,7 @@ public class Test { } - static void test11() { + static void test11() throws InterruptedException { boolean foundError = false; @@ -146,7 +146,7 @@ public class Test { } } - static void test10() { + static void test10() throws InterruptedException { boolean foundError = false; @@ -252,7 +252,7 @@ public class Test { } } - static void test9() { + static void test9() throws InterruptedException { boolean foundError = false; List transStatusList = new ArrayList(); @@ -509,7 +509,7 @@ public class Test { } } - static void test8() { + static void test8() throws InterruptedException { boolean foundError = false; List transStatusList = new ArrayList(); @@ -754,7 +754,7 @@ public class Test { } } - static void test7() throws ServerException { + static void test7() throws ServerException, InterruptedException { boolean foundError = false; List transStatusList = new ArrayList(); @@ -970,7 +970,7 @@ public class Test { } } - static void test6() throws ServerException { + static void test6() throws ServerException, InterruptedException { boolean foundError = false; List transStatusList = new ArrayList(); @@ -1131,7 +1131,7 @@ public class Test { } } - static void test5() throws ServerException { + static void test5() throws ServerException, InterruptedException { boolean foundError = false; List transStatusList = new ArrayList(); @@ -1346,7 +1346,7 @@ public class Test { } } - static void test4() throws ServerException { + static void test4() throws ServerException, InterruptedException { boolean foundError = false; long startTime = 0; @@ -1520,7 +1520,7 @@ public class Test { } } - static void test3() throws ServerException { + static void test3() throws ServerException, InterruptedException { long startTime = 0; long endTime = 0; @@ -1702,7 +1702,7 @@ public class Test { } } - static void test2() throws ServerException { + static void test2() throws ServerException, InterruptedException { boolean foundError = false; long startTime = 0; @@ -1712,9 +1712,11 @@ public class Test { // Setup the 2 clients Table t1 = new Table("127.0.0.1", "http://127.0.0.1/test.iotcloud/", "reallysecret", 321); t1.initTable(); + System.out.println("T1 Ready"); + Table t2 = new Table("127.0.0.1", "http://127.0.0.1/test.iotcloud/", "reallysecret", 351); t2.update(); - + System.out.println("T2 Ready"); // Make the Keys System.out.println("Setting up keys"); -- 2.34.1