Local communication working
[iotcloud.git] / version2 / src / java / iotcloud / Table.java
index 354747d723140af0797ba575cc9b441696283abc..c254939a25d6db663ff7808231a6b1a490cf8a1e 100644 (file)
@@ -228,7 +228,7 @@ final public class Table {
         * also initialize the crypto stuff.
         */
        public synchronized void initTable() throws ServerException {
-               cloud.setSalt(); //Set the salt
+               cloud.initSecurity();
 
                // Create the first insertion into the block chain which is the table status
                Slot s = new Slot(this, 1, localMachineId);
@@ -473,6 +473,8 @@ final public class Table {
                        }
                }
 
+               updateLiveStateFromLocal();
+
                return transactionStatus;
        }
 
@@ -535,11 +537,6 @@ final public class Table {
                                // Try to send to the server
                                ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != null);
 
-                               // if (sendSlotsReturn.getSecond()) {
-                               //      System.out.println("Second was true");
-                               // }
-
-
                                if (/*sendSlotsReturn.getSecond() || */sendSlotsReturn.getFirst()) {
                                        // Did insert into the block chain
 
@@ -560,9 +557,12 @@ final public class Table {
                                                        iter.remove();
                                                }
                                        }
+                                       
+                                       for (Transaction transaction : transactionPartsSent.keySet()) {
 
 
-                                       for (Transaction transaction : transactionPartsSent.keySet()) {
+                                               transaction.resetServerFailure();
+
 
                                                // Update which transactions parts still need to be sent
                                                transaction.removeSentParts(transactionPartsSent.get(transaction));
@@ -583,6 +583,7 @@ final public class Table {
                                        // Reset which transaction to send
                                        for (Transaction transaction : transactionPartsSent.keySet()) {
                                                transaction.resetNextPartToSend();
+                                               transaction.resetNextPartToSend();
 
                                                // Set the transaction sequence number back to nothing
                                                if (!transaction.didSendAPartToServer()) {
@@ -602,11 +603,16 @@ final public class Table {
                        }
                } catch (ServerException e) {
 
+                       System.out.println("Server Failure:   " + e.getType());
+
+
                        if (e.getType() != ServerException.TypeInputTimeout) {
                                // e.printStackTrace();
 
                                // Nothing was able to be sent to the server so just clear these data structures
                                for (Transaction transaction : transactionPartsSent.keySet()) {
+                                       transaction.resetNextPartToSend();
+
                                        // Set the transaction sequence number back to nothing
                                        if (!transaction.didSendAPartToServer()) {
                                                transaction.setSequenceNumber(-1);
@@ -615,6 +621,12 @@ final public class Table {
                        } else {
                                // There was a partial send to the server
                                hadPartialSendToServer = true;
+
+                               // Nothing was able to be sent to the server so just clear these data structures
+                               for (Transaction transaction : transactionPartsSent.keySet()) {
+                                       transaction.resetNextPartToSend();
+                                       transaction.setServerFailure();
+                               }
                        }
 
                        pendingSendArbitrationEntriesToDelete.clear();
@@ -626,6 +638,56 @@ final public class Table {
                return newKey == null;
        }
 
+       public synchronized boolean updateFromLocal(long machineId) {
+               Pair<String, Integer> localCommunicationInformation = localCommunicationTable.get(machineId);
+               if (localCommunicationInformation == null) {
+                       // Cant talk to that device locally so do nothing
+                       return false;
+               }
+
+               // Get the size of the send data
+               int sendDataSize = Integer.BYTES + Long.BYTES;
+
+               Long lastArbitrationDataLocalSequenceNumber = (long) - 1;
+               if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId) != null) {
+                       lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId);
+               }
+
+               byte[] sendData = new byte[sendDataSize];
+               ByteBuffer bbEncode = ByteBuffer.wrap(sendData);
+
+               // Encode the data
+               bbEncode.putLong(lastArbitrationDataLocalSequenceNumber);
+               bbEncode.putInt(0);
+
+               // Send by local
+               byte[] returnData = cloud.sendLocalData(sendData, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
+
+               if (returnData == null) {
+                       // Could not contact server
+                       return false;
+               }
+
+               // Decode the data
+               ByteBuffer bbDecode = ByteBuffer.wrap(returnData);
+               int numberOfEntries = bbDecode.getInt();
+
+               for (int i = 0; i < numberOfEntries; i++) {
+                       byte type = bbDecode.get();
+                       if (type == Entry.TypeAbort) {
+                               Abort abort = (Abort)Abort.decode(null, bbDecode);
+                               processEntry(abort);
+                       } else if (type == Entry.TypeCommitPart) {
+                               CommitPart commitPart = (CommitPart)CommitPart.decode(null, bbDecode);
+                               processEntry(commitPart);
+                       }
+               }
+
+               updateLiveStateFromLocal();
+
+               return true;
+       }
+
        private Pair<Boolean, Boolean> sendTransactionToLocal(Transaction transaction) {
 
                // Get the devices local communications
@@ -658,40 +720,9 @@ final public class Table {
                        part.encode(bbEncode);
                }
 
-
-
-
-
-
-
-
-
-
-
-
-               System.out.println("================================");
-               System.out.println("Sending Locally");
-               for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
-                       System.out.println(kv);
-               }
-
-
-
-
-
-
-
-
-
-
-
                // Send by local
                byte[] returnData = cloud.sendLocalData(sendData, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
 
-
-               System.out.println("--------------------------------");
-               System.out.println();
-
                if (returnData == null) {
                        // Could not contact server
                        return new Pair<Boolean, Boolean>(true, false);
@@ -730,10 +761,11 @@ final public class Table {
                                status.setStatus(TransactionStatus.StatusAborted);
                        }
                } else {
+                       TransactionStatus status =  transaction.getTransactionStatus();
                        if (foundAbort) {
-                               TransactionStatus status =  transaction.getTransactionStatus();
                                status.setStatus(TransactionStatus.StatusAborted);
-                               return new Pair<Boolean, Boolean>(false, false);
+                       } else {
+                               status.setStatus(TransactionStatus.StatusCommitted);
                        }
                }
 
@@ -803,12 +835,6 @@ final public class Table {
                                        continue;
                                }
 
-                               System.out.println("---");
-                               for (KeyValue kv : commit.getKeyValueUpdateSet()) {
-                                       System.out.println("Sending Commit Locally:  " + kv);
-                               }
-                               System.out.println("---");
-
                                unseenArbitrations.addAll(commit.getParts().values());
 
                                for (CommitPart commitPart : commit.getParts().values()) {
@@ -986,16 +1012,15 @@ final public class Table {
                        }
                }
 
-               // Insert as many transactions as possible while keeping order
-               for (Transaction transaction : pendingTransactionQueue) {
+               if (pendingTransactionQueue.size() > 0) {
+
+                       Transaction transaction = pendingTransactionQueue.get(0);
 
                        // Set the transaction sequence number if it has yet to be inserted into the block chain
-                       if (!transaction.didSendAPartToServer()) {
+                       if ((!transaction.didSendAPartToServer() && !transaction.getServerFailure()) || (transaction.getSequenceNumber() == -1)) {
                                transaction.setSequenceNumber(slot.getSequenceNumber());
                        }
 
-                       boolean ranOutOfSpace = false;
-
                        while (true) {
                                TransactionPart part = transaction.getNextPartToSend();
 
@@ -1006,27 +1031,66 @@ final public class Table {
 
                                if (slot.hasSpace(part)) {
                                        slot.addEntry(part);
-
                                        List<Integer> partsSent = transactionPartsSent.get(transaction);
                                        if (partsSent == null) {
                                                partsSent = new ArrayList<Integer>();
                                                transactionPartsSent.put(transaction, partsSent);
                                        }
-
                                        partsSent.add(part.getPartNumber());
                                        transactionPartsSent.put(transaction, partsSent);
-
                                } else {
-                                       ranOutOfSpace = true;
                                        break;
                                }
                        }
-
-                       if (ranOutOfSpace) {
-                               break;
-                       }
                }
 
+
+               // // Insert as many transactions as possible while keeping order
+               // for (Transaction transaction : pendingTransactionQueue) {
+
+               //      // Set the transaction sequence number if it has yet to be inserted into the block chain
+               //      if (!transaction.didSendAPartToServer()) {
+               //              transaction.setSequenceNumber(slot.getSequenceNumber());
+               //      }
+
+               //      boolean ranOutOfSpace = false;
+
+               //      while (true) {
+               //              TransactionPart part = transaction.getNextPartToSend();
+
+               //              if (part == null) {
+               //                      // Ran out of parts to send for this transaction so move on
+               //                      break;
+               //              }
+
+               //              if (slot.hasSpace(part)) {
+               //                      slot.addEntry(part);
+               //                      List<Integer> partsSent = transactionPartsSent.get(transaction);
+               //                      if (partsSent == null) {
+               //                              partsSent = new ArrayList<Integer>();
+               //                              transactionPartsSent.put(transaction, partsSent);
+               //                      }
+               //                      partsSent.add(part.getPartNumber());
+               //                      transactionPartsSent.put(transaction, partsSent);
+
+               //                      for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
+               //                              System.out.println("Inserted Into Slot: " + kv);
+               //                      }
+
+               //                      ranOutOfSpace = true;
+               //                      break;
+
+               //              } else {
+               //                      ranOutOfSpace = true;
+               //                      break;
+               //              }
+               //      }
+
+               //      if (ranOutOfSpace) {
+               //              break;
+               //      }
+               // }
+
                // Fill the remainder of the slot with rescue data
                doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
 
@@ -1342,6 +1406,12 @@ final public class Table {
 
                                // Add that part to the transaction
                                transaction.addPartDecode(part);
+
+                               if (transaction.isComplete()) {
+                                       for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
+                                               System.out.println("Got Live Transaction   " + kv + "    " + part.getSequenceNumber());
+                                       }
+                               }
                        }
                }
 
@@ -1397,6 +1467,11 @@ final public class Table {
                                }
                        }
 
+
+                       for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
+                               System.out.println("Arbitrating on:     " + kv);
+                       }
+
                        if (transaction.evaluateGuard(committedKeyValueTable, speculativeTableTmp, null)) {
                                // Guard evaluated as true
 
@@ -1407,7 +1482,6 @@ final public class Table {
 
                                // Update what the last transaction committed was for use in batch commit
                                lastTransactionCommitted = transaction.getSequenceNumber();
-
                        } else {
                                // Guard evaluated was false so create abort
 
@@ -1424,6 +1498,10 @@ final public class Table {
 
                                // Insert the abort so we can process
                                processEntry(newAbort);
+
+                               for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
+                                       System.out.println("Abort From Server!!!!!!     " + kv);
+                               }
                        }
                }
 
@@ -1452,7 +1530,6 @@ final public class Table {
                        }
                }
 
-
                if ((newCommit != null) || (generatedAborts.size() > 0)) {
                        ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
                        pendingSendArbitrationRounds.add(arbitrationRound);
@@ -1484,8 +1561,6 @@ final public class Table {
                        if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) != null) {
                                if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) > transaction.getClientLocalSequenceNumber()) {
                                        // We've have already seen this from the server
-
-                                       System.out.println("Local Arbitrate Seen Already from server, rejected");
                                        return new Pair<Boolean, Boolean>(false, false);
                                }
                        }
@@ -1580,7 +1655,6 @@ final public class Table {
         */
        private boolean compactArbitrationData() {
 
-
                if (pendingSendArbitrationRounds.size() < 2) {
                        // Nothing to compact so do nothing
                        return false;
@@ -1663,6 +1737,9 @@ final public class Table {
 
                return false;
        }
+       // private boolean compactArbitrationData() {
+       //      return false;
+       // }
 
        /**
         * Update all the commits and the committed tables, sets dead the dead transactions
@@ -1827,15 +1904,11 @@ final public class Table {
 
 
 
-                               System.out.println("============");
                                // Update the committed table of keys and which commit is using which key
                                for (KeyValue kv : commit.getKeyValueUpdateSet()) {
-                                       System.out.println("Committing:  " + kv);
                                        committedKeyValueTable.put(kv.getKey(), kv);
                                        liveCommitsByKeyTable.put(kv.getKey(), commit);
                                }
-                               System.out.println("--------------");
-                               System.out.println();
                        }
                }
 
@@ -2160,6 +2233,15 @@ final public class Table {
         */
        private void processEntry(Abort entry) {
 
+
+               if (entry.getTransactionSequenceNumber() != -1) {
+                       // update the transaction status if it was sent to the server
+                       TransactionStatus status = outstandingTransactionStatus.remove(entry.getTransactionSequenceNumber());
+                       if (status != null) {
+                               status.setStatus(TransactionStatus.StatusAborted);
+                       }
+               }
+
                // Abort has not been seen by the client it is for yet so we need to keep track of it
                Abort previouslySeenAbort = liveAbortTable.put(entry.getAbortId(), entry);
                if (previouslySeenAbort != null) {
@@ -2184,13 +2266,7 @@ final public class Table {
                }
 
 
-               if (entry.getTransactionSequenceNumber() != -1) {
-                       // update the transaction status if it was sent to the server
-                       TransactionStatus status = outstandingTransactionStatus.remove(entry.getTransactionSequenceNumber());
-                       if (status != null) {
-                               status.setStatus(TransactionStatus.StatusAborted);
-                       }
-               }
+
 
                // Update the last arbitration data that we have seen so far
                if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator()) != null) {
@@ -2200,7 +2276,6 @@ final public class Table {
                                // Is larger
                                lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber());
                        }
-
                } else {
                        // Never seen any data from this arbitrator so record the first one
                        lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber());