Added mutex for thread safety
[iotcloud.git] / version2 / src / java / iotcloud / Table.java
index 1d097c77d07204f9783cf77a9748d3cd843f1fdd..f6d699b91187a6682b096f19074aefc6c69d8531 100644 (file)
@@ -14,6 +14,7 @@ import java.util.Set;
 import java.util.Collection;
 import java.util.Collections;
 import java.nio.ByteBuffer;
+import java.util.concurrent.Semaphore;
 
 
 /**
@@ -74,6 +75,9 @@ final public class Table {
        private Map<Long, TransactionStatus> transactionStatusMap = null;
        private Map<Long, TransactionStatus> transactionStatusNotSentMap = null;
 
+       private Semaphore mutex = null;
+
+
 
        public Table(String hostname, String baseurl, String password, long _localmachineid) {
                localmachineid = _localmachineid;
@@ -118,6 +122,7 @@ final public class Table {
                localCommunicationChannels = new HashMap<Long, LocalComm>();
                transactionStatusMap = new HashMap<Long, TransactionStatus>();
                transactionStatusNotSentMap = new HashMap<Long, TransactionStatus>();
+               mutex = new Semaphore(1);
        }
 
        public void initTable() throws ServerException {
@@ -135,9 +140,11 @@ final public class Table {
                }
        }
 
-       public void rebuild() throws ServerException {
+       public void rebuild() throws ServerException, InterruptedException {
+               mutex.acquire();
                Slot[] newslots = cloud.getSlots(sequencenumber + 1);
                validateandupdate(newslots, true);
+               mutex.release();
        }
 
        // TODO: delete method
@@ -243,12 +250,22 @@ final public class Table {
        public void addLocalComm(long machineId, LocalComm lc) {
                localCommunicationChannels.put(machineId, lc);
        }
-       public Long getArbitrator(IoTString key) {
-               return arbitratorTable.get(key);
+       public Long getArbitrator(IoTString key) throws InterruptedException {
+
+               mutex.acquire();
+               Long arb = arbitratorTable.get(key);
+               mutex.release();
+
+               return arb;
        }
 
-       public IoTString getCommitted(IoTString key) {
+       public IoTString getCommitted(IoTString key) throws InterruptedException {
+
+               mutex.acquire();
                KeyValue kv = commitedTable.get(key);
+               mutex.release();
+
+
                if (kv != null) {
                        return kv.getValue();
                } else {
@@ -256,7 +273,10 @@ final public class Table {
                }
        }
 
-       public IoTString getSpeculative(IoTString key) {
+       public IoTString getSpeculative(IoTString key) throws InterruptedException {
+
+               mutex.acquire();
+
                KeyValue kv = pendingTransSpeculativeTable.get(key);
 
                if (kv == null) {
@@ -266,6 +286,8 @@ final public class Table {
                if (kv == null) {
                        kv = commitedTable.get(key);
                }
+               mutex.release();
+
 
                if (kv != null) {
                        return kv.getValue();
@@ -274,7 +296,10 @@ final public class Table {
                }
        }
 
-       public IoTString getCommittedAtomic(IoTString key) {
+       public IoTString getCommittedAtomic(IoTString key) throws InterruptedException {
+
+               mutex.acquire();
+
                KeyValue kv = commitedTable.get(key);
 
                if (arbitratorTable.get(key) == null) {
@@ -287,6 +312,8 @@ final public class Table {
                        throw new Error("Not all Key Values Match Arbitrator.");
                }
 
+               mutex.release();
+
                if (kv != null) {
                        pendingTransBuild.addKVGuard(new KeyValue(key, kv.getValue()));
                        return kv.getValue();
@@ -296,7 +323,9 @@ final public class Table {
                }
        }
 
-       public IoTString getSpeculativeAtomic(IoTString key) {
+       public IoTString getSpeculativeAtomic(IoTString key) throws InterruptedException {
+
+               mutex.acquire();
 
                if (arbitratorTable.get(key) == null) {
                        throw new Error("Key not Found.");
@@ -318,6 +347,8 @@ final public class Table {
                        kv = commitedTable.get(key);
                }
 
+               mutex.release();
+
                if (kv != null) {
                        pendingTransBuild.addKVGuard(new KeyValue(key, kv.getValue()));
                        return kv.getValue();
@@ -327,7 +358,9 @@ final public class Table {
                }
        }
 
-       public Pair<Boolean, Boolean> update() {
+       public Pair<Boolean, Boolean> update() throws InterruptedException {
+
+               mutex.acquire();
 
                boolean gotLatestFromServer = false;
                boolean didSendLocal = false;
@@ -349,14 +382,17 @@ final public class Table {
 
                        didSendLocal = true;
 
+
                } catch (Exception e) {
                        // could not update so do nothing
                }
 
+
+               mutex.release();
                return new Pair<Boolean, Boolean>(gotLatestFromServer, didSendLocal);
        }
 
-       public Boolean updateFromLocal(long arb) {
+       public Boolean updateFromLocal(long arb) throws InterruptedException {
                LocalComm lc = localCommunicationChannels.get(arb);
                if (lc == null) {
                        // Cant talk directly to arbitrator so cant do anything
@@ -372,6 +408,7 @@ final public class Table {
                        bbEncode.putLong(0);
                }
 
+               mutex.acquire();
                byte[] data = lc.sendDataToLocalDevice(arb, bbEncode.array());
 
                // Decode the data
@@ -386,7 +423,6 @@ final public class Table {
                        newCommits.add(com);
                }
 
-
                for (Commit commit : newCommits) {
                        // Prepare to process the commit
                        processEntry(commit);
@@ -401,10 +437,13 @@ final public class Table {
                didCommitOrSpeculate |= createSpeculativeTable();
                createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
 
+
+               mutex.release();
+
                return true;
        }
 
-       public void startTransaction() {
+       public void startTransaction() throws InterruptedException {
                // Create a new transaction, invalidates any old pending transactions.
                pendingTransBuild = new PendingTransaction();
        }
@@ -425,7 +464,7 @@ final public class Table {
                pendingTransBuild.addKV(kv);
        }
 
-       public TransactionStatus commitTransaction() {
+       public TransactionStatus commitTransaction() throws InterruptedException {
 
                if (pendingTransBuild.getKVUpdates().size() == 0) {
 
@@ -433,6 +472,8 @@ final public class Table {
                        return new TransactionStatus(TransactionStatus.StatusNoEffect, -1);
                }
 
+               mutex.acquire();
+
                TransactionStatus transStatus = null;
 
                if (pendingTransBuild.getArbitrator() != localmachineid) {
@@ -494,25 +535,35 @@ final public class Table {
                // reset it so next time is fresh
                pendingTransBuild = new PendingTransaction();
 
+
+               mutex.release();
                return transStatus;
        }
 
-       public boolean createNewKey(IoTString keyName, long machineId) throws ServerException {
+       public boolean createNewKey(IoTString keyName, long machineId) throws ServerException, InterruptedException {
+               try {
+                       mutex.acquire();
 
-               while (true) {
-                       if (arbitratorTable.get(keyName) != null) {
-                               // There is already an arbitrator
-                               return false;
-                       }
+                       while (true) {
+                               if (arbitratorTable.get(keyName) != null) {
+                                       // There is already an arbitrator
+                                       mutex.release();
+                                       return false;
+                               }
 
-                       if (tryput(keyName, machineId, false)) {
-                               // If successfully inserted
-                               return true;
+                               if (tryput(keyName, machineId, false)) {
+                                       // If successfully inserted
+                                       mutex.release();
+                                       return true;
+                               }
                        }
+               } catch (ServerException e) {
+                       mutex.release();
+                       throw e;
                }
        }
 
-       private void processPendingTrans() {
+       private void processPendingTrans() throws InterruptedException {
 
                boolean sentAllPending = false;
                try {
@@ -581,8 +632,7 @@ final public class Table {
                }
        }
 
-       private void updateWithNotPendingTrans() throws ServerException {
-
+       private void updateWithNotPendingTrans() throws ServerException, InterruptedException {
                boolean doEnd = false;
                boolean needResize = false;
                while (!doEnd && ((uncommittedTransactionsMap.keySet().size() > 0)  || (pendingCommitsList.size() > 0))   ) {
@@ -645,7 +695,7 @@ final public class Table {
                }
        }
 
-       private Pair<Boolean, List<Commit>> sendTransactionToLocal(Transaction ut, LocalComm lc) {
+       private Pair<Boolean, List<Commit>> sendTransactionToLocal(Transaction ut, LocalComm lc) throws InterruptedException {
 
                // encode the request
                byte[] array = new byte[Long.BYTES + ut.getSize()];
@@ -675,7 +725,9 @@ final public class Table {
                return new Pair<Boolean, List<Commit>>(didCommit, newCommits);
        }
 
-       public byte[] localCommInput(byte[] data) {
+       public byte[] localCommInput(byte[] data) throws InterruptedException {
+
+
 
                // Decode the data
                ByteBuffer bbDecode = ByteBuffer.wrap(data);
@@ -686,8 +738,12 @@ final public class Table {
                        bbDecode.get();
                        ut = (Transaction)Transaction.decode(null, bbDecode);
                }
+
+               mutex.acquire();
                // Do the local update and arbitrate
                Pair<Boolean, List<Commit>> returnData = doLocalUpdateAndArbitrate(ut, lastSeenCommit);
+               mutex.release();
+
 
                // Calculate the size of the response
                int size = Byte.BYTES + Integer.BYTES;