Local communication working
authorAli Younis <ayounis@uci.edu>
Sun, 15 Jan 2017 21:04:22 +0000 (13:04 -0800)
committerAli Younis <ayounis@uci.edu>
Sun, 15 Jan 2017 21:04:22 +0000 (13:04 -0800)
version2/src/java/iotcloud/CloudComm.java
version2/src/java/iotcloud/ServerException.java
version2/src/java/iotcloud/Table.java
version2/src/java/iotcloud/Test.java
version2/src/java/iotcloud/Transaction.java
version2/src/server/iotquery.cpp

index 5741019..b8c296b 100644 (file)
@@ -74,6 +74,19 @@ class CloudComm {
                }
        }
 
+       /**
+        * Inits all the security stuff
+        */
+       public void initSecurity() throws ServerException {
+               // try to get the salt and if one does not exist set one
+               if (!getSalt()) {
+                       //Set the salt
+                       setSalt();
+               }
+
+               initCrypt();
+       }
+
        /**
         * Inits the HMAC generator.
         */
@@ -109,30 +122,31 @@ class CloudComm {
                return new URL(urlstr);
        }
 
-       public void setSalt() throws ServerException {
+       private void setSalt() throws ServerException {
 
                if (salt != null) {
                        // Salt already sent to server so dont set it again
                        return;
                }
-               byte[] saltTmp = new byte[SALT_SIZE];
-               random.nextBytes(saltTmp);
-
-               URL url = null;
-               URLConnection con = null;
-               HttpURLConnection http = null;
 
                try {
-                       url = new URL(baseurl + "?req=setsalt");
-                       con = url.openConnection();
-                       http = (HttpURLConnection) con;
+                       byte[] saltTmp = new byte[SALT_SIZE];
+                       random.nextBytes(saltTmp);
+
+                       URL url = new URL(baseurl + "?req=setsalt");
+                       URLConnection con = url.openConnection();
+                       HttpURLConnection http = (HttpURLConnection) con;
+
                        http.setRequestMethod("POST");
                        http.setFixedLengthStreamingMode(saltTmp.length);
                        http.setDoOutput(true);
                        http.setConnectTimeout(TIMEOUT_MILLIS);
                        http.connect();
+
                        OutputStream os = http.getOutputStream();
                        os.write(saltTmp);
+                       os.flush();
+
                        int responsecode = http.getResponseCode();
                        if (responsecode != HttpURLConnection.HTTP_OK) {
                                // TODO: Remove this print
@@ -140,33 +154,14 @@ class CloudComm {
                                throw new Error("Invalid response");
                        }
 
+                       salt = saltTmp;
                } catch (Exception e) {
+                       // e.printStackTrace();
                        throw new ServerException("Failed setting salt", ServerException.TypeConnectTimeout);
                }
-
-
-               try {
-                       InputStream is = http.getInputStream();
-                       DataInputStream dis = new DataInputStream(is);
-                       // byte [] tmp = new byte[1];
-                       byte tmp = dis.readByte();
-
-                       if (tmp == 0) {
-                               salt = saltTmp;
-                               initCrypt();
-                       } else {
-                               getSalt(); // there was already a salt so we need to get it
-                       }
-
-               } catch (SocketTimeoutException e) {
-                       throw new ServerException("setSalt failed", ServerException.TypeInputTimeout);
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       throw new Error("setSlot failed");
-               }
        }
 
-       private void getSalt() throws ServerException {
+       private boolean getSalt() throws ServerException {
                URL url = null;
                URLConnection con = null;
                HttpURLConnection http = null;
@@ -174,7 +169,7 @@ class CloudComm {
                try {
                        url = new URL(baseurl + "?req=getsalt");
                } catch (Exception e) {
-                       e.printStackTrace();
+                       // e.printStackTrace();
                        throw new Error("getSlot failed");
                }
                try {
@@ -188,21 +183,34 @@ class CloudComm {
                } catch (SocketTimeoutException e) {
                        throw new ServerException("getSalt failed", ServerException.TypeConnectTimeout);
                } catch (Exception e) {
-                       e.printStackTrace();
+                       // e.printStackTrace();
                        throw new Error("getSlot failed");
                }
 
                try {
+
+                       int responsecode = http.getResponseCode();
+                       if (responsecode != HttpURLConnection.HTTP_OK) {
+                               // TODO: Remove this print
+                               // System.out.println(responsecode);
+                               throw new Error("Invalid response");
+                       }
+
                        InputStream is = http.getInputStream();
-                       DataInputStream dis = new DataInputStream(is);
-                       int salt_length = dis.readInt();
-                       byte [] tmp = new byte[salt_length];
-                       dis.readFully(tmp);
-                       salt = tmp;
+                       if (is.available() > 0) {
+                               DataInputStream dis = new DataInputStream(is);
+                               int salt_length = dis.readInt();
+                               byte [] tmp = new byte[salt_length];
+                               dis.readFully(tmp);
+                               salt = tmp;
+                               return true;
+                       } else {
+                               return false;
+                       }
                } catch (SocketTimeoutException e) {
                        throw new ServerException("getSalt failed", ServerException.TypeInputTimeout);
                } catch (Exception e) {
-                       e.printStackTrace();
+                       // e.printStackTrace();
                        throw new Error("getSlot failed");
                }
        }
@@ -212,14 +220,16 @@ class CloudComm {
         * On failure, the server will send slots with newer sequence
         * numbers.
         */
-       Slot[] putSlot(Slot slot, int max) throws ServerException {
+       public Slot[] putSlot(Slot slot, int max) throws ServerException {
                URL url = null;
                URLConnection con = null;
                HttpURLConnection http = null;
 
                try {
                        if (salt == null) {
-                               getSalt();
+                               if (!getSalt()) {
+                                       throw new ServerException("putSlot failed", ServerException.TypeSalt);
+                               }
                                initCrypt();
                        }
 
@@ -243,10 +253,12 @@ class CloudComm {
                        os.flush();
 
                        // System.out.println("Bytes Sent: " + bytes.length);
+               } catch (ServerException e) {
+                       throw e;
                } catch (SocketTimeoutException e) {
                        throw new ServerException("putSlot failed", ServerException.TypeConnectTimeout);
                } catch (Exception e) {
-                       e.printStackTrace();
+                       // e.printStackTrace();
                        throw new Error("putSlot failed");
                }
 
@@ -268,7 +280,7 @@ class CloudComm {
                } catch (SocketTimeoutException e) {
                        throw new ServerException("putSlot failed", ServerException.TypeInputTimeout);
                } catch (Exception e) {
-                       e.printStackTrace();
+                       // e.printStackTrace();
                        throw new Error("putSlot failed");
                }
        }
@@ -277,14 +289,16 @@ class CloudComm {
         * Request the server to send all slots with the given
         * sequencenumber or newer.
         */
-       Slot[] getSlots(long sequencenumber) throws ServerException {
+       public Slot[] getSlots(long sequencenumber) throws ServerException {
                URL url = null;
                URLConnection con = null;
                HttpURLConnection http = null;
 
                try {
                        if (salt == null) {
-                               getSalt();
+                               if (!getSalt()) {
+                                       throw new ServerException("getSlots failed", ServerException.TypeSalt);
+                               }
                                initCrypt();
                        }
 
@@ -300,7 +314,7 @@ class CloudComm {
                } catch (ServerException e) {
                        throw e;
                } catch (Exception e) {
-                       e.printStackTrace();
+                       // e.printStackTrace();
                        throw new Error("getSlots failed");
                }
 
@@ -316,7 +330,7 @@ class CloudComm {
                } catch (SocketTimeoutException e) {
                        throw new ServerException("getSlots failed", ServerException.TypeInputTimeout);
                } catch (Exception e) {
-                       e.printStackTrace();
+                       // e.printStackTrace();
                        throw new Error("getSlots failed");
                }
        }
@@ -329,19 +343,12 @@ class CloudComm {
                int numberofslots = dis.readInt();
                int[] sizesofslots = new int[numberofslots];
 
-
-               // System.out.println("number of slots: " + numberofslots);
-
-
-
                Slot[] slots = new Slot[numberofslots];
                for (int i = 0; i < numberofslots; i++)
                        sizesofslots[i] = dis.readInt();
 
                for (int i = 0; i < numberofslots; i++) {
 
-                       // System.out.println("Size of slot: " + sizesofslots[i]);
-
                        byte[] data = new byte[sizesofslots[i]];
                        dis.readFully(data);
 
@@ -467,14 +474,16 @@ class CloudComm {
        public void close() {
                doEnd = true;
 
-               try {
-                       localServerThread.join();
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       throw new Error("Local Server thread join issue...");
+               if (localServerThread != null) {
+                       try {
+                               localServerThread.join();
+                       } catch (Exception e) {
+                               e.printStackTrace();
+                               throw new Error("Local Server thread join issue...");
+                       }
                }
 
-               System.out.println("Done Closing");
+               // System.out.println("Done Closing Cloud Comm");
        }
 
        protected void finalize() throws Throwable {
index b09096c..1705c70 100644 (file)
@@ -4,7 +4,8 @@ public class ServerException extends Exception {
 
     public static final byte TypeConnectTimeout = 1;
     public static final byte TypeInputTimeout = 2;
-    public static final byte TypeIncorrectResponseCode = 2;
+    public static final byte TypeIncorrectResponseCode = 3;
+    public static final byte TypeSalt = 4;
     private byte type = -1;
 
     public ServerException(String message, byte _type) {
index 354747d..c254939 100644 (file)
@@ -228,7 +228,7 @@ final public class Table {
         * also initialize the crypto stuff.
         */
        public synchronized void initTable() throws ServerException {
-               cloud.setSalt(); //Set the salt
+               cloud.initSecurity();
 
                // Create the first insertion into the block chain which is the table status
                Slot s = new Slot(this, 1, localMachineId);
@@ -473,6 +473,8 @@ final public class Table {
                        }
                }
 
+               updateLiveStateFromLocal();
+
                return transactionStatus;
        }
 
@@ -535,11 +537,6 @@ final public class Table {
                                // Try to send to the server
                                ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != null);
 
-                               // if (sendSlotsReturn.getSecond()) {
-                               //      System.out.println("Second was true");
-                               // }
-
-
                                if (/*sendSlotsReturn.getSecond() || */sendSlotsReturn.getFirst()) {
                                        // Did insert into the block chain
 
@@ -560,9 +557,12 @@ final public class Table {
                                                        iter.remove();
                                                }
                                        }
+                                       
+                                       for (Transaction transaction : transactionPartsSent.keySet()) {
 
 
-                                       for (Transaction transaction : transactionPartsSent.keySet()) {
+                                               transaction.resetServerFailure();
+
 
                                                // Update which transactions parts still need to be sent
                                                transaction.removeSentParts(transactionPartsSent.get(transaction));
@@ -583,6 +583,7 @@ final public class Table {
                                        // Reset which transaction to send
                                        for (Transaction transaction : transactionPartsSent.keySet()) {
                                                transaction.resetNextPartToSend();
+                                               transaction.resetNextPartToSend();
 
                                                // Set the transaction sequence number back to nothing
                                                if (!transaction.didSendAPartToServer()) {
@@ -602,11 +603,16 @@ final public class Table {
                        }
                } catch (ServerException e) {
 
+                       System.out.println("Server Failure:   " + e.getType());
+
+
                        if (e.getType() != ServerException.TypeInputTimeout) {
                                // e.printStackTrace();
 
                                // Nothing was able to be sent to the server so just clear these data structures
                                for (Transaction transaction : transactionPartsSent.keySet()) {
+                                       transaction.resetNextPartToSend();
+
                                        // Set the transaction sequence number back to nothing
                                        if (!transaction.didSendAPartToServer()) {
                                                transaction.setSequenceNumber(-1);
@@ -615,6 +621,12 @@ final public class Table {
                        } else {
                                // There was a partial send to the server
                                hadPartialSendToServer = true;
+
+                               // Nothing was able to be sent to the server so just clear these data structures
+                               for (Transaction transaction : transactionPartsSent.keySet()) {
+                                       transaction.resetNextPartToSend();
+                                       transaction.setServerFailure();
+                               }
                        }
 
                        pendingSendArbitrationEntriesToDelete.clear();
@@ -626,6 +638,56 @@ final public class Table {
                return newKey == null;
        }
 
+       public synchronized boolean updateFromLocal(long machineId) {
+               Pair<String, Integer> localCommunicationInformation = localCommunicationTable.get(machineId);
+               if (localCommunicationInformation == null) {
+                       // Cant talk to that device locally so do nothing
+                       return false;
+               }
+
+               // Get the size of the send data
+               int sendDataSize = Integer.BYTES + Long.BYTES;
+
+               Long lastArbitrationDataLocalSequenceNumber = (long) - 1;
+               if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId) != null) {
+                       lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId);
+               }
+
+               byte[] sendData = new byte[sendDataSize];
+               ByteBuffer bbEncode = ByteBuffer.wrap(sendData);
+
+               // Encode the data
+               bbEncode.putLong(lastArbitrationDataLocalSequenceNumber);
+               bbEncode.putInt(0);
+
+               // Send by local
+               byte[] returnData = cloud.sendLocalData(sendData, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
+
+               if (returnData == null) {
+                       // Could not contact server
+                       return false;
+               }
+
+               // Decode the data
+               ByteBuffer bbDecode = ByteBuffer.wrap(returnData);
+               int numberOfEntries = bbDecode.getInt();
+
+               for (int i = 0; i < numberOfEntries; i++) {
+                       byte type = bbDecode.get();
+                       if (type == Entry.TypeAbort) {
+                               Abort abort = (Abort)Abort.decode(null, bbDecode);
+                               processEntry(abort);
+                       } else if (type == Entry.TypeCommitPart) {
+                               CommitPart commitPart = (CommitPart)CommitPart.decode(null, bbDecode);
+                               processEntry(commitPart);
+                       }
+               }
+
+               updateLiveStateFromLocal();
+
+               return true;
+       }
+
        private Pair<Boolean, Boolean> sendTransactionToLocal(Transaction transaction) {
 
                // Get the devices local communications
@@ -658,40 +720,9 @@ final public class Table {
                        part.encode(bbEncode);
                }
 
-
-
-
-
-
-
-
-
-
-
-
-               System.out.println("================================");
-               System.out.println("Sending Locally");
-               for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
-                       System.out.println(kv);
-               }
-
-
-
-
-
-
-
-
-
-
-
                // Send by local
                byte[] returnData = cloud.sendLocalData(sendData, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
 
-
-               System.out.println("--------------------------------");
-               System.out.println();
-
                if (returnData == null) {
                        // Could not contact server
                        return new Pair<Boolean, Boolean>(true, false);
@@ -730,10 +761,11 @@ final public class Table {
                                status.setStatus(TransactionStatus.StatusAborted);
                        }
                } else {
+                       TransactionStatus status =  transaction.getTransactionStatus();
                        if (foundAbort) {
-                               TransactionStatus status =  transaction.getTransactionStatus();
                                status.setStatus(TransactionStatus.StatusAborted);
-                               return new Pair<Boolean, Boolean>(false, false);
+                       } else {
+                               status.setStatus(TransactionStatus.StatusCommitted);
                        }
                }
 
@@ -803,12 +835,6 @@ final public class Table {
                                        continue;
                                }
 
-                               System.out.println("---");
-                               for (KeyValue kv : commit.getKeyValueUpdateSet()) {
-                                       System.out.println("Sending Commit Locally:  " + kv);
-                               }
-                               System.out.println("---");
-
                                unseenArbitrations.addAll(commit.getParts().values());
 
                                for (CommitPart commitPart : commit.getParts().values()) {
@@ -986,16 +1012,15 @@ final public class Table {
                        }
                }
 
-               // Insert as many transactions as possible while keeping order
-               for (Transaction transaction : pendingTransactionQueue) {
+               if (pendingTransactionQueue.size() > 0) {
+
+                       Transaction transaction = pendingTransactionQueue.get(0);
 
                        // Set the transaction sequence number if it has yet to be inserted into the block chain
-                       if (!transaction.didSendAPartToServer()) {
+                       if ((!transaction.didSendAPartToServer() && !transaction.getServerFailure()) || (transaction.getSequenceNumber() == -1)) {
                                transaction.setSequenceNumber(slot.getSequenceNumber());
                        }
 
-                       boolean ranOutOfSpace = false;
-
                        while (true) {
                                TransactionPart part = transaction.getNextPartToSend();
 
@@ -1006,27 +1031,66 @@ final public class Table {
 
                                if (slot.hasSpace(part)) {
                                        slot.addEntry(part);
-
                                        List<Integer> partsSent = transactionPartsSent.get(transaction);
                                        if (partsSent == null) {
                                                partsSent = new ArrayList<Integer>();
                                                transactionPartsSent.put(transaction, partsSent);
                                        }
-
                                        partsSent.add(part.getPartNumber());
                                        transactionPartsSent.put(transaction, partsSent);
-
                                } else {
-                                       ranOutOfSpace = true;
                                        break;
                                }
                        }
-
-                       if (ranOutOfSpace) {
-                               break;
-                       }
                }
 
+
+               // // Insert as many transactions as possible while keeping order
+               // for (Transaction transaction : pendingTransactionQueue) {
+
+               //      // Set the transaction sequence number if it has yet to be inserted into the block chain
+               //      if (!transaction.didSendAPartToServer()) {
+               //              transaction.setSequenceNumber(slot.getSequenceNumber());
+               //      }
+
+               //      boolean ranOutOfSpace = false;
+
+               //      while (true) {
+               //              TransactionPart part = transaction.getNextPartToSend();
+
+               //              if (part == null) {
+               //                      // Ran out of parts to send for this transaction so move on
+               //                      break;
+               //              }
+
+               //              if (slot.hasSpace(part)) {
+               //                      slot.addEntry(part);
+               //                      List<Integer> partsSent = transactionPartsSent.get(transaction);
+               //                      if (partsSent == null) {
+               //                              partsSent = new ArrayList<Integer>();
+               //                              transactionPartsSent.put(transaction, partsSent);
+               //                      }
+               //                      partsSent.add(part.getPartNumber());
+               //                      transactionPartsSent.put(transaction, partsSent);
+
+               //                      for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
+               //                              System.out.println("Inserted Into Slot: " + kv);
+               //                      }
+
+               //                      ranOutOfSpace = true;
+               //                      break;
+
+               //              } else {
+               //                      ranOutOfSpace = true;
+               //                      break;
+               //              }
+               //      }
+
+               //      if (ranOutOfSpace) {
+               //              break;
+               //      }
+               // }
+
                // Fill the remainder of the slot with rescue data
                doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
 
@@ -1342,6 +1406,12 @@ final public class Table {
 
                                // Add that part to the transaction
                                transaction.addPartDecode(part);
+
+                               if (transaction.isComplete()) {
+                                       for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
+                                               System.out.println("Got Live Transaction   " + kv + "    " + part.getSequenceNumber());
+                                       }
+                               }
                        }
                }
 
@@ -1397,6 +1467,11 @@ final public class Table {
                                }
                        }
 
+
+                       for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
+                               System.out.println("Arbitrating on:     " + kv);
+                       }
+
                        if (transaction.evaluateGuard(committedKeyValueTable, speculativeTableTmp, null)) {
                                // Guard evaluated as true
 
@@ -1407,7 +1482,6 @@ final public class Table {
 
                                // Update what the last transaction committed was for use in batch commit
                                lastTransactionCommitted = transaction.getSequenceNumber();
-
                        } else {
                                // Guard evaluated was false so create abort
 
@@ -1424,6 +1498,10 @@ final public class Table {
 
                                // Insert the abort so we can process
                                processEntry(newAbort);
+
+                               for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
+                                       System.out.println("Abort From Server!!!!!!     " + kv);
+                               }
                        }
                }
 
@@ -1452,7 +1530,6 @@ final public class Table {
                        }
                }
 
-
                if ((newCommit != null) || (generatedAborts.size() > 0)) {
                        ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
                        pendingSendArbitrationRounds.add(arbitrationRound);
@@ -1484,8 +1561,6 @@ final public class Table {
                        if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) != null) {
                                if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) > transaction.getClientLocalSequenceNumber()) {
                                        // We've have already seen this from the server
-
-                                       System.out.println("Local Arbitrate Seen Already from server, rejected");
                                        return new Pair<Boolean, Boolean>(false, false);
                                }
                        }
@@ -1580,7 +1655,6 @@ final public class Table {
         */
        private boolean compactArbitrationData() {
 
-
                if (pendingSendArbitrationRounds.size() < 2) {
                        // Nothing to compact so do nothing
                        return false;
@@ -1663,6 +1737,9 @@ final public class Table {
 
                return false;
        }
+       // private boolean compactArbitrationData() {
+       //      return false;
+       // }
 
        /**
         * Update all the commits and the committed tables, sets dead the dead transactions
@@ -1827,15 +1904,11 @@ final public class Table {
 
 
 
-                               System.out.println("============");
                                // Update the committed table of keys and which commit is using which key
                                for (KeyValue kv : commit.getKeyValueUpdateSet()) {
-                                       System.out.println("Committing:  " + kv);
                                        committedKeyValueTable.put(kv.getKey(), kv);
                                        liveCommitsByKeyTable.put(kv.getKey(), commit);
                                }
-                               System.out.println("--------------");
-                               System.out.println();
                        }
                }
 
@@ -2160,6 +2233,15 @@ final public class Table {
         */
        private void processEntry(Abort entry) {
 
+
+               if (entry.getTransactionSequenceNumber() != -1) {
+                       // update the transaction status if it was sent to the server
+                       TransactionStatus status = outstandingTransactionStatus.remove(entry.getTransactionSequenceNumber());
+                       if (status != null) {
+                               status.setStatus(TransactionStatus.StatusAborted);
+                       }
+               }
+
                // Abort has not been seen by the client it is for yet so we need to keep track of it
                Abort previouslySeenAbort = liveAbortTable.put(entry.getAbortId(), entry);
                if (previouslySeenAbort != null) {
@@ -2184,13 +2266,7 @@ final public class Table {
                }
 
 
-               if (entry.getTransactionSequenceNumber() != -1) {
-                       // update the transaction status if it was sent to the server
-                       TransactionStatus status = outstandingTransactionStatus.remove(entry.getTransactionSequenceNumber());
-                       if (status != null) {
-                               status.setStatus(TransactionStatus.StatusAborted);
-                       }
-               }
+
 
                // Update the last arbitration data that we have seen so far
                if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator()) != null) {
@@ -2200,7 +2276,6 @@ final public class Table {
                                // Is larger
                                lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber());
                        }
-
                } else {
                        // Never seen any data from this arbitrator so record the first one
                        lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber());
index 9e80def..96d505c 100644 (file)
@@ -11,7 +11,7 @@ import java.util.ArrayList;
 
 public class Test {
 
-    public static final  int NUMBER_OF_TESTS = 100;
+    public static final  int NUMBER_OF_TESTS = 1;
 
     public static void main(String[] args)  throws ServerException {
         if (args[0].equals("2")) {
@@ -28,9 +28,621 @@ public class Test {
             test7();
         } else if (args[0].equals("8")) {
             test8();
+        } else if (args[0].equals("9")) {
+            test9();
+        } else if (args[0].equals("10")) {
+            test10();
+        } else if (args[0].equals("11")) {
+            test11();
         }
     }
 
+
+    static void test11() {
+
+        boolean foundError = false;
+        List<TransactionStatus> transStatusList = new ArrayList<TransactionStatus>();
+
+        // Setup the 2 clients
+        Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321, 6000);
+
+        System.out.println("Init Table t1s");
+
+        while (true) {
+            try {
+                t1.initTable();
+                break;
+            } catch (Exception e) { }
+        }
+
+
+        System.out.println("Update Table t2");
+        Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351, 6001);
+        while (t2.update() == false) {}
+
+        // Make the Keys
+        System.out.println("Setting up keys");
+        for (int i = 0; i < NUMBER_OF_TESTS; i++) {
+            System.out.println(i);
+
+            String a = "a" + i;
+            String b = "b" + i;
+            String c = "c" + i;
+            String d = "d" + i;
+            IoTString ia = new IoTString(a);
+            IoTString ib = new IoTString(b);
+            IoTString ic = new IoTString(c);
+            IoTString id = new IoTString(d);
+
+            while (true) {
+                try {
+                    t1.createNewKey(ia, 321);
+                    break;
+                } catch (Exception e) { }
+            }
+
+            while (true) {
+                try {
+                    t1.createNewKey(ib, 351);
+                    break;
+                } catch (Exception e) { }
+            }
+
+            while (true) {
+                try {
+                    t2.createNewKey(ic, 321);
+                    break;
+                } catch (Exception e) { }
+            }
+
+            while (true) {
+                try {
+                    t2.createNewKey(id, 351);
+                    break;
+                } catch (Exception e) { }
+            }
+        }
+
+        // Do Updates for the keys
+        System.out.println("Setting Key-Values...");
+        for (int i = 0; i < NUMBER_OF_TESTS; i++) {
+            System.out.println(i);
+            String keyA = "a" + i;
+            String keyB = "b" + i;
+            String keyC = "c" + i;
+            String keyD = "d" + i;
+            String valueA = "a" + i;
+            String valueB = "b" + i;
+            String valueC = "c" + i;
+            String valueD = "d" + i;
+
+            IoTString iKeyA = new IoTString(keyA);
+            IoTString iKeyB = new IoTString(keyB);
+            IoTString iKeyC = new IoTString(keyC);
+            IoTString iKeyD = new IoTString(keyD);
+            IoTString iValueA = new IoTString(valueA);
+            IoTString iValueB = new IoTString(valueB);
+            IoTString iValueC = new IoTString(valueC);
+            IoTString iValueD = new IoTString(valueD);
+
+
+            String keyAPrev = "a" + (i - 1);
+            String keyBPrev = "b" + (i - 1);
+            String keyCPrev = "c" + (i - 1);
+            String keyDPrev = "d" + (i - 1);
+            String valueAPrev = "a" + (i - 1);
+            String valueBPrev = "b" + (i - 1);
+            String valueCPrev = "c" + (i - 1);
+            String valueDPrev = "d" + (i - 1);
+
+            IoTString iKeyAPrev = new IoTString(keyAPrev);
+            IoTString iKeyBPrev = new IoTString(keyBPrev);
+            IoTString iKeyCPrev = new IoTString(keyCPrev);
+            IoTString iKeyDPrev = new IoTString(keyDPrev);
+            IoTString iValueAPrev = new IoTString(valueAPrev);
+            IoTString iValueBPrev = new IoTString(valueBPrev);
+            IoTString iValueCPrev = new IoTString(valueCPrev);
+            IoTString iValueDPrev = new IoTString(valueDPrev);
+
+            t1.startTransaction();
+            t1.addKV(iKeyA, iValueA);
+            transStatusList.add(t1.commitTransaction());
+
+            t1.startTransaction();
+            t1.addKV(iKeyB, iValueB);
+            transStatusList.add(t1.commitTransaction());
+
+            t2.startTransaction();
+            t2.addKV(iKeyC, iValueC);
+            transStatusList.add(t2.commitTransaction());
+
+            t2.startTransaction();
+            t2.addKV(iKeyD, iValueD);
+            transStatusList.add(t2.commitTransaction());
+        }
+
+        System.out.println("Updating...");
+        while (t1.update() == false) {}
+        while (t2.update() == false) {}
+        while (t1.update() == false) {}
+        while (t2.update() == false) {}
+
+        System.out.println("Checking Key-Values...");
+        for (int i = 0; i < NUMBER_OF_TESTS; i++) {
+
+            String keyA = "a" + i;
+            String keyB = "b" + i;
+            String keyC = "c" + i;
+            String keyD = "d" + i;
+            String valueA = "a" + i;
+            String valueB = "b" + i;
+            String valueC = "c" + i;
+            String valueD = "d" + i;
+
+            IoTString iKeyA = new IoTString(keyA);
+            IoTString iKeyB = new IoTString(keyB);
+            IoTString iKeyC = new IoTString(keyC);
+            IoTString iKeyD = new IoTString(keyD);
+            IoTString iValueA = new IoTString(valueA);
+            IoTString iValueB = new IoTString(valueB);
+            IoTString iValueC = new IoTString(valueC);
+            IoTString iValueD = new IoTString(valueD);
+
+
+            IoTString testValA1 = t1.getCommitted(iKeyA);
+            IoTString testValB1 = t1.getCommitted(iKeyB);
+            IoTString testValC1 = t1.getCommitted(iKeyC);
+            IoTString testValD1 = t1.getCommitted(iKeyD);
+
+            IoTString testValA2 = t2.getCommitted(iKeyA);
+            IoTString testValB2 = t2.getCommitted(iKeyB);
+            IoTString testValC2 = t2.getCommitted(iKeyC);
+            IoTString testValD2 = t2.getCommitted(iKeyD);
+
+            if ((testValA1 == null) || (testValA1.equals(iValueA) == false)) {
+                System.out.println("Key-Value t1 incorrect: " + keyA + "    " + testValA1);
+                foundError = true;
+            }
+
+            if ((testValB1 == null) || (testValB1.equals(iValueB) == false)) {
+                System.out.println("Key-Value t1 incorrect: " + keyB + "    " + testValB1);
+                foundError = true;
+            }
+
+            if ((testValC1 == null) || (testValC1.equals(iValueC) == false)) {
+                System.out.println("Key-Value t1 incorrect: " + keyC + "    " + testValC1);
+                foundError = true;
+            }
+
+            if ((testValD1 == null) || (testValD1.equals(iValueD) == false)) {
+                System.out.println("Key-Value t1 incorrect: " + keyD + "    " + testValD1);
+                foundError = true;
+            }
+
+
+            if ((testValA2 == null) || (testValA2.equals(iValueA) == false)) {
+                System.out.println("Key-Value t2 incorrect: " + keyA + "    " + testValA2);
+                foundError = true;
+            }
+
+            if ((testValB2 == null) || (testValB2.equals(iValueB) == false)) {
+                System.out.println("Key-Value t2 incorrect: " + keyB + "    " + testValB2);
+                foundError = true;
+            }
+
+            if ((testValC2 == null) || (testValC2.equals(iValueC) == false)) {
+                System.out.println("Key-Value t2 incorrect: " + keyC + "    " + testValC2);
+                foundError = true;
+            }
+
+            if ((testValD2 == null) || (testValD2.equals(iValueD) == false)) {
+                System.out.println("Key-Value t2 incorrect: " + keyD + "    " + testValD2);
+                foundError = true;
+            }
+        }
+
+        for (TransactionStatus status : transStatusList) {
+            if (status.getStatus() != TransactionStatus.StatusCommitted) {
+                foundError = true;
+            }
+        }
+
+        if (foundError) {
+            System.out.println("Found Errors...");
+        } else {
+            System.out.println("No Errors Found...");
+        }
+
+        t1.close();
+        t2.close();
+    }
+
+    static void test10() {
+
+        boolean foundError = false;
+        List<TransactionStatus> transStatusList = new ArrayList<TransactionStatus>();
+
+        // Setup the 2 clients
+        Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321, -1);
+        System.out.println("Init Table t1s");
+        while (true) {
+            try {
+                t1.initTable();
+                break;
+            } catch (Exception e) { }
+        }
+        Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351, -1);
+        while (t2.update() == false) {}
+
+        // t1.addLocalCommunication(351, "127.0.0.1", 6001);
+        // t2.addLocalCommunication(321, "127.0.0.1", 6000);
+
+
+        // Make the Keys
+        System.out.println("Setting up keys");
+        for (int i = 0; i < 4; i++) {
+            String a = "a" + i;
+            String b = "b" + i;
+            String c = "c" + i;
+            String d = "d" + i;
+            IoTString ia = new IoTString(a);
+            IoTString ib = new IoTString(b);
+            IoTString ic = new IoTString(c);
+            IoTString id = new IoTString(d);
+            while (true) {
+                try {
+                    t1.createNewKey(ia, 321);
+                    break;
+                } catch (Exception e) { }
+            }
+
+            while (true) {
+                try {
+                    t1.createNewKey(ib, 351);
+                    break;
+                } catch (Exception e) { }
+            }
+
+            while (true) {
+                try {
+                    t2.createNewKey(ic, 321);
+                    break;
+                } catch (Exception e) { }
+            }
+
+            while (true) {
+                try {
+                    t2.createNewKey(id, 351);
+                    break;
+                } catch (Exception e) { }
+            }
+        }
+
+
+        // Do Updates for the keys
+        System.out.println("B========================");
+        for (int t = 0; t < NUMBER_OF_TESTS; t++) {
+            for (int i = 0; i < 4; i++) {
+
+                System.out.println(i);
+
+                String keyB = "b" + i;
+                String valueB = "b" + (i + t);
+
+                IoTString iKeyB = new IoTString(keyB);
+                IoTString iValueB = new IoTString(valueB);
+
+                t1.startTransaction();
+                System.out.println(t1.getSpeculativeAtomic(iKeyB));
+                t1.addKV(iKeyB, iValueB);
+                transStatusList.add(t1.commitTransaction());
+            }
+        }
+        System.out.println();
+
+
+        System.out.println("C========================");
+        for (int t = 0; t < NUMBER_OF_TESTS; t++) {
+            for (int i = 0; i < 4; i++) {
+                System.out.println(i);
+
+                String keyC = "c" + i;
+                String valueC = "c" + (i + t);
+
+                IoTString iKeyC = new IoTString(keyC);
+                IoTString iValueC = new IoTString(valueC);
+
+                t2.startTransaction();
+                System.out.println(t2.getSpeculativeAtomic(iKeyC));
+                t2.addKV(iKeyC, iValueC);
+                transStatusList.add(t2.commitTransaction());
+            }
+        }
+        System.out.println();
+
+
+        for (int t = 0; t < NUMBER_OF_TESTS; t++) {
+            for (int i = 0; i < 4; i++) {
+                String keyA = "a" + i;
+                String keyD = "d" + i;
+                String valueA = "a" + (i + t);
+                String valueD = "d" + (i + t);
+
+                IoTString iKeyA = new IoTString(keyA);
+                IoTString iKeyD = new IoTString(keyD);
+                IoTString iValueA = new IoTString(valueA);
+                IoTString iValueD = new IoTString(valueD);
+
+
+                t1.startTransaction();
+                t1.addKV(iKeyA, iValueA);
+                transStatusList.add(t1.commitTransaction());
+
+
+                t2.startTransaction();
+                t2.addKV(iKeyD, iValueD);
+                transStatusList.add(t2.commitTransaction());
+                System.out.println();
+            }
+        }
+        System.out.println();
+
+        System.out.println("Updating...");
+        System.out.println("t1 -=-=-=-=-=-=-=-");
+        while (t1.update() == false) {}
+        System.out.println("t2 -=-=-=-=-=-=-=-");
+        while (t2.update() == false) {}
+        System.out.println("t1 -=-=-=-=-=-=-=-");
+        while (t1.update() == false) {}
+        System.out.println("t2 -=-=-=-=-=-=-=-");
+        while (t2.update() == false) {}
+
+
+        System.out.println("Checking Key-Values...");
+        for (int i = 0; i < 4; i++) {
+
+            String keyA = "a" + i;
+            String keyB = "b" + i;
+            String keyC = "c" + i;
+            String keyD = "d" + i;
+            String valueA = "a" + (i + NUMBER_OF_TESTS - 1);
+            String valueB = "b" + (i + NUMBER_OF_TESTS - 1);
+            String valueC = "c" + (i + NUMBER_OF_TESTS - 1);
+            String valueD = "d" + (i + NUMBER_OF_TESTS - 1);
+
+            IoTString iKeyA = new IoTString(keyA);
+            IoTString iKeyB = new IoTString(keyB);
+            IoTString iKeyC = new IoTString(keyC);
+            IoTString iKeyD = new IoTString(keyD);
+            IoTString iValueA = new IoTString(valueA);
+            IoTString iValueB = new IoTString(valueB);
+            IoTString iValueC = new IoTString(valueC);
+            IoTString iValueD = new IoTString(valueD);
+
+
+            IoTString testValA1 = t1.getCommitted(iKeyA);
+            IoTString testValB1 = t1.getCommitted(iKeyB);
+            IoTString testValC1 = t1.getCommitted(iKeyC);
+            IoTString testValD1 = t1.getCommitted(iKeyD);
+
+            IoTString testValA2 = t2.getCommitted(iKeyA);
+            IoTString testValB2 = t2.getCommitted(iKeyB);
+            IoTString testValC2 = t2.getCommitted(iKeyC);
+            IoTString testValD2 = t2.getCommitted(iKeyD);
+
+            if ((testValA1 == null) || (testValA1.equals(iValueA) == false)) {
+                System.out.println("Key-Value t1 incorrect: " + keyA + "    " + testValA1);
+                foundError = true;
+            }
+
+            if ((testValB1 == null) || (testValB1.equals(iValueB) == false)) {
+                System.out.println("Key-Value t1 incorrect: " + keyB + "    " + testValB1);
+                foundError = true;
+            }
+
+            if ((testValC1 == null) || (testValC1.equals(iValueC) == false)) {
+                System.out.println("Key-Value t1 incorrect: " + keyC + "    " + testValC1);
+                foundError = true;
+            }
+
+            if ((testValD1 == null) || (testValD1.equals(iValueD) == false)) {
+                System.out.println("Key-Value t1 incorrect: " + keyD + "    " + testValD1);
+                foundError = true;
+            }
+
+            if ((testValA2 == null) || (testValA2.equals(iValueA) == false)) {
+                System.out.println("Key-Value t2 incorrect: " + keyA + "    " + testValA2);
+                foundError = true;
+            }
+
+            if ((testValB2 == null) || (testValB2.equals(iValueB) == false)) {
+                System.out.println("Key-Value t2 incorrect: " + keyB + "    " + testValB2);
+                foundError = true;
+            }
+
+            if ((testValC2 == null) || (testValC2.equals(iValueC) == false)) {
+                System.out.println("Key-Value t2 incorrect: " + keyC + "    " + testValC2);
+                foundError = true;
+            }
+
+            if ((testValD2 == null) || (testValD2.equals(iValueD) == false)) {
+                System.out.println("Key-Value t2 incorrect: " + keyD + "    " + testValD2);
+                foundError = true;
+            }
+        }
+
+        int counter = 0;
+        for (TransactionStatus status : transStatusList) {
+            if (status.getStatus() != TransactionStatus.StatusCommitted) {
+                foundError = true;
+                System.out.println(counter + "    Status: " + status.getStatus());
+            }
+            counter++;
+        }
+
+        if (foundError) {
+            System.out.println("Found Errors...");
+        } else {
+            System.out.println("No Errors Found...");
+        }
+
+        t1.close();
+        t2.close();
+    }
+
+    static void test9() {
+
+        boolean foundError = false;
+        List<TransactionStatus> transStatusList = new ArrayList<TransactionStatus>();
+
+        // Setup the 2 clients
+        Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321, 6000);
+
+        System.out.println("Init Table t1s");
+        while (true) {
+            try {
+                t1.initTable();
+                break;
+            } catch (Exception e) { }
+        }
+
+
+        System.out.println("Update Table t2");
+        Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351, 6001);
+        while (t2.update() == false) {}
+
+        t1.addLocalCommunication(351, "127.0.0.1", 6001);
+        t2.addLocalCommunication(321, "127.0.0.1", 6000);
+
+        // Make the Keys
+        System.out.println("Setting up keys");
+        for (int i = 0; i < NUMBER_OF_TESTS; i++) {
+            System.out.println(i);
+
+            String a = "a" + i;
+            String b = "b" + i;
+            String c = "c" + i;
+            String d = "d" + i;
+            IoTString ia = new IoTString(a);
+            IoTString ib = new IoTString(b);
+            IoTString ic = new IoTString(c);
+            IoTString id = new IoTString(d);
+
+            while (true) {
+                try {
+                    t1.createNewKey(ia, 321);
+                    break;
+                } catch (Exception e) { }
+            }
+
+            while (true) {
+                try {
+                    t1.createNewKey(ib, 351);
+                    break;
+                } catch (Exception e) { }
+            }
+
+            while (true) {
+                try {
+                    t2.createNewKey(ic, 321);
+                    break;
+                } catch (Exception e) { }
+            }
+
+            while (true) {
+                try {
+                    t2.createNewKey(id, 351);
+                    break;
+                } catch (Exception e) { }
+            }
+        }
+
+        System.out.println("A, D...");
+        for (int i = 0; i < NUMBER_OF_TESTS; i++) {
+            String keyA = "a" + i;
+            String keyD = "d" + i;
+            String valueA = "a" + i;
+            String valueD = "d" + i;
+
+            IoTString iKeyA = new IoTString(keyA);
+            IoTString iKeyD = new IoTString(keyD);
+            IoTString iValueA = new IoTString(valueA);
+            IoTString iValueD = new IoTString(valueD);
+
+            t1.startTransaction();
+            t1.addKV(iKeyA, iValueA);
+            transStatusList.add(t1.commitTransaction());
+
+            t2.startTransaction();
+            t2.addKV(iKeyD, iValueD);
+            transStatusList.add(t2.commitTransaction());
+        }
+
+        while (t1.updateFromLocal(351) == false) {}
+        while (t2.updateFromLocal(321) == false) {}
+
+
+        System.out.println("Updating...");
+        System.out.println("Checking Key-Values...");
+
+        for (int i = 0; i < NUMBER_OF_TESTS; i++) {
+
+            String keyA = "a" + i;
+            String keyD = "d" + i;
+            String valueA = "a" + i;
+            String valueD = "d" + i;
+
+
+            IoTString iKeyA = new IoTString(keyA);
+            IoTString iKeyD = new IoTString(keyD);
+            IoTString iValueA = new IoTString(valueA);
+            IoTString iValueD = new IoTString(valueD);
+
+
+            IoTString testValA1 = t1.getCommitted(iKeyA);
+            IoTString testValD1 = t1.getCommitted(iKeyD);
+            IoTString testValA2 = t2.getCommitted(iKeyA);
+            IoTString testValD2 = t2.getCommitted(iKeyD);
+
+
+            if ((testValA1 == null) || (testValA1.equals(iValueA) == false)) {
+                System.out.println("Key-Value t1 incorrect: " + keyA + "    " + testValA1);
+                foundError = true;
+            }
+
+            if ((testValD1 == null) || (testValD1.equals(iValueD) == false)) {
+                System.out.println("Key-Value t1 incorrect: " + keyD + "    " + testValD1);
+                foundError = true;
+            }
+
+
+            if ((testValA2 == null) || (testValA2.equals(iValueA) == false)) {
+                System.out.println("Key-Value t2 incorrect: " + keyA + "    " + testValA2);
+                foundError = true;
+            }
+
+            if ((testValD2 == null) || (testValD2.equals(iValueD) == false)) {
+                System.out.println("Key-Value t2 incorrect: " + keyD + "    " + testValD2);
+                foundError = true;
+            }
+        }
+
+        for (TransactionStatus status : transStatusList) {
+            if (status.getStatus() != TransactionStatus.StatusCommitted) {
+                foundError = true;
+            }
+        }
+
+        if (foundError) {
+            System.out.println("Found Errors...");
+        } else {
+            System.out.println("No Errors Found...");
+        }
+
+        t1.close();
+        t2.close();
+    }
+
     static void test8() {
 
         boolean foundError = false;
@@ -43,10 +655,9 @@ public class Test {
 
         while (true) {
             try {
-                System.out.println("-==-=-=-=-=-=-=-==-=-");
                 t1.initTable();
                 break;
-            } catch (Exception e) {}
+            } catch (Exception e) { }
         }
 
 
index b3a0490..f444ded 100644 (file)
@@ -28,6 +28,8 @@ class Transaction {
 
     private TransactionStatus transactionStatus = null;
 
+    private boolean hadServerFailure = false;
+
     public Transaction() {
         parts = new HashMap<Integer, TransactionPart>();
         keyValueGuardSet = new HashSet<KeyValue>();
@@ -142,6 +144,21 @@ class Transaction {
         return part;
     }
 
+
+    public void setServerFailure() {
+        hadServerFailure = true;
+    }
+
+    public boolean getServerFailure() {
+        return hadServerFailure;
+    }
+
+
+    public void resetServerFailure() {
+        hadServerFailure = false;
+    }
+
+
     public void setTransactionStatus(TransactionStatus _transactionStatus) {
         transactionStatus = _transactionStatus;
     }
@@ -268,10 +285,19 @@ class Transaction {
 
             if (kvGuard.getValue() != null) {
                 if ((kv == null) || (!kvGuard.getValue().equals(kv.getValue()))) {
+
+
+                    if (kv != null) {
+                        System.out.println(kvGuard.getValue() + "       " + kv.getValue());
+                    } else {
+                        System.out.println(kvGuard.getValue() + "       " + kv);
+                    }
+
                     return false;
                 }
             } else {
                 if (kv != null) {
+                    System.out.println("kvGuard was nulled:  " + kv);
                     return false;
                 }
             }
index 64639b3..1e046a3 100644 (file)
@@ -208,35 +208,19 @@ void IoTQuery::getSlot() {
 /**
  * The method setSalt handles a setSalt request from the client.
  */
-
 void IoTQuery::setSalt() {
        /* Write the slot data we received to a SLOT file */
        char *filename = getSaltFileName();
-       char * response = new char[1];
-
-       if (access(filename, F_OK) == 0)
-       {
-               /* Already Exists */
-               response[0] = 1;
-       }
-       else
-       {
-               /* Does not exist so create it */
-               int saltfd = open(filename, O_CREAT | O_WRONLY, S_IRUSR | S_IWUSR);
-               doWrite(saltfd, data, length);
-               close(saltfd);
-               response[0] = 0;
-       }
-
-
-       sendResponse(response, 1);
-
+       int saltfd = open(filename, O_CREAT | O_WRONLY, S_IRUSR | S_IWUSR);
+       doWrite(saltfd, data, length);
+       char response[0];
+       sendResponse(response, 0);
+       close(saltfd);
        delete filename;
-       delete response;
 }
 
 /**
- * The method getSalt handles a setSalt request from the client.
+ * The method getSalt handles a getSalt request from the client.
  */
 
 void IoTQuery::getSalt() {
@@ -247,6 +231,8 @@ void IoTQuery::getSalt() {
        if (stat(filename, &st) == 0) {
                filesize = st.st_size;
        } else {
+               char response[0];
+               sendResponse(response, 0);
                delete filename;
                return;
        }
@@ -306,6 +292,7 @@ void IoTQuery::sendResponse(char * bytes, int len) {
             << "Content-Length: " << len << "\r\n"
             << "\r\n";
        cout.write(bytes, len);
+       cout << flush;
 }
 
 /**
@@ -359,8 +346,10 @@ void IoTQuery::removeOldestSlot() {
 void IoTQuery::processQuery() {
        getQuery();
        getDirectory();
+       // readData();
        if (!readData())
        {
+               cerr << "No Data Available" << endl;
                return;
        }
 
@@ -412,19 +401,20 @@ void IoTQuery::processQuery() {
  */
 
 bool IoTQuery::readData() {
-       if (length) {
+       if (length != 0) {
                data = new char[length + 1];
                memset(data, 0, length + 1);
                cin.read(data, length);
        }
+
        do {
                char dummy;
                cin >> dummy;
        } while (!cin.eof());
 
-       if (length)
+       if (length != 0)
        {
-               if (cin.fail())
+               if (cin.gcount() != length)
                {
                        return false;
                }