From: Ali Younis Date: Sun, 15 Jan 2017 21:04:22 +0000 (-0800) Subject: Local communication working X-Git-Url: http://plrg.eecs.uci.edu/git/?p=iotcloud.git;a=commitdiff_plain;h=5a137c5e6e26460af285db43a3bd7b088a9024f5 Local communication working --- diff --git a/version2/src/java/iotcloud/CloudComm.java b/version2/src/java/iotcloud/CloudComm.java index 5741019..b8c296b 100644 --- a/version2/src/java/iotcloud/CloudComm.java +++ b/version2/src/java/iotcloud/CloudComm.java @@ -74,6 +74,19 @@ class CloudComm { } } + /** + * Inits all the security stuff + */ + public void initSecurity() throws ServerException { + // try to get the salt and if one does not exist set one + if (!getSalt()) { + //Set the salt + setSalt(); + } + + initCrypt(); + } + /** * Inits the HMAC generator. */ @@ -109,30 +122,31 @@ class CloudComm { return new URL(urlstr); } - public void setSalt() throws ServerException { + private void setSalt() throws ServerException { if (salt != null) { // Salt already sent to server so dont set it again return; } - byte[] saltTmp = new byte[SALT_SIZE]; - random.nextBytes(saltTmp); - - URL url = null; - URLConnection con = null; - HttpURLConnection http = null; try { - url = new URL(baseurl + "?req=setsalt"); - con = url.openConnection(); - http = (HttpURLConnection) con; + byte[] saltTmp = new byte[SALT_SIZE]; + random.nextBytes(saltTmp); + + URL url = new URL(baseurl + "?req=setsalt"); + URLConnection con = url.openConnection(); + HttpURLConnection http = (HttpURLConnection) con; + http.setRequestMethod("POST"); http.setFixedLengthStreamingMode(saltTmp.length); http.setDoOutput(true); http.setConnectTimeout(TIMEOUT_MILLIS); http.connect(); + OutputStream os = http.getOutputStream(); os.write(saltTmp); + os.flush(); + int responsecode = http.getResponseCode(); if (responsecode != HttpURLConnection.HTTP_OK) { // TODO: Remove this print @@ -140,33 +154,14 @@ class CloudComm { throw new Error("Invalid response"); } + salt = saltTmp; } catch (Exception e) { + // e.printStackTrace(); throw new ServerException("Failed setting salt", ServerException.TypeConnectTimeout); } - - - try { - InputStream is = http.getInputStream(); - DataInputStream dis = new DataInputStream(is); - // byte [] tmp = new byte[1]; - byte tmp = dis.readByte(); - - if (tmp == 0) { - salt = saltTmp; - initCrypt(); - } else { - getSalt(); // there was already a salt so we need to get it - } - - } catch (SocketTimeoutException e) { - throw new ServerException("setSalt failed", ServerException.TypeInputTimeout); - } catch (Exception e) { - e.printStackTrace(); - throw new Error("setSlot failed"); - } } - private void getSalt() throws ServerException { + private boolean getSalt() throws ServerException { URL url = null; URLConnection con = null; HttpURLConnection http = null; @@ -174,7 +169,7 @@ class CloudComm { try { url = new URL(baseurl + "?req=getsalt"); } catch (Exception e) { - e.printStackTrace(); + // e.printStackTrace(); throw new Error("getSlot failed"); } try { @@ -188,21 +183,34 @@ class CloudComm { } catch (SocketTimeoutException e) { throw new ServerException("getSalt failed", ServerException.TypeConnectTimeout); } catch (Exception e) { - e.printStackTrace(); + // e.printStackTrace(); throw new Error("getSlot failed"); } try { + + int responsecode = http.getResponseCode(); + if (responsecode != HttpURLConnection.HTTP_OK) { + // TODO: Remove this print + // System.out.println(responsecode); + throw new Error("Invalid response"); + } + InputStream is = http.getInputStream(); - DataInputStream dis = new DataInputStream(is); - int salt_length = dis.readInt(); - byte [] tmp = new byte[salt_length]; - dis.readFully(tmp); - salt = tmp; + if (is.available() > 0) { + DataInputStream dis = new DataInputStream(is); + int salt_length = dis.readInt(); + byte [] tmp = new byte[salt_length]; + dis.readFully(tmp); + salt = tmp; + return true; + } else { + return false; + } } catch (SocketTimeoutException e) { throw new ServerException("getSalt failed", ServerException.TypeInputTimeout); } catch (Exception e) { - e.printStackTrace(); + // e.printStackTrace(); throw new Error("getSlot failed"); } } @@ -212,14 +220,16 @@ class CloudComm { * On failure, the server will send slots with newer sequence * numbers. */ - Slot[] putSlot(Slot slot, int max) throws ServerException { + public Slot[] putSlot(Slot slot, int max) throws ServerException { URL url = null; URLConnection con = null; HttpURLConnection http = null; try { if (salt == null) { - getSalt(); + if (!getSalt()) { + throw new ServerException("putSlot failed", ServerException.TypeSalt); + } initCrypt(); } @@ -243,10 +253,12 @@ class CloudComm { os.flush(); // System.out.println("Bytes Sent: " + bytes.length); + } catch (ServerException e) { + throw e; } catch (SocketTimeoutException e) { throw new ServerException("putSlot failed", ServerException.TypeConnectTimeout); } catch (Exception e) { - e.printStackTrace(); + // e.printStackTrace(); throw new Error("putSlot failed"); } @@ -268,7 +280,7 @@ class CloudComm { } catch (SocketTimeoutException e) { throw new ServerException("putSlot failed", ServerException.TypeInputTimeout); } catch (Exception e) { - e.printStackTrace(); + // e.printStackTrace(); throw new Error("putSlot failed"); } } @@ -277,14 +289,16 @@ class CloudComm { * Request the server to send all slots with the given * sequencenumber or newer. */ - Slot[] getSlots(long sequencenumber) throws ServerException { + public Slot[] getSlots(long sequencenumber) throws ServerException { URL url = null; URLConnection con = null; HttpURLConnection http = null; try { if (salt == null) { - getSalt(); + if (!getSalt()) { + throw new ServerException("getSlots failed", ServerException.TypeSalt); + } initCrypt(); } @@ -300,7 +314,7 @@ class CloudComm { } catch (ServerException e) { throw e; } catch (Exception e) { - e.printStackTrace(); + // e.printStackTrace(); throw new Error("getSlots failed"); } @@ -316,7 +330,7 @@ class CloudComm { } catch (SocketTimeoutException e) { throw new ServerException("getSlots failed", ServerException.TypeInputTimeout); } catch (Exception e) { - e.printStackTrace(); + // e.printStackTrace(); throw new Error("getSlots failed"); } } @@ -329,19 +343,12 @@ class CloudComm { int numberofslots = dis.readInt(); int[] sizesofslots = new int[numberofslots]; - - // System.out.println("number of slots: " + numberofslots); - - - Slot[] slots = new Slot[numberofslots]; for (int i = 0; i < numberofslots; i++) sizesofslots[i] = dis.readInt(); for (int i = 0; i < numberofslots; i++) { - // System.out.println("Size of slot: " + sizesofslots[i]); - byte[] data = new byte[sizesofslots[i]]; dis.readFully(data); @@ -467,14 +474,16 @@ class CloudComm { public void close() { doEnd = true; - try { - localServerThread.join(); - } catch (Exception e) { - e.printStackTrace(); - throw new Error("Local Server thread join issue..."); + if (localServerThread != null) { + try { + localServerThread.join(); + } catch (Exception e) { + e.printStackTrace(); + throw new Error("Local Server thread join issue..."); + } } - System.out.println("Done Closing"); + // System.out.println("Done Closing Cloud Comm"); } protected void finalize() throws Throwable { diff --git a/version2/src/java/iotcloud/ServerException.java b/version2/src/java/iotcloud/ServerException.java index b09096c..1705c70 100644 --- a/version2/src/java/iotcloud/ServerException.java +++ b/version2/src/java/iotcloud/ServerException.java @@ -4,7 +4,8 @@ public class ServerException extends Exception { public static final byte TypeConnectTimeout = 1; public static final byte TypeInputTimeout = 2; - public static final byte TypeIncorrectResponseCode = 2; + public static final byte TypeIncorrectResponseCode = 3; + public static final byte TypeSalt = 4; private byte type = -1; public ServerException(String message, byte _type) { diff --git a/version2/src/java/iotcloud/Table.java b/version2/src/java/iotcloud/Table.java index 354747d..c254939 100644 --- a/version2/src/java/iotcloud/Table.java +++ b/version2/src/java/iotcloud/Table.java @@ -228,7 +228,7 @@ final public class Table { * also initialize the crypto stuff. */ public synchronized void initTable() throws ServerException { - cloud.setSalt(); //Set the salt + cloud.initSecurity(); // Create the first insertion into the block chain which is the table status Slot s = new Slot(this, 1, localMachineId); @@ -473,6 +473,8 @@ final public class Table { } } + updateLiveStateFromLocal(); + return transactionStatus; } @@ -535,11 +537,6 @@ final public class Table { // Try to send to the server ThreeTuple sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != null); - // if (sendSlotsReturn.getSecond()) { - // System.out.println("Second was true"); - // } - - if (/*sendSlotsReturn.getSecond() || */sendSlotsReturn.getFirst()) { // Did insert into the block chain @@ -560,9 +557,12 @@ final public class Table { iter.remove(); } } + + for (Transaction transaction : transactionPartsSent.keySet()) { - for (Transaction transaction : transactionPartsSent.keySet()) { + transaction.resetServerFailure(); + // Update which transactions parts still need to be sent transaction.removeSentParts(transactionPartsSent.get(transaction)); @@ -583,6 +583,7 @@ final public class Table { // Reset which transaction to send for (Transaction transaction : transactionPartsSent.keySet()) { transaction.resetNextPartToSend(); + transaction.resetNextPartToSend(); // Set the transaction sequence number back to nothing if (!transaction.didSendAPartToServer()) { @@ -602,11 +603,16 @@ final public class Table { } } catch (ServerException e) { + System.out.println("Server Failure: " + e.getType()); + + if (e.getType() != ServerException.TypeInputTimeout) { // e.printStackTrace(); // Nothing was able to be sent to the server so just clear these data structures for (Transaction transaction : transactionPartsSent.keySet()) { + transaction.resetNextPartToSend(); + // Set the transaction sequence number back to nothing if (!transaction.didSendAPartToServer()) { transaction.setSequenceNumber(-1); @@ -615,6 +621,12 @@ final public class Table { } else { // There was a partial send to the server hadPartialSendToServer = true; + + // Nothing was able to be sent to the server so just clear these data structures + for (Transaction transaction : transactionPartsSent.keySet()) { + transaction.resetNextPartToSend(); + transaction.setServerFailure(); + } } pendingSendArbitrationEntriesToDelete.clear(); @@ -626,6 +638,56 @@ final public class Table { return newKey == null; } + public synchronized boolean updateFromLocal(long machineId) { + Pair localCommunicationInformation = localCommunicationTable.get(machineId); + if (localCommunicationInformation == null) { + // Cant talk to that device locally so do nothing + return false; + } + + // Get the size of the send data + int sendDataSize = Integer.BYTES + Long.BYTES; + + Long lastArbitrationDataLocalSequenceNumber = (long) - 1; + if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId) != null) { + lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId); + } + + byte[] sendData = new byte[sendDataSize]; + ByteBuffer bbEncode = ByteBuffer.wrap(sendData); + + // Encode the data + bbEncode.putLong(lastArbitrationDataLocalSequenceNumber); + bbEncode.putInt(0); + + // Send by local + byte[] returnData = cloud.sendLocalData(sendData, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond()); + + if (returnData == null) { + // Could not contact server + return false; + } + + // Decode the data + ByteBuffer bbDecode = ByteBuffer.wrap(returnData); + int numberOfEntries = bbDecode.getInt(); + + for (int i = 0; i < numberOfEntries; i++) { + byte type = bbDecode.get(); + if (type == Entry.TypeAbort) { + Abort abort = (Abort)Abort.decode(null, bbDecode); + processEntry(abort); + } else if (type == Entry.TypeCommitPart) { + CommitPart commitPart = (CommitPart)CommitPart.decode(null, bbDecode); + processEntry(commitPart); + } + } + + updateLiveStateFromLocal(); + + return true; + } + private Pair sendTransactionToLocal(Transaction transaction) { // Get the devices local communications @@ -658,40 +720,9 @@ final public class Table { part.encode(bbEncode); } - - - - - - - - - - - - System.out.println("================================"); - System.out.println("Sending Locally"); - for (KeyValue kv : transaction.getKeyValueUpdateSet()) { - System.out.println(kv); - } - - - - - - - - - - - // Send by local byte[] returnData = cloud.sendLocalData(sendData, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond()); - - System.out.println("--------------------------------"); - System.out.println(); - if (returnData == null) { // Could not contact server return new Pair(true, false); @@ -730,10 +761,11 @@ final public class Table { status.setStatus(TransactionStatus.StatusAborted); } } else { + TransactionStatus status = transaction.getTransactionStatus(); if (foundAbort) { - TransactionStatus status = transaction.getTransactionStatus(); status.setStatus(TransactionStatus.StatusAborted); - return new Pair(false, false); + } else { + status.setStatus(TransactionStatus.StatusCommitted); } } @@ -803,12 +835,6 @@ final public class Table { continue; } - System.out.println("---"); - for (KeyValue kv : commit.getKeyValueUpdateSet()) { - System.out.println("Sending Commit Locally: " + kv); - } - System.out.println("---"); - unseenArbitrations.addAll(commit.getParts().values()); for (CommitPart commitPart : commit.getParts().values()) { @@ -986,16 +1012,15 @@ final public class Table { } } - // Insert as many transactions as possible while keeping order - for (Transaction transaction : pendingTransactionQueue) { + if (pendingTransactionQueue.size() > 0) { + + Transaction transaction = pendingTransactionQueue.get(0); // Set the transaction sequence number if it has yet to be inserted into the block chain - if (!transaction.didSendAPartToServer()) { + if ((!transaction.didSendAPartToServer() && !transaction.getServerFailure()) || (transaction.getSequenceNumber() == -1)) { transaction.setSequenceNumber(slot.getSequenceNumber()); } - boolean ranOutOfSpace = false; - while (true) { TransactionPart part = transaction.getNextPartToSend(); @@ -1006,27 +1031,66 @@ final public class Table { if (slot.hasSpace(part)) { slot.addEntry(part); - List partsSent = transactionPartsSent.get(transaction); if (partsSent == null) { partsSent = new ArrayList(); transactionPartsSent.put(transaction, partsSent); } - partsSent.add(part.getPartNumber()); transactionPartsSent.put(transaction, partsSent); - } else { - ranOutOfSpace = true; break; } } - - if (ranOutOfSpace) { - break; - } } + + // // Insert as many transactions as possible while keeping order + // for (Transaction transaction : pendingTransactionQueue) { + + // // Set the transaction sequence number if it has yet to be inserted into the block chain + // if (!transaction.didSendAPartToServer()) { + // transaction.setSequenceNumber(slot.getSequenceNumber()); + // } + + // boolean ranOutOfSpace = false; + + // while (true) { + // TransactionPart part = transaction.getNextPartToSend(); + + // if (part == null) { + // // Ran out of parts to send for this transaction so move on + // break; + // } + + // if (slot.hasSpace(part)) { + // slot.addEntry(part); + // List partsSent = transactionPartsSent.get(transaction); + // if (partsSent == null) { + // partsSent = new ArrayList(); + // transactionPartsSent.put(transaction, partsSent); + // } + // partsSent.add(part.getPartNumber()); + // transactionPartsSent.put(transaction, partsSent); + + // for (KeyValue kv : transaction.getKeyValueUpdateSet()) { + // System.out.println("Inserted Into Slot: " + kv); + // } + + // ranOutOfSpace = true; + // break; + + // } else { + // ranOutOfSpace = true; + // break; + // } + // } + + // if (ranOutOfSpace) { + // break; + // } + // } + // Fill the remainder of the slot with rescue data doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize); @@ -1342,6 +1406,12 @@ final public class Table { // Add that part to the transaction transaction.addPartDecode(part); + + if (transaction.isComplete()) { + for (KeyValue kv : transaction.getKeyValueUpdateSet()) { + System.out.println("Got Live Transaction " + kv + " " + part.getSequenceNumber()); + } + } } } @@ -1397,6 +1467,11 @@ final public class Table { } } + + for (KeyValue kv : transaction.getKeyValueUpdateSet()) { + System.out.println("Arbitrating on: " + kv); + } + if (transaction.evaluateGuard(committedKeyValueTable, speculativeTableTmp, null)) { // Guard evaluated as true @@ -1407,7 +1482,6 @@ final public class Table { // Update what the last transaction committed was for use in batch commit lastTransactionCommitted = transaction.getSequenceNumber(); - } else { // Guard evaluated was false so create abort @@ -1424,6 +1498,10 @@ final public class Table { // Insert the abort so we can process processEntry(newAbort); + + for (KeyValue kv : transaction.getKeyValueUpdateSet()) { + System.out.println("Abort From Server!!!!!! " + kv); + } } } @@ -1452,7 +1530,6 @@ final public class Table { } } - if ((newCommit != null) || (generatedAborts.size() > 0)) { ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, generatedAborts); pendingSendArbitrationRounds.add(arbitrationRound); @@ -1484,8 +1561,6 @@ final public class Table { if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) != null) { if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) > transaction.getClientLocalSequenceNumber()) { // We've have already seen this from the server - - System.out.println("Local Arbitrate Seen Already from server, rejected"); return new Pair(false, false); } } @@ -1580,7 +1655,6 @@ final public class Table { */ private boolean compactArbitrationData() { - if (pendingSendArbitrationRounds.size() < 2) { // Nothing to compact so do nothing return false; @@ -1663,6 +1737,9 @@ final public class Table { return false; } + // private boolean compactArbitrationData() { + // return false; + // } /** * Update all the commits and the committed tables, sets dead the dead transactions @@ -1827,15 +1904,11 @@ final public class Table { - System.out.println("============"); // Update the committed table of keys and which commit is using which key for (KeyValue kv : commit.getKeyValueUpdateSet()) { - System.out.println("Committing: " + kv); committedKeyValueTable.put(kv.getKey(), kv); liveCommitsByKeyTable.put(kv.getKey(), commit); } - System.out.println("--------------"); - System.out.println(); } } @@ -2160,6 +2233,15 @@ final public class Table { */ private void processEntry(Abort entry) { + + if (entry.getTransactionSequenceNumber() != -1) { + // update the transaction status if it was sent to the server + TransactionStatus status = outstandingTransactionStatus.remove(entry.getTransactionSequenceNumber()); + if (status != null) { + status.setStatus(TransactionStatus.StatusAborted); + } + } + // Abort has not been seen by the client it is for yet so we need to keep track of it Abort previouslySeenAbort = liveAbortTable.put(entry.getAbortId(), entry); if (previouslySeenAbort != null) { @@ -2184,13 +2266,7 @@ final public class Table { } - if (entry.getTransactionSequenceNumber() != -1) { - // update the transaction status if it was sent to the server - TransactionStatus status = outstandingTransactionStatus.remove(entry.getTransactionSequenceNumber()); - if (status != null) { - status.setStatus(TransactionStatus.StatusAborted); - } - } + // Update the last arbitration data that we have seen so far if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator()) != null) { @@ -2200,7 +2276,6 @@ final public class Table { // Is larger lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber()); } - } else { // Never seen any data from this arbitrator so record the first one lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber()); diff --git a/version2/src/java/iotcloud/Test.java b/version2/src/java/iotcloud/Test.java index 9e80def..96d505c 100644 --- a/version2/src/java/iotcloud/Test.java +++ b/version2/src/java/iotcloud/Test.java @@ -11,7 +11,7 @@ import java.util.ArrayList; public class Test { - public static final int NUMBER_OF_TESTS = 100; + public static final int NUMBER_OF_TESTS = 1; public static void main(String[] args) throws ServerException { if (args[0].equals("2")) { @@ -28,9 +28,621 @@ public class Test { test7(); } else if (args[0].equals("8")) { test8(); + } else if (args[0].equals("9")) { + test9(); + } else if (args[0].equals("10")) { + test10(); + } else if (args[0].equals("11")) { + test11(); } } + + static void test11() { + + boolean foundError = false; + List transStatusList = new ArrayList(); + + // Setup the 2 clients + Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321, 6000); + + System.out.println("Init Table t1s"); + + while (true) { + try { + t1.initTable(); + break; + } catch (Exception e) { } + } + + + System.out.println("Update Table t2"); + Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351, 6001); + while (t2.update() == false) {} + + // Make the Keys + System.out.println("Setting up keys"); + for (int i = 0; i < NUMBER_OF_TESTS; i++) { + System.out.println(i); + + String a = "a" + i; + String b = "b" + i; + String c = "c" + i; + String d = "d" + i; + IoTString ia = new IoTString(a); + IoTString ib = new IoTString(b); + IoTString ic = new IoTString(c); + IoTString id = new IoTString(d); + + while (true) { + try { + t1.createNewKey(ia, 321); + break; + } catch (Exception e) { } + } + + while (true) { + try { + t1.createNewKey(ib, 351); + break; + } catch (Exception e) { } + } + + while (true) { + try { + t2.createNewKey(ic, 321); + break; + } catch (Exception e) { } + } + + while (true) { + try { + t2.createNewKey(id, 351); + break; + } catch (Exception e) { } + } + } + + // Do Updates for the keys + System.out.println("Setting Key-Values..."); + for (int i = 0; i < NUMBER_OF_TESTS; i++) { + System.out.println(i); + String keyA = "a" + i; + String keyB = "b" + i; + String keyC = "c" + i; + String keyD = "d" + i; + String valueA = "a" + i; + String valueB = "b" + i; + String valueC = "c" + i; + String valueD = "d" + i; + + IoTString iKeyA = new IoTString(keyA); + IoTString iKeyB = new IoTString(keyB); + IoTString iKeyC = new IoTString(keyC); + IoTString iKeyD = new IoTString(keyD); + IoTString iValueA = new IoTString(valueA); + IoTString iValueB = new IoTString(valueB); + IoTString iValueC = new IoTString(valueC); + IoTString iValueD = new IoTString(valueD); + + + String keyAPrev = "a" + (i - 1); + String keyBPrev = "b" + (i - 1); + String keyCPrev = "c" + (i - 1); + String keyDPrev = "d" + (i - 1); + String valueAPrev = "a" + (i - 1); + String valueBPrev = "b" + (i - 1); + String valueCPrev = "c" + (i - 1); + String valueDPrev = "d" + (i - 1); + + IoTString iKeyAPrev = new IoTString(keyAPrev); + IoTString iKeyBPrev = new IoTString(keyBPrev); + IoTString iKeyCPrev = new IoTString(keyCPrev); + IoTString iKeyDPrev = new IoTString(keyDPrev); + IoTString iValueAPrev = new IoTString(valueAPrev); + IoTString iValueBPrev = new IoTString(valueBPrev); + IoTString iValueCPrev = new IoTString(valueCPrev); + IoTString iValueDPrev = new IoTString(valueDPrev); + + t1.startTransaction(); + t1.addKV(iKeyA, iValueA); + transStatusList.add(t1.commitTransaction()); + + t1.startTransaction(); + t1.addKV(iKeyB, iValueB); + transStatusList.add(t1.commitTransaction()); + + t2.startTransaction(); + t2.addKV(iKeyC, iValueC); + transStatusList.add(t2.commitTransaction()); + + t2.startTransaction(); + t2.addKV(iKeyD, iValueD); + transStatusList.add(t2.commitTransaction()); + } + + System.out.println("Updating..."); + while (t1.update() == false) {} + while (t2.update() == false) {} + while (t1.update() == false) {} + while (t2.update() == false) {} + + System.out.println("Checking Key-Values..."); + for (int i = 0; i < NUMBER_OF_TESTS; i++) { + + String keyA = "a" + i; + String keyB = "b" + i; + String keyC = "c" + i; + String keyD = "d" + i; + String valueA = "a" + i; + String valueB = "b" + i; + String valueC = "c" + i; + String valueD = "d" + i; + + IoTString iKeyA = new IoTString(keyA); + IoTString iKeyB = new IoTString(keyB); + IoTString iKeyC = new IoTString(keyC); + IoTString iKeyD = new IoTString(keyD); + IoTString iValueA = new IoTString(valueA); + IoTString iValueB = new IoTString(valueB); + IoTString iValueC = new IoTString(valueC); + IoTString iValueD = new IoTString(valueD); + + + IoTString testValA1 = t1.getCommitted(iKeyA); + IoTString testValB1 = t1.getCommitted(iKeyB); + IoTString testValC1 = t1.getCommitted(iKeyC); + IoTString testValD1 = t1.getCommitted(iKeyD); + + IoTString testValA2 = t2.getCommitted(iKeyA); + IoTString testValB2 = t2.getCommitted(iKeyB); + IoTString testValC2 = t2.getCommitted(iKeyC); + IoTString testValD2 = t2.getCommitted(iKeyD); + + if ((testValA1 == null) || (testValA1.equals(iValueA) == false)) { + System.out.println("Key-Value t1 incorrect: " + keyA + " " + testValA1); + foundError = true; + } + + if ((testValB1 == null) || (testValB1.equals(iValueB) == false)) { + System.out.println("Key-Value t1 incorrect: " + keyB + " " + testValB1); + foundError = true; + } + + if ((testValC1 == null) || (testValC1.equals(iValueC) == false)) { + System.out.println("Key-Value t1 incorrect: " + keyC + " " + testValC1); + foundError = true; + } + + if ((testValD1 == null) || (testValD1.equals(iValueD) == false)) { + System.out.println("Key-Value t1 incorrect: " + keyD + " " + testValD1); + foundError = true; + } + + + if ((testValA2 == null) || (testValA2.equals(iValueA) == false)) { + System.out.println("Key-Value t2 incorrect: " + keyA + " " + testValA2); + foundError = true; + } + + if ((testValB2 == null) || (testValB2.equals(iValueB) == false)) { + System.out.println("Key-Value t2 incorrect: " + keyB + " " + testValB2); + foundError = true; + } + + if ((testValC2 == null) || (testValC2.equals(iValueC) == false)) { + System.out.println("Key-Value t2 incorrect: " + keyC + " " + testValC2); + foundError = true; + } + + if ((testValD2 == null) || (testValD2.equals(iValueD) == false)) { + System.out.println("Key-Value t2 incorrect: " + keyD + " " + testValD2); + foundError = true; + } + } + + for (TransactionStatus status : transStatusList) { + if (status.getStatus() != TransactionStatus.StatusCommitted) { + foundError = true; + } + } + + if (foundError) { + System.out.println("Found Errors..."); + } else { + System.out.println("No Errors Found..."); + } + + t1.close(); + t2.close(); + } + + static void test10() { + + boolean foundError = false; + List transStatusList = new ArrayList(); + + // Setup the 2 clients + Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321, -1); + System.out.println("Init Table t1s"); + while (true) { + try { + t1.initTable(); + break; + } catch (Exception e) { } + } + Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351, -1); + while (t2.update() == false) {} + + // t1.addLocalCommunication(351, "127.0.0.1", 6001); + // t2.addLocalCommunication(321, "127.0.0.1", 6000); + + + // Make the Keys + System.out.println("Setting up keys"); + for (int i = 0; i < 4; i++) { + String a = "a" + i; + String b = "b" + i; + String c = "c" + i; + String d = "d" + i; + IoTString ia = new IoTString(a); + IoTString ib = new IoTString(b); + IoTString ic = new IoTString(c); + IoTString id = new IoTString(d); + while (true) { + try { + t1.createNewKey(ia, 321); + break; + } catch (Exception e) { } + } + + while (true) { + try { + t1.createNewKey(ib, 351); + break; + } catch (Exception e) { } + } + + while (true) { + try { + t2.createNewKey(ic, 321); + break; + } catch (Exception e) { } + } + + while (true) { + try { + t2.createNewKey(id, 351); + break; + } catch (Exception e) { } + } + } + + + // Do Updates for the keys + System.out.println("B========================"); + for (int t = 0; t < NUMBER_OF_TESTS; t++) { + for (int i = 0; i < 4; i++) { + + System.out.println(i); + + String keyB = "b" + i; + String valueB = "b" + (i + t); + + IoTString iKeyB = new IoTString(keyB); + IoTString iValueB = new IoTString(valueB); + + t1.startTransaction(); + System.out.println(t1.getSpeculativeAtomic(iKeyB)); + t1.addKV(iKeyB, iValueB); + transStatusList.add(t1.commitTransaction()); + } + } + System.out.println(); + + + System.out.println("C========================"); + for (int t = 0; t < NUMBER_OF_TESTS; t++) { + for (int i = 0; i < 4; i++) { + System.out.println(i); + + String keyC = "c" + i; + String valueC = "c" + (i + t); + + IoTString iKeyC = new IoTString(keyC); + IoTString iValueC = new IoTString(valueC); + + t2.startTransaction(); + System.out.println(t2.getSpeculativeAtomic(iKeyC)); + t2.addKV(iKeyC, iValueC); + transStatusList.add(t2.commitTransaction()); + } + } + System.out.println(); + + + for (int t = 0; t < NUMBER_OF_TESTS; t++) { + for (int i = 0; i < 4; i++) { + String keyA = "a" + i; + String keyD = "d" + i; + String valueA = "a" + (i + t); + String valueD = "d" + (i + t); + + IoTString iKeyA = new IoTString(keyA); + IoTString iKeyD = new IoTString(keyD); + IoTString iValueA = new IoTString(valueA); + IoTString iValueD = new IoTString(valueD); + + + t1.startTransaction(); + t1.addKV(iKeyA, iValueA); + transStatusList.add(t1.commitTransaction()); + + + t2.startTransaction(); + t2.addKV(iKeyD, iValueD); + transStatusList.add(t2.commitTransaction()); + System.out.println(); + } + } + System.out.println(); + + System.out.println("Updating..."); + System.out.println("t1 -=-=-=-=-=-=-=-"); + while (t1.update() == false) {} + System.out.println("t2 -=-=-=-=-=-=-=-"); + while (t2.update() == false) {} + System.out.println("t1 -=-=-=-=-=-=-=-"); + while (t1.update() == false) {} + System.out.println("t2 -=-=-=-=-=-=-=-"); + while (t2.update() == false) {} + + + System.out.println("Checking Key-Values..."); + for (int i = 0; i < 4; i++) { + + String keyA = "a" + i; + String keyB = "b" + i; + String keyC = "c" + i; + String keyD = "d" + i; + String valueA = "a" + (i + NUMBER_OF_TESTS - 1); + String valueB = "b" + (i + NUMBER_OF_TESTS - 1); + String valueC = "c" + (i + NUMBER_OF_TESTS - 1); + String valueD = "d" + (i + NUMBER_OF_TESTS - 1); + + IoTString iKeyA = new IoTString(keyA); + IoTString iKeyB = new IoTString(keyB); + IoTString iKeyC = new IoTString(keyC); + IoTString iKeyD = new IoTString(keyD); + IoTString iValueA = new IoTString(valueA); + IoTString iValueB = new IoTString(valueB); + IoTString iValueC = new IoTString(valueC); + IoTString iValueD = new IoTString(valueD); + + + IoTString testValA1 = t1.getCommitted(iKeyA); + IoTString testValB1 = t1.getCommitted(iKeyB); + IoTString testValC1 = t1.getCommitted(iKeyC); + IoTString testValD1 = t1.getCommitted(iKeyD); + + IoTString testValA2 = t2.getCommitted(iKeyA); + IoTString testValB2 = t2.getCommitted(iKeyB); + IoTString testValC2 = t2.getCommitted(iKeyC); + IoTString testValD2 = t2.getCommitted(iKeyD); + + if ((testValA1 == null) || (testValA1.equals(iValueA) == false)) { + System.out.println("Key-Value t1 incorrect: " + keyA + " " + testValA1); + foundError = true; + } + + if ((testValB1 == null) || (testValB1.equals(iValueB) == false)) { + System.out.println("Key-Value t1 incorrect: " + keyB + " " + testValB1); + foundError = true; + } + + if ((testValC1 == null) || (testValC1.equals(iValueC) == false)) { + System.out.println("Key-Value t1 incorrect: " + keyC + " " + testValC1); + foundError = true; + } + + if ((testValD1 == null) || (testValD1.equals(iValueD) == false)) { + System.out.println("Key-Value t1 incorrect: " + keyD + " " + testValD1); + foundError = true; + } + + if ((testValA2 == null) || (testValA2.equals(iValueA) == false)) { + System.out.println("Key-Value t2 incorrect: " + keyA + " " + testValA2); + foundError = true; + } + + if ((testValB2 == null) || (testValB2.equals(iValueB) == false)) { + System.out.println("Key-Value t2 incorrect: " + keyB + " " + testValB2); + foundError = true; + } + + if ((testValC2 == null) || (testValC2.equals(iValueC) == false)) { + System.out.println("Key-Value t2 incorrect: " + keyC + " " + testValC2); + foundError = true; + } + + if ((testValD2 == null) || (testValD2.equals(iValueD) == false)) { + System.out.println("Key-Value t2 incorrect: " + keyD + " " + testValD2); + foundError = true; + } + } + + int counter = 0; + for (TransactionStatus status : transStatusList) { + if (status.getStatus() != TransactionStatus.StatusCommitted) { + foundError = true; + System.out.println(counter + " Status: " + status.getStatus()); + } + counter++; + } + + if (foundError) { + System.out.println("Found Errors..."); + } else { + System.out.println("No Errors Found..."); + } + + t1.close(); + t2.close(); + } + + static void test9() { + + boolean foundError = false; + List transStatusList = new ArrayList(); + + // Setup the 2 clients + Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321, 6000); + + System.out.println("Init Table t1s"); + while (true) { + try { + t1.initTable(); + break; + } catch (Exception e) { } + } + + + System.out.println("Update Table t2"); + Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351, 6001); + while (t2.update() == false) {} + + t1.addLocalCommunication(351, "127.0.0.1", 6001); + t2.addLocalCommunication(321, "127.0.0.1", 6000); + + // Make the Keys + System.out.println("Setting up keys"); + for (int i = 0; i < NUMBER_OF_TESTS; i++) { + System.out.println(i); + + String a = "a" + i; + String b = "b" + i; + String c = "c" + i; + String d = "d" + i; + IoTString ia = new IoTString(a); + IoTString ib = new IoTString(b); + IoTString ic = new IoTString(c); + IoTString id = new IoTString(d); + + while (true) { + try { + t1.createNewKey(ia, 321); + break; + } catch (Exception e) { } + } + + while (true) { + try { + t1.createNewKey(ib, 351); + break; + } catch (Exception e) { } + } + + while (true) { + try { + t2.createNewKey(ic, 321); + break; + } catch (Exception e) { } + } + + while (true) { + try { + t2.createNewKey(id, 351); + break; + } catch (Exception e) { } + } + } + + System.out.println("A, D..."); + for (int i = 0; i < NUMBER_OF_TESTS; i++) { + String keyA = "a" + i; + String keyD = "d" + i; + String valueA = "a" + i; + String valueD = "d" + i; + + IoTString iKeyA = new IoTString(keyA); + IoTString iKeyD = new IoTString(keyD); + IoTString iValueA = new IoTString(valueA); + IoTString iValueD = new IoTString(valueD); + + t1.startTransaction(); + t1.addKV(iKeyA, iValueA); + transStatusList.add(t1.commitTransaction()); + + t2.startTransaction(); + t2.addKV(iKeyD, iValueD); + transStatusList.add(t2.commitTransaction()); + } + + while (t1.updateFromLocal(351) == false) {} + while (t2.updateFromLocal(321) == false) {} + + + System.out.println("Updating..."); + System.out.println("Checking Key-Values..."); + + for (int i = 0; i < NUMBER_OF_TESTS; i++) { + + String keyA = "a" + i; + String keyD = "d" + i; + String valueA = "a" + i; + String valueD = "d" + i; + + + IoTString iKeyA = new IoTString(keyA); + IoTString iKeyD = new IoTString(keyD); + IoTString iValueA = new IoTString(valueA); + IoTString iValueD = new IoTString(valueD); + + + IoTString testValA1 = t1.getCommitted(iKeyA); + IoTString testValD1 = t1.getCommitted(iKeyD); + IoTString testValA2 = t2.getCommitted(iKeyA); + IoTString testValD2 = t2.getCommitted(iKeyD); + + + if ((testValA1 == null) || (testValA1.equals(iValueA) == false)) { + System.out.println("Key-Value t1 incorrect: " + keyA + " " + testValA1); + foundError = true; + } + + if ((testValD1 == null) || (testValD1.equals(iValueD) == false)) { + System.out.println("Key-Value t1 incorrect: " + keyD + " " + testValD1); + foundError = true; + } + + + if ((testValA2 == null) || (testValA2.equals(iValueA) == false)) { + System.out.println("Key-Value t2 incorrect: " + keyA + " " + testValA2); + foundError = true; + } + + if ((testValD2 == null) || (testValD2.equals(iValueD) == false)) { + System.out.println("Key-Value t2 incorrect: " + keyD + " " + testValD2); + foundError = true; + } + } + + for (TransactionStatus status : transStatusList) { + if (status.getStatus() != TransactionStatus.StatusCommitted) { + foundError = true; + } + } + + if (foundError) { + System.out.println("Found Errors..."); + } else { + System.out.println("No Errors Found..."); + } + + t1.close(); + t2.close(); + } + static void test8() { boolean foundError = false; @@ -43,10 +655,9 @@ public class Test { while (true) { try { - System.out.println("-==-=-=-=-=-=-=-==-=-"); t1.initTable(); break; - } catch (Exception e) {} + } catch (Exception e) { } } diff --git a/version2/src/java/iotcloud/Transaction.java b/version2/src/java/iotcloud/Transaction.java index b3a0490..f444ded 100644 --- a/version2/src/java/iotcloud/Transaction.java +++ b/version2/src/java/iotcloud/Transaction.java @@ -28,6 +28,8 @@ class Transaction { private TransactionStatus transactionStatus = null; + private boolean hadServerFailure = false; + public Transaction() { parts = new HashMap(); keyValueGuardSet = new HashSet(); @@ -142,6 +144,21 @@ class Transaction { return part; } + + public void setServerFailure() { + hadServerFailure = true; + } + + public boolean getServerFailure() { + return hadServerFailure; + } + + + public void resetServerFailure() { + hadServerFailure = false; + } + + public void setTransactionStatus(TransactionStatus _transactionStatus) { transactionStatus = _transactionStatus; } @@ -268,10 +285,19 @@ class Transaction { if (kvGuard.getValue() != null) { if ((kv == null) || (!kvGuard.getValue().equals(kv.getValue()))) { + + + if (kv != null) { + System.out.println(kvGuard.getValue() + " " + kv.getValue()); + } else { + System.out.println(kvGuard.getValue() + " " + kv); + } + return false; } } else { if (kv != null) { + System.out.println("kvGuard was nulled: " + kv); return false; } } diff --git a/version2/src/server/iotquery.cpp b/version2/src/server/iotquery.cpp index 64639b3..1e046a3 100644 --- a/version2/src/server/iotquery.cpp +++ b/version2/src/server/iotquery.cpp @@ -208,35 +208,19 @@ void IoTQuery::getSlot() { /** * The method setSalt handles a setSalt request from the client. */ - void IoTQuery::setSalt() { /* Write the slot data we received to a SLOT file */ char *filename = getSaltFileName(); - char * response = new char[1]; - - if (access(filename, F_OK) == 0) - { - /* Already Exists */ - response[0] = 1; - } - else - { - /* Does not exist so create it */ - int saltfd = open(filename, O_CREAT | O_WRONLY, S_IRUSR | S_IWUSR); - doWrite(saltfd, data, length); - close(saltfd); - response[0] = 0; - } - - - sendResponse(response, 1); - + int saltfd = open(filename, O_CREAT | O_WRONLY, S_IRUSR | S_IWUSR); + doWrite(saltfd, data, length); + char response[0]; + sendResponse(response, 0); + close(saltfd); delete filename; - delete response; } /** - * The method getSalt handles a setSalt request from the client. + * The method getSalt handles a getSalt request from the client. */ void IoTQuery::getSalt() { @@ -247,6 +231,8 @@ void IoTQuery::getSalt() { if (stat(filename, &st) == 0) { filesize = st.st_size; } else { + char response[0]; + sendResponse(response, 0); delete filename; return; } @@ -306,6 +292,7 @@ void IoTQuery::sendResponse(char * bytes, int len) { << "Content-Length: " << len << "\r\n" << "\r\n"; cout.write(bytes, len); + cout << flush; } /** @@ -359,8 +346,10 @@ void IoTQuery::removeOldestSlot() { void IoTQuery::processQuery() { getQuery(); getDirectory(); + // readData(); if (!readData()) { + cerr << "No Data Available" << endl; return; } @@ -412,19 +401,20 @@ void IoTQuery::processQuery() { */ bool IoTQuery::readData() { - if (length) { + if (length != 0) { data = new char[length + 1]; memset(data, 0, length + 1); cin.read(data, length); } + do { char dummy; cin >> dummy; } while (!cin.eof()); - if (length) + if (length != 0) { - if (cin.fail()) + if (cin.gcount() != length) { return false; }