Fixed bug
authorAli Younis <ayounis@uci.edu>
Wed, 8 Feb 2017 00:28:54 +0000 (16:28 -0800)
committerAli Younis <ayounis@uci.edu>
Wed, 8 Feb 2017 00:28:54 +0000 (16:28 -0800)
version1/src/java/iotcloud/CloudComm.java
version2/src/java/iotcloud/Table.java
version2/src/java/iotcloud/Test.java
version2/src/java/iotcloud/Transaction.java

index ac906b145321606ba80b9246f1e051deebc10894..5a55f587e3e21f2476b71d8759ec5864dc6a3adb 100644 (file)
@@ -24,7 +24,7 @@ class CloudComm {
        static final int SALT_SIZE = 8;
        byte salt[];
        Table table;
-       
+
        /**
         * Empty Constructor needed for child class.
         */
@@ -37,8 +37,8 @@ class CloudComm {
         */
 
        CloudComm(Table _table, String _baseurl, String _password) {
-               this.table=_table;
-               this.baseurl=_baseurl;
+               this.table = _table;
+               this.baseurl = _baseurl;
                this.password = _password;
                this.random = new SecureRandom();
        }
@@ -64,13 +64,13 @@ class CloudComm {
 
        private void initCrypt() {
                try {
-                       SecretKeySpec key=initKey();
+                       SecretKeySpec key = initKey();
                        password = null; // drop password
                        mac = Mac.getInstance("HmacSHA256");
                        mac.init(key);
-                       encryptCipher =Cipher.getInstance("AES/ECB/PKCS5Padding");
+                       encryptCipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
                        encryptCipher.init(Cipher.ENCRYPT_MODE, key);
-                       decryptCipher =Cipher.getInstance("AES/ECB/PKCS5Padding");
+                       decryptCipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
                        decryptCipher.init(Cipher.DECRYPT_MODE, key);
                } catch (Exception e) {
                        e.printStackTrace();
@@ -83,27 +83,27 @@ class CloudComm {
         */
 
        private URL buildRequest(boolean isput, long sequencenumber, long maxentries) throws IOException {
-               String reqstring=isput?"req=putslot":"req=getslot";
-               String urlstr=baseurl+"?"+reqstring+"&seq="+sequencenumber;
+               String reqstring = isput ? "req=putslot" : "req=getslot";
+               String urlstr = baseurl + "?" + reqstring + "&seq=" + sequencenumber;
                if (maxentries != 0)
-                       urlstr += "&max="+maxentries;
+                       urlstr += "&max=" + maxentries;
                return new URL(urlstr);
        }
-       
+
        public void setSalt() {
                try {
                        salt = new byte[SALT_SIZE];
                        random.nextBytes(salt);
-                       URL url=new URL(baseurl+"?req=setsalt");
-                       URLConnection con=url.openConnection();
+                       URL url = new URL(baseurl + "?req=setsalt");
+                       URLConnection con = url.openConnection();
                        HttpURLConnection http = (HttpURLConnection) con;
                        http.setRequestMethod("POST");
                        http.setFixedLengthStreamingMode(salt.length);
                        http.setDoOutput(true);
                        http.connect();
-                       OutputStream os=http.getOutputStream();
+                       OutputStream os = http.getOutputStream();
                        os.write(salt);
-                       int responsecode=http.getResponseCode();
+                       int responsecode = http.getResponseCode();
                        if (responsecode != HttpURLConnection.HTTP_OK)
                                throw new Error("Invalid response");
                } catch (Exception e) {
@@ -114,20 +114,20 @@ class CloudComm {
        }
 
        private void getSalt() throws Exception {
-               URL url=new URL(baseurl+"?req=getsalt");
-               URLConnection con=url.openConnection();
+               URL url = new URL(baseurl + "?req=getsalt");
+               URLConnection con = url.openConnection();
                HttpURLConnection http = (HttpURLConnection) con;
                http.setRequestMethod("POST");
                http.connect();
-               
-               InputStream is=http.getInputStream();
-               DataInputStream dis=new DataInputStream(is);
-               int salt_length=dis.readInt();
-               byte [] tmp=new byte[salt_length];
+
+               InputStream is = http.getInputStream();
+               DataInputStream dis = new DataInputStream(is);
+               int salt_length = dis.readInt();
+               byte [] tmp = new byte[salt_length];
                dis.readFully(tmp);
-               salt=tmp;
+               salt = tmp;
        }
-       
+
        /*
         * API for putting a slot into the queue.  Returns null on success.
         * On failure, the server will send slots with newer sequence
@@ -140,13 +140,13 @@ class CloudComm {
                                getSalt();
                                initCrypt();
                        }
-                       
-                       long sequencenumber=slot.getSequenceNumber();
-                       byte[] bytes=slot.encode(mac);
+
+                       long sequencenumber = slot.getSequenceNumber();
+                       byte[] bytes = slot.encode(mac);
                        bytes = encryptCipher.doFinal(bytes);
 
-                       URL url=buildRequest(true, sequencenumber, max);
-                       URLConnection con=url.openConnection();
+                       URL url = buildRequest(true, sequencenumber, max);
+                       URLConnection con = url.openConnection();
                        HttpURLConnection http = (HttpURLConnection) con;
 
                        http.setRequestMethod("POST");
@@ -154,12 +154,12 @@ class CloudComm {
                        http.setDoOutput(true);
                        http.connect();
 
-                       OutputStream os=http.getOutputStream();
+                       OutputStream os = http.getOutputStream();
                        os.write(bytes);
 
-                       InputStream is=http.getInputStream();
-                       DataInputStream dis=new DataInputStream(is);
-                       byte[] resptype=new byte[7];
+                       InputStream is = http.getInputStream();
+                       DataInputStream dis = new DataInputStream(is);
+                       byte[] resptype = new byte[7];
                        dis.readFully(resptype);
                        if (Arrays.equals(resptype, "getslot".getBytes()))
                                return processSlots(dis);
@@ -184,20 +184,20 @@ class CloudComm {
                                getSalt();
                                initCrypt();
                        }
-                       
-                       URL url=buildRequest(false, sequencenumber, 0);
-                       URLConnection con=url.openConnection();
+
+                       URL url = buildRequest(false, sequencenumber, 0);
+                       URLConnection con = url.openConnection();
                        HttpURLConnection http = (HttpURLConnection) con;
                        http.setRequestMethod("POST");
                        http.connect();
-                       InputStream is=http.getInputStream();
+                       InputStream is = http.getInputStream();
+
+                       DataInputStream dis = new DataInputStream(is);
 
-                       DataInputStream dis=new DataInputStream(is);
-                       
-                       byte[] resptype=new byte[7];
+                       byte[] resptype = new byte[7];
                        dis.readFully(resptype);
                        if (!Arrays.equals(resptype, "getslot".getBytes()))
-                               throw new Error("Bad Response: "+new String(resptype));
+                               throw new Error("Bad Response: " + new String(resptype));
                        else
                                return processSlots(dis);
                } catch (Exception e) {
@@ -212,19 +212,19 @@ class CloudComm {
         */
 
        private Slot[] processSlots(DataInputStream dis) throws Exception {
-               int numberofslots=dis.readInt();
-               int[] sizesofslots=new int[numberofslots];
-               Slot[] slots=new Slot[numberofslots];
-               for(int i=0; i<numberofslots; i++)
-                       sizesofslots[i]=dis.readInt();
-
-               for(int i=0; i<numberofslots; i++) {
-                       byte[] data=new byte[sizesofslots[i]];
+               int numberofslots = dis.readInt();
+               int[] sizesofslots = new int[numberofslots];
+               Slot[] slots = new Slot[numberofslots];
+               for (int i = 0; i < numberofslots; i++)
+                       sizesofslots[i] = dis.readInt();
+
+               for (int i = 0; i < numberofslots; i++) {
+                       byte[] data = new byte[sizesofslots[i]];
                        dis.readFully(data);
 
                        data = decryptCipher.doFinal(data);
 
-                       slots[i]=Slot.decode(table, data, mac);
+                       slots[i] = Slot.decode(table, data, mac);
                }
                dis.close();
                return slots;
index f29b540941b3452e9d9668e789b4b1e35e4eb0e8..225d25fc263dfc6da74e85f33c794243355f8315 100644 (file)
@@ -20,7 +20,7 @@ import java.nio.ByteBuffer;
  */
 
 final public class Table {
-       
+
        /* Constants */
        static final int FREE_SLOTS = 10; // Number of slots that should be kept free
        static final int SKIP_THRESHOLD = 10;
@@ -56,6 +56,14 @@ final public class Table {
        private boolean didFindTableStatus = false;
        private long currMaxSize = 0;
 
+       private Slot lastSlotAttemptedToSend = null;
+       private boolean lastIsNewKey = false;
+       private int lastNewSize = 0;
+       private Map<Transaction, List<Integer>> lastTransactionPartsSent = null;
+       private List<Entry> lastPendingSendArbitrationEntriesToDelete = null;
+       private NewKey lastNewKey = null;
+
+
        /* Data Structures  */
        private Map<IoTString, KeyValue> committedKeyValueTable = null; // Table of committed key value pairs
        private Map<IoTString, KeyValue> speculatedKeyValueTable = null; // Table of speculated key value pairs, if there is a speculative value
@@ -183,7 +191,14 @@ final public class Table {
                System.out.println("Old:   " + o);
                System.out.println("New:   " + n);
                System.out.println("Size:   " + buffer.size());
-               System.out.println("Commits:   " + liveCommitsTable.size());
+               // System.out.println("Commits:   " + liveCommitsTable.size());
+               System.out.println("pendingTrans:   " + pendingTransactionQueue.size());
+               System.out.println("Trans Status Out:   " + outstandingTransactionStatus.size());
+
+               for (Long k : lastArbitratedTransactionNumberByArbitratorTable.keySet()) {
+                       System.out.println(k + ": " + lastArbitratedTransactionNumberByArbitratorTable.get(k));
+               }
+
 
                for (Long a : liveCommitsTable.keySet()) {
                        for (Long b : liveCommitsTable.get(a).keySet()) {
@@ -231,19 +246,19 @@ final public class Table {
                validateAndUpdate(newslots, true);
        }
 
-// public String toString() {
-//     String retString = " Committed Table: \n";
-//     retString += "---------------------------\n";
-//     retString += commitedTable.toString();
+       // public String toString() {
+       //      String retString = " Committed Table: \n";
+       //      retString += "---------------------------\n";
+       //      retString += commitedTable.toString();
 
-//     retString += "\n\n";
+       //      retString += "\n\n";
 
-//     retString += " Speculative Table: \n";
-//     retString += "---------------------------\n";
-//     retString += speculativeTable.toString();
+       //      retString += " Speculative Table: \n";
+       //      retString += "---------------------------\n";
+       //      retString += speculativeTable.toString();
 
-//     return retString;
-// }
+       //      return retString;
+       // }
 
        public synchronized void addLocalCommunication(long arbitrator, String hostName, int portNumber) {
                localCommunicationTable.put(arbitrator, new Pair<String, Integer>(hostName, portNumber));
@@ -343,6 +358,9 @@ final public class Table {
                        validateAndUpdate(newSlots, false);
                        sendToServer(null);
 
+
+                       updateLiveTransactionsAndStatus();
+
                        return true;
                } catch (Exception e) {
                        // e.printStackTrace();
@@ -474,11 +492,215 @@ final public class Table {
                bufferResizeThreshold = resizeLower - 1 + random.nextInt(numberOfSlots - resizeLower);
        }
 
+
+       boolean lastInsertedNewKey = false;
+
        private boolean sendToServer(NewKey newKey) throws ServerException {
 
+               boolean fromRetry = false;
+
+               try {
+                       if (hadPartialSendToServer) {
+                               Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
+                               if (newSlots.length == 0) {
+                                       fromRetry = true;
+                                       ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
+
+                                       if (sendSlotsReturn.getFirst()) {
+                                               if (newKey != null) {
+                                                       if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
+                                                               newKey = null;
+                                                       }
+                                               }
+
+                                               for (Transaction transaction : lastTransactionPartsSent.keySet()) {
+                                                       transaction.resetServerFailure();
+
+                                                       // Update which transactions parts still need to be sent
+                                                       transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
+
+                                                       // Add the transaction status to the outstanding list
+                                                       outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
+
+                                                       // Update the transaction status
+                                                       transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
+
+                                                       // Check if all the transaction parts were successfully sent and if so then remove it from pending
+                                                       if (transaction.didSendAllParts()) {
+                                                               transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
+                                                               pendingTransactionQueue.remove(transaction);
+                                                       }
+                                               }
+                                       } else {
+
+                                               newSlots = sendSlotsReturn.getThird();
+
+                                               boolean isInserted = false;
+                                               for (Slot s : newSlots) {
+                                                       if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
+                                                               isInserted = true;
+                                                               break;
+                                                       }
+                                               }
+
+                                               for (Slot s : newSlots) {
+                                                       if (isInserted) {
+                                                               break;
+                                                       }
+
+                                                       // Process each entry in the slot
+                                                       for (Entry entry : s.getEntries()) {
+
+                                                               if (entry.getType() == Entry.TypeLastMessage) {
+                                                                       LastMessage lastMessage = (LastMessage)entry;
+                                                                       if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) {
+                                                                               isInserted = true;
+                                                                               break;
+                                                                       }
+                                                               }
+                                                       }
+                                               }
+
+                                               if (isInserted) {
+                                                       if (newKey != null) {
+                                                               if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
+                                                                       newKey = null;
+                                                               }
+                                                       }
+
+                                                       for (Transaction transaction : lastTransactionPartsSent.keySet()) {
+                                                               transaction.resetServerFailure();
+
+                                                               // Update which transactions parts still need to be sent
+                                                               transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
+
+                                                               // Add the transaction status to the outstanding list
+                                                               outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
+
+                                                               // Update the transaction status
+                                                               transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
+
+                                                               // Check if all the transaction parts were successfully sent and if so then remove it from pending
+                                                               if (transaction.didSendAllParts()) {
+                                                                       transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
+                                                                       pendingTransactionQueue.remove(transaction);
+                                                               } else {
+                                                                       transaction.resetServerFailure();
+                                                                       // Set the transaction sequence number back to nothing
+                                                                       if (!transaction.didSendAPartToServer()) {
+                                                                               transaction.setSequenceNumber(-1);
+                                                                       }
+                                                               }
+                                                       }
+                                               }
+                                       }
+
+                                       for (Transaction transaction : lastTransactionPartsSent.keySet()) {
+                                               transaction.resetServerFailure();
+                                               // Set the transaction sequence number back to nothing
+                                               if (!transaction.didSendAPartToServer()) {
+                                                       transaction.setSequenceNumber(-1);
+                                               }
+                                       }
+
+                                       if (sendSlotsReturn.getThird().length != 0) {
+                                               // insert into the local block chain
+                                               validateAndUpdate(sendSlotsReturn.getThird(), true);
+                                       }
+                                       // continue;
+                               } else {
+                                       boolean isInserted = false;
+                                       for (Slot s : newSlots) {
+                                               if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
+                                                       isInserted = true;
+                                                       break;
+                                               }
+                                       }
+
+                                       for (Slot s : newSlots) {
+                                               if (isInserted) {
+                                                       break;
+                                               }
+
+                                               // Process each entry in the slot
+                                               for (Entry entry : s.getEntries()) {
+
+                                                       if (entry.getType() == Entry.TypeLastMessage) {
+                                                               LastMessage lastMessage = (LastMessage)entry;
+                                                               if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) {
+                                                                       isInserted = true;
+                                                                       break;
+                                                               }
+                                                       }
+                                               }
+                                       }
+
+                                       if (isInserted) {
+                                               if (newKey != null) {
+                                                       if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
+                                                               newKey = null;
+                                                       }
+                                               }
+
+                                               for (Transaction transaction : lastTransactionPartsSent.keySet()) {
+                                                       transaction.resetServerFailure();
+
+                                                       // Update which transactions parts still need to be sent
+                                                       transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
+
+                                                       // Add the transaction status to the outstanding list
+                                                       outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
+
+                                                       // Update the transaction status
+                                                       transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
+
+                                                       // Check if all the transaction parts were successfully sent and if so then remove it from pending
+                                                       if (transaction.didSendAllParts()) {
+                                                               transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
+                                                               pendingTransactionQueue.remove(transaction);
+                                                       } else {
+                                                               transaction.resetServerFailure();
+                                                               // Set the transaction sequence number back to nothing
+                                                               if (!transaction.didSendAPartToServer()) {
+                                                                       transaction.setSequenceNumber(-1);
+                                                               }
+                                                       }
+                                               }
+                                       } else {
+                                               for (Transaction transaction : lastTransactionPartsSent.keySet()) {
+                                                       transaction.resetServerFailure();
+                                                       // Set the transaction sequence number back to nothing
+                                                       if (!transaction.didSendAPartToServer()) {
+                                                               transaction.setSequenceNumber(-1);
+                                                       }
+                                               }
+                                       }
+
+                                       // insert into the local block chain
+                                       validateAndUpdate(newSlots, true);
+                               }
+                       }
+               } catch (ServerException e) {
+                       throw e;
+               }
+
+
                try {
                        // While we have stuff that needs inserting into the block chain
                        while ((pendingTransactionQueue.size() > 0) || (pendingSendArbitrationRounds.size() > 0) || (newKey != null)) {
+                               fromRetry = false;
+
+                               if (hadPartialSendToServer) {
+                                       throw new Error("Should Be error free");
+                               }
+
+
+
+                               // If there is a new key with same name then end
+                               if ((newKey != null) && (arbitratorTable.get(newKey.getKey()) != null)) {
+                                       System.out.println("New Key Fail");
+                                       return false;
+                               }
 
                                // Create the slot
                                Slot slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer.getSlot(sequenceNumber).getHMAC());
@@ -495,7 +717,7 @@ final public class Table {
                                                transaction.resetNextPartToSend();
 
                                                // Set the transaction sequence number back to nothing
-                                               if (!transaction.didSendAPartToServer()) {
+                                               if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
                                                        transaction.setSequenceNumber(-1);
                                                }
                                        }
@@ -508,13 +730,22 @@ final public class Table {
                                        fillSlot(slot, true, newKey);
                                }
 
-                               // Try to send to the server
+                               lastSlotAttemptedToSend = slot;
+                               lastIsNewKey = (newKey != null);
+                               lastInsertedNewKey = insertedNewKey;
+                               lastNewSize = newSize;
+                               lastNewKey = newKey;
+                               lastTransactionPartsSent = new HashMap<Transaction, List<Integer>>(transactionPartsSent);
+                               lastPendingSendArbitrationEntriesToDelete = new ArrayList<Entry>(pendingSendArbitrationEntriesToDelete);
+
+
                                ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != null);
 
-                               if (/*sendSlotsReturn.getSecond() || */sendSlotsReturn.getFirst()) {
+                               if (sendSlotsReturn.getFirst()) {
+
                                        // Did insert into the block chain
 
-                                       if (sendSlotsReturn.getFirst()) {
+                                       if (insertedNewKey) {
                                                // This slot was what was inserted not a previous slot
 
                                                // New Key was successfully inserted into the block chain so dont want to insert it again
@@ -548,16 +779,50 @@ final public class Table {
                                                if (transaction.didSendAllParts()) {
                                                        transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
                                                        pendingTransactionQueue.remove(transaction);
+
+                                                       for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
+                                                               System.out.println("Sent: " + kv + "  from: " + localMachineId + "   Slot:" + slot.getSequenceNumber() + "  Claimed:" + transaction.getSequenceNumber());
+                                                       }
                                                }
                                        }
                                } else {
+
+                                       // if (!sendSlotsReturn.getSecond()) {
+                                       //      for (Transaction transaction : lastTransactionPartsSent.keySet()) {
+                                       //              transaction.resetServerFailure();
+                                       //      }
+                                       // } else {
+                                       //      for (Transaction transaction : lastTransactionPartsSent.keySet()) {
+                                       //              transaction.resetServerFailure();
+
+                                       //              // Update which transactions parts still need to be sent
+                                       //              transaction.removeSentParts(transactionPartsSent.get(transaction));
+
+                                       //              // Add the transaction status to the outstanding list
+                                       //              outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
+
+                                       //              // Update the transaction status
+                                       //              transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
+
+                                       //              // Check if all the transaction parts were successfully sent and if so then remove it from pending
+                                       //              if (transaction.didSendAllParts()) {
+                                       //                      transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
+                                       //                      pendingTransactionQueue.remove(transaction);
+
+                                       //                      for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
+                                       //                              System.out.println("Sent: " + kv + "  from: " + localMachineId + "   Slot:" + lastSlotAttemptedToSend.getSequenceNumber() + "  Claimed:" + transaction.getSequenceNumber());
+                                       //                      }
+                                       //              }
+                                       //      }
+                                       // }
+
                                        // Reset which transaction to send
                                        for (Transaction transaction : transactionPartsSent.keySet()) {
                                                transaction.resetNextPartToSend();
-                                               transaction.resetNextPartToSend();
+                                               // transaction.resetNextPartToSend();
 
                                                // Set the transaction sequence number back to nothing
-                                               if (!transaction.didSendAPartToServer()) {
+                                               if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
                                                        transaction.setSequenceNumber(-1);
                                                }
                                        }
@@ -572,9 +837,15 @@ final public class Table {
                                        validateAndUpdate(sendSlotsReturn.getThird(), true);
                                }
                        }
+
                } catch (ServerException e) {
 
-                       // System.out.println("Server Failure:   " + e.getType());
+                       System.out.println("Server Failure:   " + e.getType());
+                       for (Transaction transaction : transactionPartsSent.keySet()) {
+                               for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
+                                       System.out.println("Sent Error: " + kv + "    " + e.getType());
+                               }
+                       }
 
                        if (e.getType() != ServerException.TypeInputTimeout) {
                                // e.printStackTrace();
@@ -584,7 +855,7 @@ final public class Table {
                                        transaction.resetNextPartToSend();
 
                                        // Set the transaction sequence number back to nothing
-                                       if (!transaction.didSendAPartToServer()) {
+                                       if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
                                                transaction.setSequenceNumber(-1);
                                        }
                                }
@@ -592,6 +863,12 @@ final public class Table {
                                // There was a partial send to the server
                                hadPartialSendToServer = true;
 
+
+                               // if (!fromRetry) {
+                               //      lastTransactionPartsSent = new HashMap<Transaction, List<Integer>>(transactionPartsSent);
+                               //      lastPendingSendArbitrationEntriesToDelete = new ArrayList<Entry>(pendingSendArbitrationEntriesToDelete);
+                               // }
+
                                // Nothing was able to be sent to the server so just clear these data structures
                                for (Transaction transaction : transactionPartsSent.keySet()) {
                                        transaction.resetNextPartToSend();
@@ -665,7 +942,7 @@ final public class Table {
 
                if (localCommunicationInformation == null) {
                        // Cant talk to that device locally so do nothing
-                       return new Pair<Boolean, Boolean>(false, false);
+                       return new Pair<Boolean, Boolean>(true, false);
                }
 
                // Get the size of the send data
@@ -989,10 +1266,15 @@ final public class Table {
                        Transaction transaction = pendingTransactionQueue.get(0);
 
                        // Set the transaction sequence number if it has yet to be inserted into the block chain
-                       if ((!transaction.didSendAPartToServer() && !transaction.getServerFailure()) || (transaction.getSequenceNumber() == -1)) {
+                       // if ((!transaction.didSendAPartToServer() && !transaction.getServerFailure()) || (transaction.getSequenceNumber() == -1)) {
+                       //      transaction.setSequenceNumber(slot.getSequenceNumber());
+                       // }
+
+                       if ((!transaction.didSendAPartToServer()) || (transaction.getSequenceNumber() == -1)) {
                                transaction.setSequenceNumber(slot.getSequenceNumber());
                        }
 
+
                        while (true) {
                                TransactionPart part = transaction.getNextPartToSend();
 
@@ -1241,8 +1523,6 @@ final public class Table {
                updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
        }
 
-
-       
        private void initExpectedSize(long firstSequenceNumber, long numberOfSlots) {
                if (didFindTableStatus) {
                        return;
@@ -1254,8 +1534,7 @@ final public class Table {
 
        private void updateExpectedSize() {
                expectedsize++;
-               if (expectedsize > currMaxSize)
-               {
+               if (expectedsize > currMaxSize) {
                        expectedsize = currMaxSize;
                }
        }
@@ -1274,7 +1553,7 @@ final public class Table {
        }
 
        private void updateCurrMaxSize(int newmaxsize) {
-               currMaxSize=newmaxsize;
+               currMaxSize = newmaxsize;
        }
 
 
@@ -1282,7 +1561,7 @@ final public class Table {
         * Update the size of of the local buffer if it is needed.
         */
        private void commitNewMaxSize() {
-               didFindTableStatus = false;             
+               didFindTableStatus = false;
 
                // Resize the local slot buffer
                if (numberOfSlots != currMaxSize) {
@@ -1342,6 +1621,9 @@ final public class Table {
                newTransactionParts.clear();
        }
 
+
+       private long lastSeqNumArbOn = 0;
+
        private void arbitrateFromServer() {
 
                if (liveTransactionBySequenceNumberTable.size() == 0) {
@@ -1363,11 +1645,25 @@ final public class Table {
                for (Long transactionSequenceNumber : transactionSequenceNumbers) {
                        Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber);
 
+                       for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
+                               System.out.println("Arb Seen: " + kv + "   " + lastSeqNumArbOn + "    " + transactionSequenceNumber + "  " + localMachineId + "   " + transaction.getArbitrator());
+                       }
+
+
                        // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
                        if (transaction.getArbitrator() != localMachineId) {
                                continue;
                        }
 
+                       if (transactionSequenceNumber < lastSeqNumArbOn) {
+                               continue;
+                       }
+
+                       for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
+                               System.out.println("Arb Seen: " + kv + "   " + lastSeqNumArbOn + "    " + transactionSequenceNumber + "  " + localMachineId);
+                       }
+
+
                        if (offlineTransactionsCommittedAndAtServer.contains(transaction.getId())) {
                                // We have seen this already locally so dont commit again
                                continue;
@@ -1380,6 +1676,11 @@ final public class Table {
                                break;
                        }
 
+                       for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
+                               System.out.println("Arb on: " + kv + "   " + lastSeqNumArbOn + "    " + transactionSequenceNumber + "  " + localMachineId);
+                       }
+
+
                        // update the largest transaction seen by arbitrator from server
                        if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) == null) {
                                lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber());
@@ -1399,7 +1700,9 @@ final public class Table {
                                }
 
                                // Update what the last transaction committed was for use in batch commit
-                               lastTransactionCommitted = transaction.getSequenceNumber();
+                               lastTransactionCommitted = transactionSequenceNumber;
+
+                               System.out.println("Commit Generated: " + lastTransactionCommitted + "   " + localMachineId);
                        } else {
                                // Guard evaluated was false so create abort
 
@@ -1417,6 +1720,10 @@ final public class Table {
                                // Insert the abort so we can process
                                processEntry(newAbort);
                        }
+
+                       lastSeqNumArbOn = transactionSequenceNumber;
+
+                       // liveTransactionBySequenceNumberTable.remove(transactionSequenceNumber);
                }
 
                Commit newCommit = null;
@@ -1739,6 +2046,30 @@ final public class Table {
                                        }
                                }
 
+                               // Update the last transaction that was updated if we can
+                               if (commit.getTransactionSequenceNumber() != -1) {
+                                       Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId());
+
+                                       // Update the last transaction sequence number that the arbitrator arbitrated on
+                                       if ((lastTransactionNumber == null) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) {
+                                               lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber());
+                                       }
+                               }
+
+
+                               for (KeyValue kv : commit.getKeyValueUpdateSet()) {
+                                       System.out.println("Commit Seen: " + kv + "   " + commit.getTransactionSequenceNumber() + "   " + localMachineId);
+                               }
+
+
+
+
+
+
+
+
+
+
                                // Update the last arbitration data that we have seen so far
                                if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId()) != null) {
 
@@ -1804,16 +2135,6 @@ final public class Table {
                                        lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber());
                                }
 
-                               // Update the last transaction that was updated if we can
-                               if (commit.getTransactionSequenceNumber() != -1) {
-                                       Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId());
-
-                                       // Update the last transaction sequence number that the arbitrator arbitrated on
-                                       if ((lastTransactionNumber == null) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) {
-                                               lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber());
-                                       }
-                               }
-
                                // We processed a new commit that we havent seen before
                                didProcessANewCommit = true;
 
@@ -2240,6 +2561,21 @@ final public class Table {
         * Process new commit entries and save them for future use.  Delete duplicates
         */
        private void processEntry(CommitPart entry) {
+
+
+               // Update the last transaction that was updated if we can
+               if (entry.getTransactionSequenceNumber() != -1) {
+                       Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getMachineId());
+
+                       // Update the last transaction sequence number that the arbitrator arbitrated on
+                       if ((lastTransactionNumber == null) || (lastTransactionNumber < entry.getTransactionSequenceNumber())) {
+                               lastArbitratedTransactionNumberByArbitratorTable.put(entry.getMachineId(), entry.getTransactionSequenceNumber());
+                       }
+               }
+
+
+
+
                Map<Pair<Long, Integer>, CommitPart> commitPart = newCommitParts.get(entry.getMachineId());
 
                if (commitPart == null) {
index bfe0f871cf73ec17b4f943977e8a55b32e1afc65..99babcbe450b5501fa46e9430f4f1fd432b80f40 100644 (file)
@@ -11,7 +11,7 @@ import java.util.ArrayList;
 
 public class Test {
 
-    public static final  int NUMBER_OF_TESTS = 1000;
+    public static final  int NUMBER_OF_TESTS = 15;
 
     public static void main(String[] args)  throws ServerException {
         if (args[0].equals("2")) {
@@ -144,18 +144,23 @@ public class Test {
             IoTString iValueCPrev = new IoTString(valueCPrev);
             IoTString iValueDPrev = new IoTString(valueDPrev);
 
+
+            System.out.println("t1 A");
             t1.startTransaction();
             t1.addKV(iKeyA, iValueA);
             transStatusList.add(t1.commitTransaction());
 
+            System.out.println("t1 B");
             t1.startTransaction();
             t1.addKV(iKeyB, iValueB);
             transStatusList.add(t1.commitTransaction());
 
+            System.out.println("t2 C");
             t2.startTransaction();
             t2.addKV(iKeyC, iValueC);
             transStatusList.add(t2.commitTransaction());
 
+            System.out.println("t2 D");
             t2.startTransaction();
             t2.addKV(iKeyD, iValueD);
             transStatusList.add(t2.commitTransaction());
@@ -849,10 +854,14 @@ public class Test {
             }
         }
 
+        int count = 0;
         for (TransactionStatus status : transStatusList) {
             if (status.getStatus() != TransactionStatus.StatusCommitted) {
                 foundError = true;
+                System.out.println("Status: " + status.getStatus() + "   " + status.getTransactionSequenceNumber());
             }
+
+            count++;
         }
 
         if (foundError) {
@@ -863,6 +872,14 @@ public class Test {
 
         t1.close();
         t2.close();
+
+        System.out.println();
+        System.out.println();
+        t1.printSlots();
+
+        System.out.println();
+        System.out.println();
+        t2.printSlots();
     }
 
     static void test7() throws ServerException {
@@ -1880,40 +1897,22 @@ public class Test {
             IoTString iValueD = new IoTString(valueD);
 
 
-            System.out.println("===============================================================================");
-            System.out.println("AAAAAAAA");
-            System.out.println("===============================================================================");
             t1.startTransaction();
             t1.addKV(iKeyA, iValueA);
             transStatusList.add(t1.commitTransaction());
-            System.out.println();
 
 
-            System.out.println("===============================================================================");
-            System.out.println("BBBBBBB");
-            System.out.println("===============================================================================");
             t1.startTransaction();
             t1.addKV(iKeyB, iValueB);
             transStatusList.add(t1.commitTransaction());
-            System.out.println();
-
 
-            System.out.println("===============================================================================");
-            System.out.println("CCCCCCC");
-            System.out.println("===============================================================================");
             t2.startTransaction();
             t2.addKV(iKeyC, iValueC);
             transStatusList.add(t2.commitTransaction());
-            System.out.println();
-
 
-            System.out.println("===============================================================================");
-            System.out.println("DDDDDDDDDD");
-            System.out.println("===============================================================================");
             t2.startTransaction();
             t2.addKV(iKeyD, iValueD);
             transStatusList.add(t2.commitTransaction());
-            System.out.println();
 
         }
         endTime = System.currentTimeMillis();
@@ -2014,12 +2013,12 @@ public class Test {
             System.out.println("No Errors Found...");
         }
 
-        System.out.println();
-        System.out.println();
-        t1.printSlots();
+        // System.out.println();
+        // System.out.println();
+        // t1.printSlots();
 
-        System.out.println();
-        System.out.println();
-        t2.printSlots();
+        // System.out.println();
+        // System.out.println();
+        // t2.printSlots();
     }
 }
index 8494625a2907546414115568fea6240df9e7c146..e25d0689d62e8f2f2adf87e885f5c069b7178f91 100644 (file)
@@ -171,9 +171,11 @@ class Transaction {
 
     public void removeSentParts(List<Integer> sentParts) {
         nextPartToSend = 0;
-        partsPendingSend.removeAll(sentParts);
-        didSendAPartToServer = true;
-        transactionStatus.setTransactionSequenceNumber(sequenceNumber);
+        if(partsPendingSend.removeAll(sentParts))
+        {
+            didSendAPartToServer = true;
+            transactionStatus.setTransactionSequenceNumber(sequenceNumber);
+        }
     }
 
     public boolean didSendAllParts() {